* Functions returning results from a remote database
*
* Joe Conway <mail@joeconway.com>
+ * And contributors:
+ * Darko Prenosil <Darko.Prenosil@finteh.hr>
+ * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
*
- * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group
+ * Copyright (c) 2001, 2002, 2003 by PostgreSQL Global Development Group
* ALL RIGHTS RESERVED;
*
* Permission to use, copy, modify, and distribute this software and its
*
*/
#include "postgres.h"
-
#include "libpq-fe.h"
-
#include "fmgr.h"
#include "funcapi.h"
#include "access/tupdesc.h"
#include "utils/array.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
+#include "utils/palloc.h"
+#include "utils/dynahash.h"
+#include "utils/hsearch.h"
+#include "utils/memutils.h"
#include "dblink.h"
+typedef struct remoteConn
+{
+ PGconn *con; /* Hold the remote connection */
+ bool remoteTrFlag; /* Indicates whether or not a transaction
+ * on remote database is in progress*/
+} remoteConn;
+
/*
* Internal declarations
*/
-static dblink_results *init_dblink_results(MemoryContext fn_mcxt);
+static remoteConn *getConnectionByName(const char *name);
+static HTAB *createConnHash(void);
+static bool createNewConnection(const char *name,remoteConn *con);
+static void deleteConnection(const char *name);
static char **get_pkey_attnames(Oid relid, int16 *numatts);
static char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
static char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
static HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
static Oid get_relid_from_relname(text *relname_text);
-static dblink_results *get_res_ptr(int32 res_id_index);
-static void append_res_ptr(dblink_results * results);
-static void remove_res_ptr(dblink_results * results);
static TupleDesc pgresultGetTupleDesc(PGresult *res);
static char *generate_relation_name(Oid relid);
/* Global */
-List *res_id = NIL;
-int res_id_index = 0;
-PGconn *persistent_conn = NULL;
+List *res_id = NIL;
+int res_id_index = 0;
+PGconn *persistent_conn = NULL;
+static HTAB *remoteConnHash=NULL;
+
+/*
+Following is list that holds multiple remote connections.
+Calling convention of each dblink function changes to accept
+connection name as the first parameter. The connection list is
+much like ecpg e.g. a mapping between a name and a PGconn object.
+*/
+
+typedef struct remoteConnHashEnt
+{
+ char name[NAMEDATALEN];
+ remoteConn *rcon;
+} remoteConnHashEnt;
+
+/* initial number of connection hashes */
+#define NUMCONN 16
+/* general utility */
#define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp)))
#define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
#define xpfree(var_) \
var_ = NULL; \
} \
} while (0)
+#define DBLINK_RES_ERROR(p1, p2) \
+ do { \
+ msg = pstrdup(PQerrorMessage(conn)); \
+ if (res) \
+ PQclear(res); \
+ elog(ERROR, "%s: %s: %s", p1, p2, msg); \
+ } while (0)
+#define DBLINK_CONN_NOT_AVAIL(p1) \
+ do { \
+ if(conname) \
+ elog(ERROR, "%s: connection %s not available", p1, conname); \
+ else \
+ elog(ERROR, "%s: connection not available", p1); \
+ } while (0)
+#define DBLINK_GET_CONN(p1) \
+ do { \
+ char *conname_or_str = GET_STR(PG_GETARG_TEXT_P(0)); \
+ rcon = getConnectionByName(conname_or_str); \
+ if(rcon) \
+ { \
+ conn = rcon->con; \
+ freeconn = false; \
+ } \
+ else \
+ { \
+ connstr = conname_or_str; \
+ conn = PQconnectdb(connstr); \
+ if (PQstatus(conn) == CONNECTION_BAD) \
+ { \
+ msg = pstrdup(PQerrorMessage(conn)); \
+ PQfinish(conn); \
+ elog(ERROR, "%s: connection error: %s", p1, msg); \
+ } \
+ } \
+ } while (0)
/*
Datum
dblink_connect(PG_FUNCTION_ARGS)
{
- char *connstr = GET_STR(PG_GETARG_TEXT_P(0));
+ char *connstr = NULL;
+ char *connname = NULL;
char *msg;
- text *result_text;
MemoryContext oldcontext;
+ PGconn *conn = NULL;
+ remoteConn *rcon = NULL;
- if (persistent_conn != NULL)
- PQfinish(persistent_conn);
+ if(PG_NARGS()==2)
+ {
+ connstr = GET_STR(PG_GETARG_TEXT_P(1));
+ connname = GET_STR(PG_GETARG_TEXT_P(0));
+ }
+ else if(PG_NARGS()==1)
+ connstr = GET_STR(PG_GETARG_TEXT_P(0));
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
- persistent_conn = PQconnectdb(connstr);
+
+ if(connname)
+ rcon=(remoteConn *) palloc(sizeof(remoteConn));
+ conn = PQconnectdb(connstr);
+
MemoryContextSwitchTo(oldcontext);
- if (PQstatus(persistent_conn) == CONNECTION_BAD)
+ if (PQstatus(conn) == CONNECTION_BAD)
{
- msg = pstrdup(PQerrorMessage(persistent_conn));
- PQfinish(persistent_conn);
- persistent_conn = NULL;
+ msg = pstrdup(PQerrorMessage(conn));
+ PQfinish(conn);
+ if(rcon)
+ pfree(rcon);
elog(ERROR, "dblink_connect: connection error: %s", msg);
}
- result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
- PG_RETURN_TEXT_P(result_text);
+ if(connname)
+ {
+ rcon->con = conn;
+ if(createNewConnection(connname, rcon) == false)
+ {
+ PQfinish(conn);
+ pfree(rcon);
+ elog(ERROR, "dblink_connect: cannot save named connection");
+ }
+ }
+ else
+ persistent_conn = conn;
+
+ PG_RETURN_TEXT_P(GET_TEXT("OK"));
}
/*
Datum
dblink_disconnect(PG_FUNCTION_ARGS)
{
- text *result_text;
+ char *str = NULL;
+ remoteConn *rcon = NULL;
+ PGconn *conn = NULL;
+
+ if (PG_NARGS() ==1 )
+ {
+ str = GET_STR(PG_GETARG_TEXT_P(0));
+ rcon = getConnectionByName(str);
+ if (rcon)
+ conn = rcon->con;
+ }
+ else
+ conn = persistent_conn;
- if (persistent_conn != NULL)
- PQfinish(persistent_conn);
+ if (!conn)
+ {
+ if (str)
+ elog(ERROR,"dblink_disconnect: connection named \"%s\" not found",
+ str);
+ else
+ elog(ERROR,"dblink_disconnect: connection not found");
+ }
- persistent_conn = NULL;
+ PQfinish(conn);
+ if (rcon)
+ {
+ deleteConnection(str);
+ pfree(rcon);
+ }
- result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
- PG_RETURN_TEXT_P(result_text);
+ PG_RETURN_TEXT_P(GET_TEXT("OK"));
}
/*
char *msg;
PGresult *res = NULL;
PGconn *conn = NULL;
- text *result_text;
- char *curname = GET_STR(PG_GETARG_TEXT_P(0));
- char *sql = GET_STR(PG_GETARG_TEXT_P(1));
+ char *curname = NULL;
+ char *sql = NULL;
+ char *conname = NULL;
StringInfo str = makeStringInfo();
+ remoteConn *rcon = NULL;
- if (persistent_conn != NULL)
+ if(PG_NARGS() == 2)
+ {
+ curname = GET_STR(PG_GETARG_TEXT_P(0));
+ sql = GET_STR(PG_GETARG_TEXT_P(1));
conn = persistent_conn;
- else
- elog(ERROR, "dblink_open: no connection available");
+ }
+ else if(PG_NARGS() == 3)
+ {
+ conname = GET_STR(PG_GETARG_TEXT_P(0));
+ curname = GET_STR(PG_GETARG_TEXT_P(1));
+ sql = GET_STR(PG_GETARG_TEXT_P(2));
+ rcon = getConnectionByName(conname);
+ if (rcon)
+ conn = rcon->con;
+ }
+
+ if (!conn)
+ DBLINK_CONN_NOT_AVAIL("dblink_open");
res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- {
- msg = pstrdup(PQerrorMessage(conn));
- PQclear(res);
+ DBLINK_RES_ERROR("dblink_open", "begin error");
- PQfinish(conn);
- persistent_conn = NULL;
-
- elog(ERROR, "dblink_open: begin error: %s", msg);
- }
PQclear(res);
appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
- {
- msg = pstrdup(PQerrorMessage(conn));
-
- PQclear(res);
-
- PQfinish(conn);
- persistent_conn = NULL;
+ DBLINK_RES_ERROR("dblink_open", "sql error");
- elog(ERROR, "dblink: sql error: %s", msg);
- }
+ PQclear(res);
- result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
- PG_RETURN_TEXT_P(result_text);
+ PG_RETURN_TEXT_P(GET_TEXT("OK"));
}
/*
{
PGconn *conn = NULL;
PGresult *res = NULL;
- char *curname = GET_STR(PG_GETARG_TEXT_P(0));
+ char *curname = NULL;
+ char *conname = NULL;
StringInfo str = makeStringInfo();
- text *result_text;
char *msg;
+ remoteConn *rcon = NULL;
- if (persistent_conn != NULL)
+ if (PG_NARGS() == 1)
+ {
+ curname = GET_STR(PG_GETARG_TEXT_P(0));
conn = persistent_conn;
- else
- elog(ERROR, "dblink_close: no connection available");
+ }
+ else if (PG_NARGS()==2)
+ {
+ conname = GET_STR(PG_GETARG_TEXT_P(0));
+ curname = GET_STR(PG_GETARG_TEXT_P(1));
+ rcon = getConnectionByName(conname);
+ if(rcon)
+ conn = rcon->con;
+ }
+
+ if (!conn)
+ DBLINK_CONN_NOT_AVAIL("dblink_close");
appendStringInfo(str, "CLOSE %s", curname);
/* close the cursor */
res = PQexec(conn, str->data);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
- {
- msg = pstrdup(PQerrorMessage(conn));
- PQclear(res);
-
- PQfinish(persistent_conn);
- persistent_conn = NULL;
-
- elog(ERROR, "dblink_close: sql error: %s", msg);
- }
+ DBLINK_RES_ERROR("dblink_close", "sql error");
PQclear(res);
/* commit the transaction */
res = PQexec(conn, "COMMIT");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- {
- msg = pstrdup(PQerrorMessage(conn));
- PQclear(res);
-
- PQfinish(persistent_conn);
- persistent_conn = NULL;
+ DBLINK_RES_ERROR("dblink_close", "commit error");
- elog(ERROR, "dblink_close: commit error: %s", msg);
- }
PQclear(res);
- result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
- PG_RETURN_TEXT_P(result_text);
+ PG_RETURN_TEXT_P(GET_TEXT("OK"));
}
/*
char *msg;
PGresult *res = NULL;
MemoryContext oldcontext;
+ char *conname = NULL;
+ remoteConn *rcon=NULL;
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
Oid funcid = fcinfo->flinfo->fn_oid;
PGconn *conn = NULL;
StringInfo str = makeStringInfo();
- char *curname = GET_STR(PG_GETARG_TEXT_P(0));
- int howmany = PG_GETARG_INT32(1);
+ char *curname = NULL;
+ int howmany = 0;
+
+ if (PG_NARGS() == 3)
+ {
+ conname = GET_STR(PG_GETARG_TEXT_P(0));
+ curname = GET_STR(PG_GETARG_TEXT_P(1));
+ howmany = PG_GETARG_INT32(2);
+
+ rcon = getConnectionByName(conname);
+ if(rcon)
+ conn = rcon->con;
+ }
+ else if (PG_NARGS() == 2)
+ {
+ curname = GET_STR(PG_GETARG_TEXT_P(0));
+ howmany = PG_GETARG_INT32(1);
+ conn = persistent_conn;
+ }
+
+ if(!conn)
+ DBLINK_CONN_NOT_AVAIL("dblink_fetch");
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
*/
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
- if (persistent_conn != NULL)
- conn = persistent_conn;
- else
- elog(ERROR, "dblink_fetch: no connection available");
-
appendStringInfo(str, "FETCH %d FROM %s", howmany, curname);
res = PQexec(conn, str->data);
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
{
- msg = pstrdup(PQerrorMessage(conn));
- PQclear(res);
-
- PQfinish(persistent_conn);
- persistent_conn = NULL;
-
- elog(ERROR, "dblink_fetch: sql error: %s", msg);
+ DBLINK_RES_ERROR("dblink_fetch", "sql error");
}
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
/* cursor does not exist - closed already or bad name */
PQclear(res);
- elog(ERROR, "dblink_fetch: cursor %s does not exist", curname);
+ elog(ERROR, "dblink_fetch: cursor not found: %s", curname);
}
funcctx->max_calls = PQntuples(res);
SRF_RETURN_NEXT(funcctx, result);
}
else
-/* do when there is no more left */
{
+ /* do when there is no more left */
PQclear(res);
SRF_RETURN_DONE(funcctx);
}
bool is_sql_cmd = false;
char *sql_cmd_status = NULL;
MemoryContext oldcontext;
+ bool freeconn = true;
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
PGconn *conn = NULL;
char *connstr = NULL;
char *sql = NULL;
+ char *conname = NULL;
+ remoteConn *rcon=NULL;
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
*/
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
- if (fcinfo->nargs == 2)
+ if (PG_NARGS() == 2)
{
- connstr = GET_STR(PG_GETARG_TEXT_P(0));
+ DBLINK_GET_CONN("dblink");
sql = GET_STR(PG_GETARG_TEXT_P(1));
-
- conn = PQconnectdb(connstr);
- if (PQstatus(conn) == CONNECTION_BAD)
- {
- msg = pstrdup(PQerrorMessage(conn));
- PQfinish(conn);
- elog(ERROR, "dblink: connection error: %s", msg);
- }
}
- else if (fcinfo->nargs == 1)
+ else if (PG_NARGS() == 1)
{
+ conn = persistent_conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
-
- if (persistent_conn != NULL)
- conn = persistent_conn;
- else
- elog(ERROR, "dblink: no connection available");
}
else
elog(ERROR, "dblink: wrong number of arguments");
+ if(!conn)
+ DBLINK_CONN_NOT_AVAIL("dblink_record");
+
res = PQexec(conn, sql);
if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
+ DBLINK_RES_ERROR("dblink", "sql error");
+
+ if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
- msg = pstrdup(PQerrorMessage(conn));
- PQclear(res);
- PQfinish(conn);
- if (fcinfo->nargs == 1)
- persistent_conn = NULL;
+ is_sql_cmd = true;
+
+ /* need a tuple descriptor representing one TEXT column */
+ tupdesc = CreateTemplateTupleDesc(1, false);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+ TEXTOID, -1, 0, false);
- elog(ERROR, "dblink: sql error: %s", msg);
+ /*
+ * and save a copy of the command status string to return
+ * as our result tuple
+ */
+ sql_cmd_status = PQcmdStatus(res);
+ funcctx->max_calls = 1;
}
else
- {
- if (PQresultStatus(res) == PGRES_COMMAND_OK)
- {
- is_sql_cmd = true;
-
- /* need a tuple descriptor representing one TEXT column */
- tupdesc = CreateTemplateTupleDesc(1, false);
- TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
- TEXTOID, -1, 0, false);
-
- /*
- * and save a copy of the command status string to return
- * as our result tuple
- */
- sql_cmd_status = PQcmdStatus(res);
- funcctx->max_calls = 1;
- }
- else
- funcctx->max_calls = PQntuples(res);
+ funcctx->max_calls = PQntuples(res);
- /* got results, keep track of them */
- funcctx->user_fctx = res;
+ /* got results, keep track of them */
+ funcctx->user_fctx = res;
- /* if needed, close the connection to the database and cleanup */
- if (fcinfo->nargs == 2)
- PQfinish(conn);
- }
+ /* if needed, close the connection to the database and cleanup */
+ if (freeconn && PG_NARGS() == 2)
+ PQfinish(conn);
/* fast track when no results */
if (funcctx->max_calls < 1)
SRF_RETURN_NEXT(funcctx, result);
}
else
-/* do when there is no more left */
{
+ /* do when there is no more left */
PQclear(res);
SRF_RETURN_DONE(funcctx);
}
{
char *msg;
PGresult *res = NULL;
- char *sql_cmd_status = NULL;
+ text *sql_cmd_status = NULL;
TupleDesc tupdesc = NULL;
- text *result_text;
PGconn *conn = NULL;
char *connstr = NULL;
char *sql = NULL;
+ char *conname = NULL;
+ remoteConn *rcon=NULL;
+ bool freeconn = true;
- if (fcinfo->nargs == 2)
+ if (PG_NARGS() == 2)
{
- connstr = GET_STR(PG_GETARG_TEXT_P(0));
+ DBLINK_GET_CONN("dblink_exec");
sql = GET_STR(PG_GETARG_TEXT_P(1));
-
- conn = PQconnectdb(connstr);
- if (PQstatus(conn) == CONNECTION_BAD)
- {
- msg = pstrdup(PQerrorMessage(conn));
- PQfinish(conn);
- elog(ERROR, "dblink_exec: connection error: %s", msg);
- }
}
- else if (fcinfo->nargs == 1)
+ else if (PG_NARGS() == 1)
{
+ conn = persistent_conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
-
- if (persistent_conn != NULL)
- conn = persistent_conn;
- else
- elog(ERROR, "dblink_exec: no connection available");
}
else
elog(ERROR, "dblink_exec: wrong number of arguments");
+ if(!conn)
+ DBLINK_CONN_NOT_AVAIL("dblink_exec");
res = PQexec(conn, sql);
- if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
+ if (!res ||
+ (PQresultStatus(res) != PGRES_COMMAND_OK &&
+ PQresultStatus(res) != PGRES_TUPLES_OK))
+ DBLINK_RES_ERROR("dblink_exec", "sql error");
+
+ if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
- msg = pstrdup(PQerrorMessage(conn));
- PQclear(res);
- PQfinish(conn);
- if (fcinfo->nargs == 1)
- persistent_conn = NULL;
+ /* need a tuple descriptor representing one TEXT column */
+ tupdesc = CreateTemplateTupleDesc(1, false);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+ TEXTOID, -1, 0, false);
- elog(ERROR, "dblink_exec: sql error: %s", msg);
+ /*
+ * and save a copy of the command status string to return as
+ * our result tuple
+ */
+ sql_cmd_status = GET_TEXT(PQcmdStatus(res));
}
else
- {
- if (PQresultStatus(res) == PGRES_COMMAND_OK)
- {
- /* need a tuple descriptor representing one TEXT column */
- tupdesc = CreateTemplateTupleDesc(1, false);
- TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
- TEXTOID, -1, 0, false);
+ elog(ERROR, "dblink_exec: queries returning results not allowed");
- /*
- * and save a copy of the command status string to return as
- * our result tuple
- */
- sql_cmd_status = PQcmdStatus(res);
- }
- else
- elog(ERROR, "dblink_exec: queries returning results not allowed");
- }
PQclear(res);
/* if needed, close the connection to the database and cleanup */
- if (fcinfo->nargs == 2)
+ if (freeconn && fcinfo->nargs == 2)
PQfinish(conn);
- result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql_cmd_status)));
- PG_RETURN_TEXT_P(result_text);
+ PG_RETURN_TEXT_P(sql_cmd_status);
}
-/*
- * Note: this original version of dblink is DEPRECATED;
- * it *will* be removed in favor of the new version on next release
- */
-PG_FUNCTION_INFO_V1(dblink);
-Datum
-dblink(PG_FUNCTION_ARGS)
-{
- PGconn *conn = NULL;
- PGresult *res = NULL;
- dblink_results *results;
- char *optstr;
- char *sqlstatement;
- char *execstatement;
- char *msg;
- int ntuples = 0;
- ReturnSetInfo *rsi;
-
- if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
- elog(ERROR, "dblink: function called in context that does not accept a set result");
-
- optstr = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
- sqlstatement = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
-
- if (fcinfo->flinfo->fn_extra == NULL)
- {
-
- conn = PQconnectdb(optstr);
- if (PQstatus(conn) == CONNECTION_BAD)
- {
- msg = pstrdup(PQerrorMessage(conn));
- PQfinish(conn);
- elog(ERROR, "dblink: connection error: %s", msg);
- }
-
- execstatement = (char *) palloc(strlen(sqlstatement) + 1);
- if (execstatement != NULL)
- {
- strcpy(execstatement, sqlstatement);
- strcat(execstatement, "\0");
- }
- else
- elog(ERROR, "dblink: insufficient memory");
-
- res = PQexec(conn, execstatement);
- if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
- {
- msg = pstrdup(PQerrorMessage(conn));
- PQclear(res);
- PQfinish(conn);
- elog(ERROR, "dblink: sql error: %s", msg);
- }
- else
- {
- /*
- * got results, start fetching them
- */
- ntuples = PQntuples(res);
-
- /*
- * increment resource index
- */
- res_id_index++;
-
- results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
- results->tup_num = 0;
- results->res_id_index = res_id_index;
- results->res = res;
-
- /*
- * Append node to res_id to hold pointer to results. Needed by
- * dblink_tok to access the data
- */
- append_res_ptr(results);
-
- /*
- * save pointer to results for the next function manager call
- */
- fcinfo->flinfo->fn_extra = (void *) results;
-
- /* close the connection to the database and cleanup */
- PQfinish(conn);
-
- rsi = (ReturnSetInfo *) fcinfo->resultinfo;
- rsi->isDone = ExprMultipleResult;
-
- PG_RETURN_INT32(res_id_index);
- }
- }
- else
- {
- /*
- * check for more results
- */
- results = fcinfo->flinfo->fn_extra;
-
- results->tup_num++;
- res_id_index = results->res_id_index;
- ntuples = PQntuples(results->res);
-
- if (results->tup_num < ntuples)
- {
- /*
- * fetch them if available
- */
-
- rsi = (ReturnSetInfo *) fcinfo->resultinfo;
- rsi->isDone = ExprMultipleResult;
-
- PG_RETURN_INT32(res_id_index);
- }
- else
- {
- /*
- * or if no more, clean things up
- */
- results = fcinfo->flinfo->fn_extra;
-
- remove_res_ptr(results);
- PQclear(results->res);
- pfree(results);
- fcinfo->flinfo->fn_extra = NULL;
-
- rsi = (ReturnSetInfo *) fcinfo->resultinfo;
- rsi->isDone = ExprEndResult;
-
- PG_RETURN_NULL();
- }
- }
- PG_RETURN_NULL();
-}
-
-/*
- * Note: dblink_tok is DEPRECATED;
- * it *will* be removed in favor of the new version on next release
- *
- * dblink_tok
- * parse dblink output string
- * return fldnum item (0 based)
- * based on provided field separator
- */
-PG_FUNCTION_INFO_V1(dblink_tok);
-Datum
-dblink_tok(PG_FUNCTION_ARGS)
-{
- dblink_results *results;
- int fldnum;
- text *result_text;
- char *result;
- int nfields = 0;
- int text_len = 0;
-
- results = get_res_ptr(PG_GETARG_INT32(0));
- if (results == NULL)
- {
- if (res_id != NIL)
- {
- freeList(res_id);
- res_id = NIL;
- res_id_index = 0;
- }
-
- elog(ERROR, "dblink_tok: function called with invalid resource id");
- }
-
- fldnum = PG_GETARG_INT32(1);
- if (fldnum < 0)
- elog(ERROR, "dblink_tok: field number < 0 not permitted");
-
- nfields = PQnfields(results->res);
- if (fldnum > (nfields - 1))
- elog(ERROR, "dblink_tok: field number %d does not exist", fldnum);
-
- if (PQgetisnull(results->res, results->tup_num, fldnum) == 1)
- PG_RETURN_NULL();
- else
- {
- text_len = PQgetlength(results->res, results->tup_num, fldnum);
-
- result = (char *) palloc(text_len + 1);
-
- if (result != NULL)
- {
- strcpy(result, PQgetvalue(results->res, results->tup_num, fldnum));
- strcat(result, "\0");
- }
- else
- elog(ERROR, "dblink: insufficient memory");
-
- result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result)));
-
- PG_RETURN_TEXT_P(result_text);
- }
-}
/*
* dblink_get_pkey
funcctx->user_fctx = results;
}
else
-/* fast track when no results */
+ /* fast track when no results */
SRF_RETURN_DONE(funcctx);
MemoryContextSwitchTo(oldcontext);
SRF_RETURN_NEXT(funcctx, result);
}
else
-/* do when there is no more left */
- SRF_RETURN_DONE(funcctx);
-}
-
-/*
- * Note: dblink_last_oid is DEPRECATED;
- * it *will* be removed on next release
- *
- * dblink_last_oid
- * return last inserted oid
- */
-PG_FUNCTION_INFO_V1(dblink_last_oid);
-Datum
-dblink_last_oid(PG_FUNCTION_ARGS)
-{
- dblink_results *results;
-
- results = get_res_ptr(PG_GETARG_INT32(0));
- if (results == NULL)
{
- if (res_id != NIL)
- {
- freeList(res_id);
- res_id = NIL;
- res_id_index = 0;
- }
-
- elog(ERROR, "dblink_tok: function called with invalid resource id");
+ /* do when there is no more left */
+ SRF_RETURN_DONE(funcctx);
}
-
- PG_RETURN_OID(PQoidValue(results->res));
}
int i;
char *ptr;
char *sql;
- text *sql_text;
int16 typlen;
bool typbyval;
char typalign;
*/
sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
- /*
- * Make it into TEXT for return to the client
- */
- sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
-
/*
* And send it
*/
- PG_RETURN_TEXT_P(sql_text);
+ PG_RETURN_TEXT_P(GET_TEXT(sql));
}
int i;
char *ptr;
char *sql;
- text *sql_text;
int16 typlen;
bool typbyval;
char typalign;
*/
sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals);
- /*
- * Make it into TEXT for return to the client
- */
- sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
-
/*
* And send it
*/
- PG_RETURN_TEXT_P(sql_text);
+ PG_RETURN_TEXT_P(GET_TEXT(sql));
}
int i;
char *ptr;
char *sql;
- text *sql_text;
int16 typlen;
bool typbyval;
char typalign;
*/
sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
- /*
- * Make it into TEXT for return to the client
- */
- sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
-
/*
* And send it
*/
- PG_RETURN_TEXT_P(sql_text);
+ PG_RETURN_TEXT_P(GET_TEXT(sql));
}
/*
Datum
dblink_current_query(PG_FUNCTION_ARGS)
{
- text *result_text;
-
- result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(debug_query_string)));
- PG_RETURN_TEXT_P(result_text);
+ PG_RETURN_TEXT_P(GET_TEXT(debug_query_string));
}
*/
-/*
- * init_dblink_results
- * - create an empty dblink_results data structure
- */
-static dblink_results *
-init_dblink_results(MemoryContext fn_mcxt)
-{
- MemoryContext oldcontext;
- dblink_results *retval;
-
- oldcontext = MemoryContextSwitchTo(fn_mcxt);
-
- retval = (dblink_results *) palloc0(sizeof(dblink_results));
-
- retval->tup_num = -1;
- retval->res_id_index = -1;
- retval->res = NULL;
-
- MemoryContextSwitchTo(oldcontext);
-
- return retval;
-}
-
/*
* get_pkey_attnames
*
/* we're only interested if it is the primary key */
if (index->indisprimary == TRUE)
{
- *numatts = index->indnatts;
+ i = 0;
+ while (index->indkey[i++] != 0)
+ (*numatts)++;
+
if (*numatts > 0)
{
result = (char **) palloc(*numatts * sizeof(char *));
return relid;
}
-static dblink_results *
-get_res_ptr(int32 res_id_index)
-{
- List *ptr;
-
- /*
- * short circuit empty list
- */
- if (res_id == NIL)
- return NULL;
-
- /*
- * OK, should be good to go
- */
- foreach(ptr, res_id)
- {
- dblink_results *this_res_id = (dblink_results *) lfirst(ptr);
-
- if (this_res_id->res_id_index == res_id_index)
- return this_res_id;
- }
- return NULL;
-}
-
-/*
- * Add node to global List res_id
- */
-static void
-append_res_ptr(dblink_results * results)
-{
- res_id = lappend(res_id, results);
-}
-
-/*
- * Remove node from global List
- * using res_id_index
- */
-static void
-remove_res_ptr(dblink_results * results)
-{
- res_id = lremove(results, res_id);
-
- if (res_id == NIL)
- res_id_index = 0;
-}
-
static TupleDesc
pgresultGetTupleDesc(PGresult *res)
{
return result;
}
+
+
+static remoteConn *
+getConnectionByName(const char *name)
+{
+ remoteConnHashEnt *hentry;
+ char key[NAMEDATALEN];
+
+ if(!remoteConnHash)
+ remoteConnHash=createConnHash();
+
+ MemSet(key, 0, NAMEDATALEN);
+ snprintf(key, NAMEDATALEN - 1, "%s", name);
+ hentry = (remoteConnHashEnt*) hash_search(remoteConnHash,
+ key, HASH_FIND, NULL);
+
+ if(hentry)
+ return(hentry->rcon);
+
+ return(NULL);
+}
+
+static HTAB *
+createConnHash(void)
+{
+ HASHCTL ctl;
+ HTAB *ptr;
+
+ ctl.keysize = NAMEDATALEN;
+ ctl.entrysize = sizeof(remoteConnHashEnt);
+
+ ptr=hash_create("Remote Con hash", NUMCONN, &ctl, HASH_ELEM);
+
+ if(!ptr)
+ elog(ERROR,"Can not create connections hash table. Out of memory");
+
+ return(ptr);
+}
+
+static bool
+createNewConnection(const char *name, remoteConn *con)
+{
+ remoteConnHashEnt *hentry;
+ bool found;
+ char key[NAMEDATALEN];
+
+ if(!remoteConnHash)
+ remoteConnHash=createConnHash();
+
+ MemSet(key, 0, NAMEDATALEN);
+ snprintf(key, NAMEDATALEN - 1, "%s", name);
+ hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
+ HASH_ENTER, &found);
+
+ if(!hentry)
+ elog(ERROR, "failed to create connection");
+
+ if(found)
+ {
+ elog(NOTICE, "cannot use a connection name more than once");
+ return false;
+ }
+
+ hentry->rcon = con;
+ strncpy(hentry->name, name, NAMEDATALEN - 1);
+
+ return true;
+}
+
+static void
+deleteConnection(const char *name)
+{
+ remoteConnHashEnt *hentry;
+ bool found;
+ char key[NAMEDATALEN];
+
+ if(!remoteConnHash)
+ remoteConnHash=createConnHash();
+
+ MemSet(key, 0, NAMEDATALEN);
+ snprintf(key, NAMEDATALEN - 1, "%s", name);
+
+ hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
+ key, HASH_REMOVE, &found);
+
+ if(!hentry)
+ elog(WARNING,"Trying to delete a connection that does not exist");
+}