]> granicus.if.org Git - postgresql/commitdiff
When a cursor is opened using dblink_open, only start a transaction
authorJoe Conway <mail@joeconway.com>
Tue, 18 Oct 2005 02:55:49 +0000 (02:55 +0000)
committerJoe Conway <mail@joeconway.com>
Tue, 18 Oct 2005 02:55:49 +0000 (02:55 +0000)
if there isn't one already open. Upon dblink_close, only commit
the open transaction if it was started by dblink_open, and only
then when all cursors opened by dblink_open are closed. The transaction
accounting is done individually for all named connections, plus
the persistent unnamed connection.

contrib/dblink/dblink.c
contrib/dblink/expected/dblink.out
contrib/dblink/sql/dblink.sql

index c11b15860a39b538075f5418dd54ff1637fc1199..54e787bb206da692df5f7f1f42ea31d33f88abdc 100644 (file)
@@ -60,9 +60,9 @@
 
 typedef struct remoteConn
 {
-       PGconn     *conn;                       /* Hold the remote connection */
-       int                     autoXactCursors;/* Indicates the number of open cursors,
-                                                                * non-zero means we opened the xact ourselves */
+       PGconn     *conn;                               /* Hold the remote connection */
+       int                     openCursorCount;        /* The number of open cursors */
+       bool            newXactForCursor;       /* Opened a transaction for a cursor */
 }      remoteConn;
 
 /*
@@ -84,10 +84,8 @@ static Oid   get_relid_from_relname(text *relname_text);
 static char *generate_relation_name(Oid relid);
 
 /* Global */
-List      *res_id = NIL;
-int                    res_id_index = 0;
-PGconn    *persistent_conn = NULL;
-static HTAB *remoteConnHash = NULL;
+static remoteConn         *pconn = NULL;
+static HTAB                       *remoteConnHash = NULL;
 
 /*
  *     Following is list that holds multiple remote connections.
@@ -184,6 +182,16 @@ typedef struct remoteConnHashEnt
                        } \
        } while (0)
 
+#define DBLINK_INIT \
+       do { \
+                       if (!pconn) \
+                       { \
+                               pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
+                               pconn->conn = NULL; \
+                               pconn->openCursorCount = 0; \
+                               pconn->newXactForCursor = FALSE; \
+                       } \
+       } while (0)
 
 /*
  * Create a persistent connection to another database
@@ -199,6 +207,8 @@ dblink_connect(PG_FUNCTION_ARGS)
        PGconn     *conn = NULL;
        remoteConn *rconn = NULL;
 
+       DBLINK_INIT;
+
        if (PG_NARGS() == 2)
        {
                connstr = GET_STR(PG_GETARG_TEXT_P(1));
@@ -234,7 +244,7 @@ dblink_connect(PG_FUNCTION_ARGS)
                createNewConnection(connname, rconn);
        }
        else
-               persistent_conn = conn;
+               pconn->conn = conn;
 
        PG_RETURN_TEXT_P(GET_TEXT("OK"));
 }
@@ -250,6 +260,8 @@ dblink_disconnect(PG_FUNCTION_ARGS)
        remoteConn *rconn = NULL;
        PGconn     *conn = NULL;
 
+       DBLINK_INIT;
+
        if (PG_NARGS() == 1)
        {
                conname = GET_STR(PG_GETARG_TEXT_P(0));
@@ -258,7 +270,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
                        conn = rconn->conn;
        }
        else
-               conn = persistent_conn;
+               conn = pconn->conn;
 
        if (!conn)
                DBLINK_CONN_NOT_AVAIL;
@@ -270,7 +282,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
                pfree(rconn);
        }
        else
-               persistent_conn = NULL;
+               pconn->conn = NULL;
 
        PG_RETURN_TEXT_P(GET_TEXT("OK"));
 }
@@ -292,12 +304,14 @@ dblink_open(PG_FUNCTION_ARGS)
        remoteConn *rconn = NULL;
        bool            fail = true;    /* default to backward compatible behavior */
 
+       DBLINK_INIT;
+
        if (PG_NARGS() == 2)
        {
                /* text,text */
                curname = GET_STR(PG_GETARG_TEXT_P(0));
                sql = GET_STR(PG_GETARG_TEXT_P(1));
-               conn = persistent_conn;
+               rconn = pconn;
        }
        else if (PG_NARGS() == 3)
        {
@@ -307,7 +321,7 @@ dblink_open(PG_FUNCTION_ARGS)
                        curname = GET_STR(PG_GETARG_TEXT_P(0));
                        sql = GET_STR(PG_GETARG_TEXT_P(1));
                        fail = PG_GETARG_BOOL(2);
-                       conn = persistent_conn;
+                       rconn = pconn;
                }
                else
                {
@@ -315,8 +329,6 @@ dblink_open(PG_FUNCTION_ARGS)
                        curname = GET_STR(PG_GETARG_TEXT_P(1));
                        sql = GET_STR(PG_GETARG_TEXT_P(2));
                        rconn = getConnectionByName(conname);
-                       if (rconn)
-                               conn = rconn->conn;
                }
        }
        else if (PG_NARGS() == 4)
@@ -327,18 +339,26 @@ dblink_open(PG_FUNCTION_ARGS)
                sql = GET_STR(PG_GETARG_TEXT_P(2));
                fail = PG_GETARG_BOOL(3);
                rconn = getConnectionByName(conname);
-               if (rconn)
-                       conn = rconn->conn;
        }
 
-       if (!conn)
+       if (!rconn || !rconn->conn)
                DBLINK_CONN_NOT_AVAIL;
+       else
+               conn = rconn->conn;
 
-       res = PQexec(conn, "BEGIN");
-       if (PQresultStatus(res) != PGRES_COMMAND_OK)
-               DBLINK_RES_INTERNALERROR("begin error");
+       /*      If we are not in a transaction, start one */
+       if (PQtransactionStatus(conn) == PQTRANS_IDLE)
+       {
+               res = PQexec(conn, "BEGIN");
+               if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                       DBLINK_RES_INTERNALERROR("begin error");
+               PQclear(res);
+               rconn->newXactForCursor = TRUE;
+       }
 
-       PQclear(res);
+       /* if we started a transaction, increment cursor count */
+       if (rconn->newXactForCursor)
+               (rconn->openCursorCount)++;
 
        appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
        res = PQexec(conn, str->data);
@@ -373,11 +393,13 @@ dblink_close(PG_FUNCTION_ARGS)
        remoteConn *rconn = NULL;
        bool            fail = true;    /* default to backward compatible behavior */
 
+       DBLINK_INIT;
+
        if (PG_NARGS() == 1)
        {
                /* text */
                curname = GET_STR(PG_GETARG_TEXT_P(0));
-               conn = persistent_conn;
+               rconn = pconn;
        }
        else if (PG_NARGS() == 2)
        {
@@ -386,15 +408,13 @@ dblink_close(PG_FUNCTION_ARGS)
                {
                        curname = GET_STR(PG_GETARG_TEXT_P(0));
                        fail = PG_GETARG_BOOL(1);
-                       conn = persistent_conn;
+                       rconn = pconn;
                }
                else
                {
                        conname = GET_STR(PG_GETARG_TEXT_P(0));
                        curname = GET_STR(PG_GETARG_TEXT_P(1));
                        rconn = getConnectionByName(conname);
-                       if (rconn)
-                               conn = rconn->conn;
                }
        }
        if (PG_NARGS() == 3)
@@ -404,12 +424,12 @@ dblink_close(PG_FUNCTION_ARGS)
                curname = GET_STR(PG_GETARG_TEXT_P(1));
                fail = PG_GETARG_BOOL(2);
                rconn = getConnectionByName(conname);
-               if (rconn)
-                       conn = rconn->conn;
        }
 
-       if (!conn)
+       if (!rconn || !rconn->conn)
                DBLINK_CONN_NOT_AVAIL;
+       else
+               conn = rconn->conn;
 
        appendStringInfo(str, "CLOSE %s", curname);
 
@@ -428,12 +448,22 @@ dblink_close(PG_FUNCTION_ARGS)
 
        PQclear(res);
 
-       /* commit the transaction */
-       res = PQexec(conn, "COMMIT");
-       if (PQresultStatus(res) != PGRES_COMMAND_OK)
-               DBLINK_RES_INTERNALERROR("commit error");
+       /* if we started a transaction, decrement cursor count */
+       if (rconn->newXactForCursor)
+       {
+               (rconn->openCursorCount)--;
 
-       PQclear(res);
+               /* if count is zero, commit the transaction */
+               if (rconn->openCursorCount == 0)
+               {
+                       rconn->newXactForCursor = FALSE;
+
+                       res = PQexec(conn, "COMMIT");
+                       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                               DBLINK_RES_INTERNALERROR("commit error");
+                       PQclear(res);
+               }
+       }
 
        PG_RETURN_TEXT_P(GET_TEXT("OK"));
 }
@@ -456,6 +486,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
        char       *conname = NULL;
        remoteConn *rconn = NULL;
 
+       DBLINK_INIT;
+
        /* stuff done only on the first call of the function */
        if (SRF_IS_FIRSTCALL())
        {
@@ -485,7 +517,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
                                curname = GET_STR(PG_GETARG_TEXT_P(0));
                                howmany = PG_GETARG_INT32(1);
                                fail = PG_GETARG_BOOL(2);
-                               conn = persistent_conn;
+                               conn = pconn->conn;
                        }
                        else
                        {
@@ -503,7 +535,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
                        /* text,int */
                        curname = GET_STR(PG_GETARG_TEXT_P(0));
                        howmany = PG_GETARG_INT32(1);
-                       conn = persistent_conn;
+                       conn = pconn->conn;
                }
 
                if (!conn)
@@ -648,6 +680,8 @@ dblink_record(PG_FUNCTION_ARGS)
        MemoryContext oldcontext;
        bool            freeconn = false;
 
+       DBLINK_INIT;
+
        /* stuff done only on the first call of the function */
        if (SRF_IS_FIRSTCALL())
        {
@@ -678,7 +712,7 @@ dblink_record(PG_FUNCTION_ARGS)
                        /* text,text or text,bool */
                        if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
                        {
-                               conn = persistent_conn;
+                               conn = pconn->conn;
                                sql = GET_STR(PG_GETARG_TEXT_P(0));
                                fail = PG_GETARG_BOOL(1);
                        }
@@ -691,7 +725,7 @@ dblink_record(PG_FUNCTION_ARGS)
                else if (PG_NARGS() == 1)
                {
                        /* text */
-                       conn = persistent_conn;
+                       conn = pconn->conn;
                        sql = GET_STR(PG_GETARG_TEXT_P(0));
                }
                else
@@ -857,6 +891,8 @@ dblink_exec(PG_FUNCTION_ARGS)
        bool            freeconn = false;
        bool            fail = true;    /* default to backward compatible behavior */
 
+       DBLINK_INIT;
+
        if (PG_NARGS() == 3)
        {
                /* must be text,text,bool */
@@ -869,7 +905,7 @@ dblink_exec(PG_FUNCTION_ARGS)
                /* might be text,text or text,bool */
                if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
                {
-                       conn = persistent_conn;
+                       conn = pconn->conn;
                        sql = GET_STR(PG_GETARG_TEXT_P(0));
                        fail = PG_GETARG_BOOL(1);
                }
@@ -882,7 +918,7 @@ dblink_exec(PG_FUNCTION_ARGS)
        else if (PG_NARGS() == 1)
        {
                /* must be single text argument */
-               conn = persistent_conn;
+               conn = pconn->conn;
                sql = GET_STR(PG_GETARG_TEXT_P(0));
        }
        else
index cd2d4b1845030d1ecd2493fa01349ee59cae2ed6..55d2b9e7c4c084ceb7f9b15a5f6d72b0eb621893 100644 (file)
@@ -436,6 +436,88 @@ SELECT dblink_exec('myconn','ABORT');
  ROLLBACK
 (1 row)
 
+-- test opening cursor in a transaction
+SELECT dblink_exec('myconn','BEGIN');
+ dblink_exec 
+-------------
+ BEGIN
+(1 row)
+
+-- an open transaction will prevent dblink_open() from opening its own
+SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+ dblink_open 
+-------------
+ OK
+(1 row)
+
+-- this should not commit the transaction because the client opened it
+SELECT dblink_close('myconn','rmt_foo_cursor');
+ dblink_close 
+--------------
+ OK
+(1 row)
+
+-- this should succeed because we have an open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+  dblink_exec   
+----------------
+ DECLARE CURSOR
+(1 row)
+
+-- commit remote transaction
+SELECT dblink_exec('myconn','COMMIT');
+ dblink_exec 
+-------------
+ COMMIT
+(1 row)
+
+-- test automatic transactions for multiple cursor opens
+SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+ dblink_open 
+-------------
+ OK
+(1 row)
+
+-- the second cursor
+SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
+ dblink_open 
+-------------
+ OK
+(1 row)
+
+-- this should not commit the transaction
+SELECT dblink_close('myconn','rmt_foo_cursor2');
+ dblink_close 
+--------------
+ OK
+(1 row)
+
+-- this should succeed because we have an open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+  dblink_exec   
+----------------
+ DECLARE CURSOR
+(1 row)
+
+-- this should commit the transaction
+SELECT dblink_close('myconn','rmt_foo_cursor');
+ dblink_close 
+--------------
+ OK
+(1 row)
+
+-- this should fail because there is no open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ERROR:  sql error
+DETAIL:  ERROR:  cursor "xact_test" already exists
+
+-- reset remote transaction state
+SELECT dblink_exec('myconn','ABORT');
+ dblink_exec 
+-------------
+ ROLLBACK
+(1 row)
+
 -- open a cursor
 SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
  dblink_open 
index db9dd6582fde9f30d9505d7d96ffaee4d1e51201..66e2607cfee667bbe8c226da4c0f4bb66ec39669 100644 (file)
@@ -217,6 +217,42 @@ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foobar',false);
 -- reset remote transaction state
 SELECT dblink_exec('myconn','ABORT');
 
+-- test opening cursor in a transaction
+SELECT dblink_exec('myconn','BEGIN');
+
+-- an open transaction will prevent dblink_open() from opening its own
+SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+
+-- this should not commit the transaction because the client opened it
+SELECT dblink_close('myconn','rmt_foo_cursor');
+
+-- this should succeed because we have an open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+-- commit remote transaction
+SELECT dblink_exec('myconn','COMMIT');
+
+-- test automatic transactions for multiple cursor opens
+SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+
+-- the second cursor
+SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
+
+-- this should not commit the transaction
+SELECT dblink_close('myconn','rmt_foo_cursor2');
+
+-- this should succeed because we have an open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+-- this should commit the transaction
+SELECT dblink_close('myconn','rmt_foo_cursor');
+
+-- this should fail because there is no open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+-- reset remote transaction state
+SELECT dblink_exec('myconn','ABORT');
+
 -- open a cursor
 SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');