typedef struct remoteConn
{
- PGconn *conn; /* Hold the remote connection */
- int autoXactCursors;/* Indicates the number of open cursors,
- * non-zero means we opened the xact ourselves */
+ PGconn *conn; /* Hold the remote connection */
+ int openCursorCount; /* The number of open cursors */
+ bool newXactForCursor; /* Opened a transaction for a cursor */
} remoteConn;
/*
static char *generate_relation_name(Oid relid);
/* Global */
-List *res_id = NIL;
-int res_id_index = 0;
-PGconn *persistent_conn = NULL;
-static HTAB *remoteConnHash = NULL;
+static remoteConn *pconn = NULL;
+static HTAB *remoteConnHash = NULL;
/*
* Following is list that holds multiple remote connections.
} \
} while (0)
+#define DBLINK_INIT \
+ do { \
+ if (!pconn) \
+ { \
+ pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
+ pconn->conn = NULL; \
+ pconn->openCursorCount = 0; \
+ pconn->newXactForCursor = FALSE; \
+ } \
+ } while (0)
/*
* Create a persistent connection to another database
PGconn *conn = NULL;
remoteConn *rconn = NULL;
+ DBLINK_INIT;
+
if (PG_NARGS() == 2)
{
connstr = GET_STR(PG_GETARG_TEXT_P(1));
createNewConnection(connname, rconn);
}
else
- persistent_conn = conn;
+ pconn->conn = conn;
PG_RETURN_TEXT_P(GET_TEXT("OK"));
}
remoteConn *rconn = NULL;
PGconn *conn = NULL;
+ DBLINK_INIT;
+
if (PG_NARGS() == 1)
{
conname = GET_STR(PG_GETARG_TEXT_P(0));
conn = rconn->conn;
}
else
- conn = persistent_conn;
+ conn = pconn->conn;
if (!conn)
DBLINK_CONN_NOT_AVAIL;
pfree(rconn);
}
else
- persistent_conn = NULL;
+ pconn->conn = NULL;
PG_RETURN_TEXT_P(GET_TEXT("OK"));
}
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible behavior */
+ DBLINK_INIT;
+
if (PG_NARGS() == 2)
{
/* text,text */
curname = GET_STR(PG_GETARG_TEXT_P(0));
sql = GET_STR(PG_GETARG_TEXT_P(1));
- conn = persistent_conn;
+ rconn = pconn;
}
else if (PG_NARGS() == 3)
{
curname = GET_STR(PG_GETARG_TEXT_P(0));
sql = GET_STR(PG_GETARG_TEXT_P(1));
fail = PG_GETARG_BOOL(2);
- conn = persistent_conn;
+ rconn = pconn;
}
else
{
curname = GET_STR(PG_GETARG_TEXT_P(1));
sql = GET_STR(PG_GETARG_TEXT_P(2));
rconn = getConnectionByName(conname);
- if (rconn)
- conn = rconn->conn;
}
}
else if (PG_NARGS() == 4)
sql = GET_STR(PG_GETARG_TEXT_P(2));
fail = PG_GETARG_BOOL(3);
rconn = getConnectionByName(conname);
- if (rconn)
- conn = rconn->conn;
}
- if (!conn)
+ if (!rconn || !rconn->conn)
DBLINK_CONN_NOT_AVAIL;
+ else
+ conn = rconn->conn;
- res = PQexec(conn, "BEGIN");
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- DBLINK_RES_INTERNALERROR("begin error");
+ /* If we are not in a transaction, start one */
+ if (PQtransactionStatus(conn) == PQTRANS_IDLE)
+ {
+ res = PQexec(conn, "BEGIN");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ DBLINK_RES_INTERNALERROR("begin error");
+ PQclear(res);
+ rconn->newXactForCursor = TRUE;
+ }
- PQclear(res);
+ /* if we started a transaction, increment cursor count */
+ if (rconn->newXactForCursor)
+ (rconn->openCursorCount)++;
appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
res = PQexec(conn, str->data);
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible behavior */
+ DBLINK_INIT;
+
if (PG_NARGS() == 1)
{
/* text */
curname = GET_STR(PG_GETARG_TEXT_P(0));
- conn = persistent_conn;
+ rconn = pconn;
}
else if (PG_NARGS() == 2)
{
{
curname = GET_STR(PG_GETARG_TEXT_P(0));
fail = PG_GETARG_BOOL(1);
- conn = persistent_conn;
+ rconn = pconn;
}
else
{
conname = GET_STR(PG_GETARG_TEXT_P(0));
curname = GET_STR(PG_GETARG_TEXT_P(1));
rconn = getConnectionByName(conname);
- if (rconn)
- conn = rconn->conn;
}
}
if (PG_NARGS() == 3)
curname = GET_STR(PG_GETARG_TEXT_P(1));
fail = PG_GETARG_BOOL(2);
rconn = getConnectionByName(conname);
- if (rconn)
- conn = rconn->conn;
}
- if (!conn)
+ if (!rconn || !rconn->conn)
DBLINK_CONN_NOT_AVAIL;
+ else
+ conn = rconn->conn;
appendStringInfo(str, "CLOSE %s", curname);
PQclear(res);
- /* commit the transaction */
- res = PQexec(conn, "COMMIT");
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- DBLINK_RES_INTERNALERROR("commit error");
+ /* if we started a transaction, decrement cursor count */
+ if (rconn->newXactForCursor)
+ {
+ (rconn->openCursorCount)--;
- PQclear(res);
+ /* if count is zero, commit the transaction */
+ if (rconn->openCursorCount == 0)
+ {
+ rconn->newXactForCursor = FALSE;
+
+ res = PQexec(conn, "COMMIT");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ DBLINK_RES_INTERNALERROR("commit error");
+ PQclear(res);
+ }
+ }
PG_RETURN_TEXT_P(GET_TEXT("OK"));
}
char *conname = NULL;
remoteConn *rconn = NULL;
+ DBLINK_INIT;
+
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
curname = GET_STR(PG_GETARG_TEXT_P(0));
howmany = PG_GETARG_INT32(1);
fail = PG_GETARG_BOOL(2);
- conn = persistent_conn;
+ conn = pconn->conn;
}
else
{
/* text,int */
curname = GET_STR(PG_GETARG_TEXT_P(0));
howmany = PG_GETARG_INT32(1);
- conn = persistent_conn;
+ conn = pconn->conn;
}
if (!conn)
MemoryContext oldcontext;
bool freeconn = false;
+ DBLINK_INIT;
+
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
/* text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
{
- conn = persistent_conn;
+ conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
fail = PG_GETARG_BOOL(1);
}
else if (PG_NARGS() == 1)
{
/* text */
- conn = persistent_conn;
+ conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
}
else
bool freeconn = false;
bool fail = true; /* default to backward compatible behavior */
+ DBLINK_INIT;
+
if (PG_NARGS() == 3)
{
/* must be text,text,bool */
/* might be text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
{
- conn = persistent_conn;
+ conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
fail = PG_GETARG_BOOL(1);
}
else if (PG_NARGS() == 1)
{
/* must be single text argument */
- conn = persistent_conn;
+ conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
}
else
ROLLBACK
(1 row)
+-- test opening cursor in a transaction
+SELECT dblink_exec('myconn','BEGIN');
+ dblink_exec
+-------------
+ BEGIN
+(1 row)
+
+-- an open transaction will prevent dblink_open() from opening its own
+SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+ dblink_open
+-------------
+ OK
+(1 row)
+
+-- this should not commit the transaction because the client opened it
+SELECT dblink_close('myconn','rmt_foo_cursor');
+ dblink_close
+--------------
+ OK
+(1 row)
+
+-- this should succeed because we have an open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ dblink_exec
+----------------
+ DECLARE CURSOR
+(1 row)
+
+-- commit remote transaction
+SELECT dblink_exec('myconn','COMMIT');
+ dblink_exec
+-------------
+ COMMIT
+(1 row)
+
+-- test automatic transactions for multiple cursor opens
+SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+ dblink_open
+-------------
+ OK
+(1 row)
+
+-- the second cursor
+SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
+ dblink_open
+-------------
+ OK
+(1 row)
+
+-- this should not commit the transaction
+SELECT dblink_close('myconn','rmt_foo_cursor2');
+ dblink_close
+--------------
+ OK
+(1 row)
+
+-- this should succeed because we have an open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ dblink_exec
+----------------
+ DECLARE CURSOR
+(1 row)
+
+-- this should commit the transaction
+SELECT dblink_close('myconn','rmt_foo_cursor');
+ dblink_close
+--------------
+ OK
+(1 row)
+
+-- this should fail because there is no open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ERROR: sql error
+DETAIL: ERROR: cursor "xact_test" already exists
+
+-- reset remote transaction state
+SELECT dblink_exec('myconn','ABORT');
+ dblink_exec
+-------------
+ ROLLBACK
+(1 row)
+
-- open a cursor
SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
dblink_open
-- reset remote transaction state
SELECT dblink_exec('myconn','ABORT');
+-- test opening cursor in a transaction
+SELECT dblink_exec('myconn','BEGIN');
+
+-- an open transaction will prevent dblink_open() from opening its own
+SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+
+-- this should not commit the transaction because the client opened it
+SELECT dblink_close('myconn','rmt_foo_cursor');
+
+-- this should succeed because we have an open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+-- commit remote transaction
+SELECT dblink_exec('myconn','COMMIT');
+
+-- test automatic transactions for multiple cursor opens
+SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+
+-- the second cursor
+SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
+
+-- this should not commit the transaction
+SELECT dblink_close('myconn','rmt_foo_cursor2');
+
+-- this should succeed because we have an open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+-- this should commit the transaction
+SELECT dblink_close('myconn','rmt_foo_cursor');
+
+-- this should fail because there is no open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+-- reset remote transaction state
+SELECT dblink_exec('myconn','ABORT');
+
-- open a cursor
SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');