From 056eb1412c864c61b26eb8d540fd92636795f67d Mon Sep 17 00:00:00 2001 From: Joe Conway Date: Tue, 18 Oct 2005 02:55:49 +0000 Subject: [PATCH] When a cursor is opened using dblink_open, only start a transaction if there isn't one already open. Upon dblink_close, only commit the open transaction if it was started by dblink_open, and only then when all cursors opened by dblink_open are closed. The transaction accounting is done individually for all named connections, plus the persistent unnamed connection. --- contrib/dblink/dblink.c | 114 +++++++++++++++++++---------- contrib/dblink/expected/dblink.out | 82 +++++++++++++++++++++ contrib/dblink/sql/dblink.sql | 36 +++++++++ 3 files changed, 193 insertions(+), 39 deletions(-) diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index c11b15860a..54e787bb20 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -60,9 +60,9 @@ typedef struct remoteConn { - PGconn *conn; /* Hold the remote connection */ - int autoXactCursors;/* Indicates the number of open cursors, - * non-zero means we opened the xact ourselves */ + PGconn *conn; /* Hold the remote connection */ + int openCursorCount; /* The number of open cursors */ + bool newXactForCursor; /* Opened a transaction for a cursor */ } remoteConn; /* @@ -84,10 +84,8 @@ static Oid get_relid_from_relname(text *relname_text); static char *generate_relation_name(Oid relid); /* Global */ -List *res_id = NIL; -int res_id_index = 0; -PGconn *persistent_conn = NULL; -static HTAB *remoteConnHash = NULL; +static remoteConn *pconn = NULL; +static HTAB *remoteConnHash = NULL; /* * Following is list that holds multiple remote connections. @@ -184,6 +182,16 @@ typedef struct remoteConnHashEnt } \ } while (0) +#define DBLINK_INIT \ + do { \ + if (!pconn) \ + { \ + pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \ + pconn->conn = NULL; \ + pconn->openCursorCount = 0; \ + pconn->newXactForCursor = FALSE; \ + } \ + } while (0) /* * Create a persistent connection to another database @@ -199,6 +207,8 @@ dblink_connect(PG_FUNCTION_ARGS) PGconn *conn = NULL; remoteConn *rconn = NULL; + DBLINK_INIT; + if (PG_NARGS() == 2) { connstr = GET_STR(PG_GETARG_TEXT_P(1)); @@ -234,7 +244,7 @@ dblink_connect(PG_FUNCTION_ARGS) createNewConnection(connname, rconn); } else - persistent_conn = conn; + pconn->conn = conn; PG_RETURN_TEXT_P(GET_TEXT("OK")); } @@ -250,6 +260,8 @@ dblink_disconnect(PG_FUNCTION_ARGS) remoteConn *rconn = NULL; PGconn *conn = NULL; + DBLINK_INIT; + if (PG_NARGS() == 1) { conname = GET_STR(PG_GETARG_TEXT_P(0)); @@ -258,7 +270,7 @@ dblink_disconnect(PG_FUNCTION_ARGS) conn = rconn->conn; } else - conn = persistent_conn; + conn = pconn->conn; if (!conn) DBLINK_CONN_NOT_AVAIL; @@ -270,7 +282,7 @@ dblink_disconnect(PG_FUNCTION_ARGS) pfree(rconn); } else - persistent_conn = NULL; + pconn->conn = NULL; PG_RETURN_TEXT_P(GET_TEXT("OK")); } @@ -292,12 +304,14 @@ dblink_open(PG_FUNCTION_ARGS) remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible behavior */ + DBLINK_INIT; + if (PG_NARGS() == 2) { /* text,text */ curname = GET_STR(PG_GETARG_TEXT_P(0)); sql = GET_STR(PG_GETARG_TEXT_P(1)); - conn = persistent_conn; + rconn = pconn; } else if (PG_NARGS() == 3) { @@ -307,7 +321,7 @@ dblink_open(PG_FUNCTION_ARGS) curname = GET_STR(PG_GETARG_TEXT_P(0)); sql = GET_STR(PG_GETARG_TEXT_P(1)); fail = PG_GETARG_BOOL(2); - conn = persistent_conn; + rconn = pconn; } else { @@ -315,8 +329,6 @@ dblink_open(PG_FUNCTION_ARGS) curname = GET_STR(PG_GETARG_TEXT_P(1)); sql = GET_STR(PG_GETARG_TEXT_P(2)); rconn = getConnectionByName(conname); - if (rconn) - conn = rconn->conn; } } else if (PG_NARGS() == 4) @@ -327,18 +339,26 @@ dblink_open(PG_FUNCTION_ARGS) sql = GET_STR(PG_GETARG_TEXT_P(2)); fail = PG_GETARG_BOOL(3); rconn = getConnectionByName(conname); - if (rconn) - conn = rconn->conn; } - if (!conn) + if (!rconn || !rconn->conn) DBLINK_CONN_NOT_AVAIL; + else + conn = rconn->conn; - res = PQexec(conn, "BEGIN"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - DBLINK_RES_INTERNALERROR("begin error"); + /* If we are not in a transaction, start one */ + if (PQtransactionStatus(conn) == PQTRANS_IDLE) + { + res = PQexec(conn, "BEGIN"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + DBLINK_RES_INTERNALERROR("begin error"); + PQclear(res); + rconn->newXactForCursor = TRUE; + } - PQclear(res); + /* if we started a transaction, increment cursor count */ + if (rconn->newXactForCursor) + (rconn->openCursorCount)++; appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql); res = PQexec(conn, str->data); @@ -373,11 +393,13 @@ dblink_close(PG_FUNCTION_ARGS) remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible behavior */ + DBLINK_INIT; + if (PG_NARGS() == 1) { /* text */ curname = GET_STR(PG_GETARG_TEXT_P(0)); - conn = persistent_conn; + rconn = pconn; } else if (PG_NARGS() == 2) { @@ -386,15 +408,13 @@ dblink_close(PG_FUNCTION_ARGS) { curname = GET_STR(PG_GETARG_TEXT_P(0)); fail = PG_GETARG_BOOL(1); - conn = persistent_conn; + rconn = pconn; } else { conname = GET_STR(PG_GETARG_TEXT_P(0)); curname = GET_STR(PG_GETARG_TEXT_P(1)); rconn = getConnectionByName(conname); - if (rconn) - conn = rconn->conn; } } if (PG_NARGS() == 3) @@ -404,12 +424,12 @@ dblink_close(PG_FUNCTION_ARGS) curname = GET_STR(PG_GETARG_TEXT_P(1)); fail = PG_GETARG_BOOL(2); rconn = getConnectionByName(conname); - if (rconn) - conn = rconn->conn; } - if (!conn) + if (!rconn || !rconn->conn) DBLINK_CONN_NOT_AVAIL; + else + conn = rconn->conn; appendStringInfo(str, "CLOSE %s", curname); @@ -428,12 +448,22 @@ dblink_close(PG_FUNCTION_ARGS) PQclear(res); - /* commit the transaction */ - res = PQexec(conn, "COMMIT"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - DBLINK_RES_INTERNALERROR("commit error"); + /* if we started a transaction, decrement cursor count */ + if (rconn->newXactForCursor) + { + (rconn->openCursorCount)--; - PQclear(res); + /* if count is zero, commit the transaction */ + if (rconn->openCursorCount == 0) + { + rconn->newXactForCursor = FALSE; + + res = PQexec(conn, "COMMIT"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + DBLINK_RES_INTERNALERROR("commit error"); + PQclear(res); + } + } PG_RETURN_TEXT_P(GET_TEXT("OK")); } @@ -456,6 +486,8 @@ dblink_fetch(PG_FUNCTION_ARGS) char *conname = NULL; remoteConn *rconn = NULL; + DBLINK_INIT; + /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) { @@ -485,7 +517,7 @@ dblink_fetch(PG_FUNCTION_ARGS) curname = GET_STR(PG_GETARG_TEXT_P(0)); howmany = PG_GETARG_INT32(1); fail = PG_GETARG_BOOL(2); - conn = persistent_conn; + conn = pconn->conn; } else { @@ -503,7 +535,7 @@ dblink_fetch(PG_FUNCTION_ARGS) /* text,int */ curname = GET_STR(PG_GETARG_TEXT_P(0)); howmany = PG_GETARG_INT32(1); - conn = persistent_conn; + conn = pconn->conn; } if (!conn) @@ -648,6 +680,8 @@ dblink_record(PG_FUNCTION_ARGS) MemoryContext oldcontext; bool freeconn = false; + DBLINK_INIT; + /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) { @@ -678,7 +712,7 @@ dblink_record(PG_FUNCTION_ARGS) /* text,text or text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) { - conn = persistent_conn; + conn = pconn->conn; sql = GET_STR(PG_GETARG_TEXT_P(0)); fail = PG_GETARG_BOOL(1); } @@ -691,7 +725,7 @@ dblink_record(PG_FUNCTION_ARGS) else if (PG_NARGS() == 1) { /* text */ - conn = persistent_conn; + conn = pconn->conn; sql = GET_STR(PG_GETARG_TEXT_P(0)); } else @@ -857,6 +891,8 @@ dblink_exec(PG_FUNCTION_ARGS) bool freeconn = false; bool fail = true; /* default to backward compatible behavior */ + DBLINK_INIT; + if (PG_NARGS() == 3) { /* must be text,text,bool */ @@ -869,7 +905,7 @@ dblink_exec(PG_FUNCTION_ARGS) /* might be text,text or text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) { - conn = persistent_conn; + conn = pconn->conn; sql = GET_STR(PG_GETARG_TEXT_P(0)); fail = PG_GETARG_BOOL(1); } @@ -882,7 +918,7 @@ dblink_exec(PG_FUNCTION_ARGS) else if (PG_NARGS() == 1) { /* must be single text argument */ - conn = persistent_conn; + conn = pconn->conn; sql = GET_STR(PG_GETARG_TEXT_P(0)); } else diff --git a/contrib/dblink/expected/dblink.out b/contrib/dblink/expected/dblink.out index cd2d4b1845..55d2b9e7c4 100644 --- a/contrib/dblink/expected/dblink.out +++ b/contrib/dblink/expected/dblink.out @@ -436,6 +436,88 @@ SELECT dblink_exec('myconn','ABORT'); ROLLBACK (1 row) +-- test opening cursor in a transaction +SELECT dblink_exec('myconn','BEGIN'); + dblink_exec +------------- + BEGIN +(1 row) + +-- an open transaction will prevent dblink_open() from opening its own +SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo'); + dblink_open +------------- + OK +(1 row) + +-- this should not commit the transaction because the client opened it +SELECT dblink_close('myconn','rmt_foo_cursor'); + dblink_close +-------------- + OK +(1 row) + +-- this should succeed because we have an open transaction +SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo'); + dblink_exec +---------------- + DECLARE CURSOR +(1 row) + +-- commit remote transaction +SELECT dblink_exec('myconn','COMMIT'); + dblink_exec +------------- + COMMIT +(1 row) + +-- test automatic transactions for multiple cursor opens +SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo'); + dblink_open +------------- + OK +(1 row) + +-- the second cursor +SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo'); + dblink_open +------------- + OK +(1 row) + +-- this should not commit the transaction +SELECT dblink_close('myconn','rmt_foo_cursor2'); + dblink_close +-------------- + OK +(1 row) + +-- this should succeed because we have an open transaction +SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo'); + dblink_exec +---------------- + DECLARE CURSOR +(1 row) + +-- this should commit the transaction +SELECT dblink_close('myconn','rmt_foo_cursor'); + dblink_close +-------------- + OK +(1 row) + +-- this should fail because there is no open transaction +SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo'); +ERROR: sql error +DETAIL: ERROR: cursor "xact_test" already exists + +-- reset remote transaction state +SELECT dblink_exec('myconn','ABORT'); + dblink_exec +------------- + ROLLBACK +(1 row) + -- open a cursor SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo'); dblink_open diff --git a/contrib/dblink/sql/dblink.sql b/contrib/dblink/sql/dblink.sql index db9dd6582f..66e2607cfe 100644 --- a/contrib/dblink/sql/dblink.sql +++ b/contrib/dblink/sql/dblink.sql @@ -217,6 +217,42 @@ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foobar',false); -- reset remote transaction state SELECT dblink_exec('myconn','ABORT'); +-- test opening cursor in a transaction +SELECT dblink_exec('myconn','BEGIN'); + +-- an open transaction will prevent dblink_open() from opening its own +SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo'); + +-- this should not commit the transaction because the client opened it +SELECT dblink_close('myconn','rmt_foo_cursor'); + +-- this should succeed because we have an open transaction +SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo'); + +-- commit remote transaction +SELECT dblink_exec('myconn','COMMIT'); + +-- test automatic transactions for multiple cursor opens +SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo'); + +-- the second cursor +SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo'); + +-- this should not commit the transaction +SELECT dblink_close('myconn','rmt_foo_cursor2'); + +-- this should succeed because we have an open transaction +SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo'); + +-- this should commit the transaction +SELECT dblink_close('myconn','rmt_foo_cursor'); + +-- this should fail because there is no open transaction +SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo'); + +-- reset remote transaction state +SELECT dblink_exec('myconn','ABORT'); + -- open a cursor SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo'); -- 2.40.0