#include "utils/tqual.h"
#include "utils/varlena.h"
-#include "dblink.h"
-
PG_MODULE_MAGIC;
typedef struct remoteConn
/* initial number of connection hashes */
#define NUMCONN 16
-/* general utility */
-#define xpfree(var_) \
- do { \
- if (var_ != NULL) \
- { \
- pfree(var_); \
- var_ = NULL; \
- } \
- } while (0)
-
-#define xpstrdup(var_c, var_) \
- do { \
- if (var_ != NULL) \
- var_c = pstrdup(var_); \
- else \
- var_c = NULL; \
- } while (0)
-
-#define DBLINK_RES_INTERNALERROR(p2) \
- do { \
- msg = pchomp(PQerrorMessage(conn)); \
- if (res) \
- PQclear(res); \
- elog(ERROR, "%s: %s", p2, msg); \
- } while (0)
-
-#define DBLINK_CONN_NOT_AVAIL \
- do { \
- if(conname) \
- ereport(ERROR, \
- (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
- errmsg("connection \"%s\" not available", conname))); \
- else \
- ereport(ERROR, \
- (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
- errmsg("connection not available"))); \
- } while (0)
-
-#define DBLINK_GET_CONN \
- do { \
- char *conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
- rconn = getConnectionByName(conname_or_str); \
- if (rconn) \
- { \
- conn = rconn->conn; \
- conname = conname_or_str; \
- } \
- else \
- { \
- connstr = get_connect_string(conname_or_str); \
- if (connstr == NULL) \
- { \
- connstr = conname_or_str; \
- } \
- dblink_connstr_check(connstr); \
- conn = PQconnectdb(connstr); \
- if (PQstatus(conn) == CONNECTION_BAD) \
- { \
- msg = pchomp(PQerrorMessage(conn)); \
- PQfinish(conn); \
- ereport(ERROR, \
- (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \
- errmsg("could not establish connection"), \
- errdetail_internal("%s", msg))); \
- } \
- dblink_security_check(conn, rconn); \
- if (PQclientEncoding(conn) != GetDatabaseEncoding()) \
- PQsetClientEncoding(conn, GetDatabaseEncodingName()); \
- freeconn = true; \
- } \
- } while (0)
-
-#define DBLINK_GET_NAMED_CONN \
- do { \
- conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
- rconn = getConnectionByName(conname); \
- if (rconn) \
- conn = rconn->conn; \
- else \
- DBLINK_CONN_NOT_AVAIL; \
- } while (0)
-
-#define DBLINK_INIT \
- do { \
- if (!pconn) \
- { \
- pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
- pconn->conn = NULL; \
- pconn->openCursorCount = 0; \
- pconn->newXactForCursor = FALSE; \
- } \
- } while (0)
+static char *
+xpstrdup(const char *in)
+{
+ if (in == NULL)
+ return NULL;
+ return pstrdup(in);
+}
+
+static void pg_attribute_noreturn()
+dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
+{
+ char *msg = pchomp(PQerrorMessage(conn));
+ if (res)
+ PQclear(res);
+ elog(ERROR, "%s: %s", p2, msg);
+}
+
+static void pg_attribute_noreturn()
+dblink_conn_not_avail(const char *conname)
+{
+ if (conname)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+ errmsg("connection \"%s\" not available", conname)));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+ errmsg("connection not available")));
+}
+
+static void
+dblink_get_conn(char *conname_or_str,
+ PGconn * volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
+{
+ remoteConn *rconn = getConnectionByName(conname_or_str);
+ PGconn *conn;
+ char *conname;
+ bool freeconn;
+
+ if (rconn)
+ {
+ conn = rconn->conn;
+ conname = conname_or_str;
+ freeconn = false;
+ }
+ else
+ {
+ const char *connstr;
+
+ connstr = get_connect_string(conname_or_str);
+ if (connstr == NULL)
+ connstr = conname_or_str;
+ dblink_connstr_check(connstr);
+ conn = PQconnectdb(connstr);
+ if (PQstatus(conn) == CONNECTION_BAD)
+ {
+ char *msg = pchomp(PQerrorMessage(conn));
+ PQfinish(conn);
+ ereport(ERROR,
+ (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+ errmsg("could not establish connection"),
+ errdetail_internal("%s", msg)));
+ }
+ dblink_security_check(conn, rconn);
+ if (PQclientEncoding(conn) != GetDatabaseEncoding())
+ PQsetClientEncoding(conn, GetDatabaseEncodingName());
+ freeconn = true;
+ conname = NULL;
+ }
+
+ *conn_p = conn;
+ *conname_p = conname;
+ *freeconn_p = freeconn;
+}
+
+static PGconn *
+dblink_get_named_conn(const char *conname)
+{
+ remoteConn *rconn = getConnectionByName(conname);
+ if (rconn)
+ return rconn->conn;
+ else
+ dblink_conn_not_avail(conname);
+}
+
+static void
+dblink_init(void)
+{
+ if (!pconn)
+ {
+ pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
+ pconn->conn = NULL;
+ pconn->openCursorCount = 0;
+ pconn->newXactForCursor = FALSE;
+ }
+}
/*
* Create a persistent connection to another database
PGconn *conn = NULL;
remoteConn *rconn = NULL;
- DBLINK_INIT;
+ dblink_init();
if (PG_NARGS() == 2)
{
remoteConn *rconn = NULL;
PGconn *conn = NULL;
- DBLINK_INIT;
+ dblink_init();
if (PG_NARGS() == 1)
{
conn = pconn->conn;
if (!conn)
- DBLINK_CONN_NOT_AVAIL;
+ dblink_conn_not_avail(conname);
PQfinish(conn);
if (rconn)
Datum
dblink_open(PG_FUNCTION_ARGS)
{
- char *msg;
PGresult *res = NULL;
PGconn *conn = NULL;
char *curname = NULL;
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible behavior */
- DBLINK_INIT;
+ dblink_init();
initStringInfo(&buf);
if (PG_NARGS() == 2)
}
if (!rconn || !rconn->conn)
- DBLINK_CONN_NOT_AVAIL;
+ dblink_conn_not_avail(conname);
else
conn = rconn->conn;
{
res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- DBLINK_RES_INTERNALERROR("begin error");
+ dblink_res_internalerror(conn, res, "begin error");
PQclear(res);
rconn->newXactForCursor = TRUE;
char *curname = NULL;
char *conname = NULL;
StringInfoData buf;
- char *msg;
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible behavior */
- DBLINK_INIT;
+ dblink_init();
initStringInfo(&buf);
if (PG_NARGS() == 1)
}
if (!rconn || !rconn->conn)
- DBLINK_CONN_NOT_AVAIL;
+ dblink_conn_not_avail(conname);
else
conn = rconn->conn;
res = PQexec(conn, "COMMIT");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- DBLINK_RES_INTERNALERROR("commit error");
+ dblink_res_internalerror(conn, res, "commit error");
PQclear(res);
}
}
prepTuplestoreResult(fcinfo);
- DBLINK_INIT;
+ dblink_init();
if (PG_NARGS() == 4)
{
}
if (!conn)
- DBLINK_CONN_NOT_AVAIL;
+ dblink_conn_not_avail(conname);
initStringInfo(&buf);
appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
Datum
dblink_send_query(PG_FUNCTION_ARGS)
{
- char *conname = NULL;
- PGconn *conn = NULL;
- char *sql = NULL;
- remoteConn *rconn = NULL;
+ PGconn *conn;
+ char *sql;
int retval;
if (PG_NARGS() == 2)
{
- DBLINK_GET_NAMED_CONN;
+ conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
}
else
prepTuplestoreResult(fcinfo);
- DBLINK_INIT;
+ dblink_init();
PG_TRY();
{
- char *msg;
- char *connstr = NULL;
char *sql = NULL;
char *conname = NULL;
- remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible */
if (!is_async)
if (PG_NARGS() == 3)
{
/* text,text,bool */
- DBLINK_GET_CONN;
+ dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
fail = PG_GETARG_BOOL(2);
}
}
else
{
- DBLINK_GET_CONN;
+ dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
}
}
if (PG_NARGS() == 2)
{
/* text,bool */
- DBLINK_GET_NAMED_CONN;
+ conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
fail = PG_GETARG_BOOL(1);
}
else if (PG_NARGS() == 1)
{
/* text */
- DBLINK_GET_NAMED_CONN;
+ conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
}
else
/* shouldn't happen */
}
if (!conn)
- DBLINK_CONN_NOT_AVAIL;
+ dblink_conn_not_avail(conname);
if (!is_async)
{
Datum
dblink_is_busy(PG_FUNCTION_ARGS)
{
- char *conname = NULL;
- PGconn *conn = NULL;
- remoteConn *rconn = NULL;
+ PGconn *conn;
- DBLINK_INIT;
- DBLINK_GET_NAMED_CONN;
+ dblink_init();
+ conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
PQconsumeInput(conn);
PG_RETURN_INT32(PQisBusy(conn));
Datum
dblink_cancel_query(PG_FUNCTION_ARGS)
{
- int res = 0;
- char *conname = NULL;
- PGconn *conn = NULL;
- remoteConn *rconn = NULL;
+ int res;
+ PGconn *conn;
PGcancel *cancel;
char errbuf[256];
- DBLINK_INIT;
- DBLINK_GET_NAMED_CONN;
+ dblink_init();
+ conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
cancel = PQgetCancel(conn);
res = PQcancel(cancel, errbuf, 256);
dblink_error_message(PG_FUNCTION_ARGS)
{
char *msg;
- char *conname = NULL;
- PGconn *conn = NULL;
- remoteConn *rconn = NULL;
+ PGconn *conn;
- DBLINK_INIT;
- DBLINK_GET_NAMED_CONN;
+ dblink_init();
+ conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
msg = PQerrorMessage(conn);
if (msg == NULL || msg[0] == '\0')
PGconn *volatile conn = NULL;
volatile bool freeconn = false;
- DBLINK_INIT;
+ dblink_init();
PG_TRY();
{
- char *msg;
PGresult *res = NULL;
- char *connstr = NULL;
char *sql = NULL;
char *conname = NULL;
- remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible behavior */
if (PG_NARGS() == 3)
{
/* must be text,text,bool */
- DBLINK_GET_CONN;
+ dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
fail = PG_GETARG_BOOL(2);
}
}
else
{
- DBLINK_GET_CONN;
+ dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
}
}
elog(ERROR, "wrong number of arguments");
if (!conn)
- DBLINK_CONN_NOT_AVAIL;
+ dblink_conn_not_avail(conname);
res = PQexec(conn, sql);
if (!res ||
Datum
dblink_get_notify(PG_FUNCTION_ARGS)
{
- char *conname = NULL;
- PGconn *conn = NULL;
- remoteConn *rconn = NULL;
+ PGconn *conn;
PGnotify *notify;
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
prepTuplestoreResult(fcinfo);
- DBLINK_INIT;
+ dblink_init();
if (PG_NARGS() == 1)
- DBLINK_GET_NAMED_CONN;
+ conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
else
conn = pconn->conn;
else
sqlstate = ERRCODE_CONNECTION_FAILURE;
- xpstrdup(message_primary, pg_diag_message_primary);
- xpstrdup(message_detail, pg_diag_message_detail);
- xpstrdup(message_hint, pg_diag_message_hint);
- xpstrdup(message_context, pg_diag_context);
+ message_primary = xpstrdup(pg_diag_message_primary);
+ message_detail = xpstrdup(pg_diag_message_detail);
+ message_hint = xpstrdup(pg_diag_message_hint);
+ message_context = xpstrdup(pg_diag_context);
/*
* If we don't get a message from the PGresult, try the PGconn. This