]> granicus.if.org Git - pgbouncer/commitdiff
Support pipelining - count expected ReadyForQuery packets.
authorMarko Kreen <markokr@gmail.com>
Sat, 8 Aug 2015 09:54:38 +0000 (12:54 +0300)
committerMarko Kreen <markokr@gmail.com>
Sat, 8 Aug 2015 10:20:09 +0000 (13:20 +0300)
This avoids releasing server too early.

Should fix #44 and #52.

include/bouncer.h
src/client.c
src/server.c

index 2da384e5c64d8c6edbc5d15e18416c65d00725b2..51cfc885d875c5048725be6ce9437249998bfc1c 100644 (file)
@@ -336,6 +336,7 @@ struct PgSocket {
        bool setting_vars:1;    /* server: setting client vars */
        bool exec_on_connect:1; /* server: executing connect_query */
        bool resetting:1;       /* server: executing reset query from auth login; don't release on flush */
+       bool copy_mode:1;       /* server: in copy stream, ignores any Sync packets */
 
        bool wait_for_welcome:1;/* client: no server yet in pool, cannot send welcome msg */
        bool wait_for_user_conn:1;/* client: waiting for auth_conn server connection */
@@ -349,6 +350,8 @@ struct PgSocket {
 
        bool wait_sslchar:1;    /* server: waiting for ssl response: S/N */
 
+       int expect_rfq_count;   /* client: count of ReadyForQuery packets client should see */
+
        usec_t connect_time;    /* when connection was made */
        usec_t request_time;    /* last activity time */
        usec_t query_start;     /* query start moment */
index 973785bef054a2d18779ce06aa4113fd9f806f0e..a0ca55852665f8dfc4befead251bf38011bb60eb 100644 (file)
@@ -611,12 +611,16 @@ static bool handle_client_work(PgSocket *client, PktHdr *pkt)
        case 'F':               /* FunctionCall */
 
        /* request immediate response from server */
-       case 'H':               /* Flush */
        case 'S':               /* Sync */
+               client->expect_rfq_count++;
+               break;
+       case 'H':               /* Flush */
+               break;
 
        /* copy end markers */
        case 'c':               /* CopyDone(F/B) */
        case 'f':               /* CopyFail(F/B) */
+               break;
 
        /*
         * extended protocol allows server (and thus pooler)
index cb782243b9858ea6476d3b96066245ddc8f9aab3..e06bdf44078ef2b4cd3c515402b9838f3c2eedc2 100644 (file)
@@ -246,6 +246,14 @@ static bool handle_server_work(PgSocket *server, PktHdr *pkt)
                } else if (state == 'T' || state == 'E') {
                        idle_tx = true;
                }
+
+               if (client && !server->setting_vars) {
+                       if (client->expect_rfq_count > 0) {
+                               client->expect_rfq_count--;
+                       } else if (server->state == SV_ACTIVE) {
+                               slog_debug(client, "unexpected ReadyForQuery - expect_rfq_count=%d", client->expect_rfq_count);
+                       }
+               }
                break;
 
        case 'S':               /* ParameterStatus */
@@ -279,6 +287,17 @@ static bool handle_server_work(PgSocket *server, PktHdr *pkt)
                        disconnect_server(server, true, "invalid server parameter");
                        return false;
                }
+       case 'C':               /* CommandComplete */
+
+               /* ErrorResponse and CommandComplete show end of copy mode */
+               if (server->copy_mode) {
+                       server->copy_mode = false;
+
+                       /* it's impossible to track sync count over copy */
+                       if (client)
+                               client->expect_rfq_count = 0;
+               }
+               break;
 
        case 'N':               /* NoticeResponse */
                break;
@@ -289,6 +308,11 @@ static bool handle_server_work(PgSocket *server, PktHdr *pkt)
                ready = server->ready;
                break;
 
+       /* copy mode */
+       case 'G':               /* CopyInResponse */
+       case 'H':               /* CopyOutResponse */
+               server->copy_mode = true;
+               break;
        /* chat packets */
        case '2':               /* BindComplete */
        case '3':               /* CloseComplete */
@@ -297,11 +321,8 @@ static bool handle_server_work(PgSocket *server, PktHdr *pkt)
        case 'I':               /* EmptyQueryResponse == CommandComplete */
        case 'V':               /* FunctionCallResponse */
        case 'n':               /* NoData */
-       case 'G':               /* CopyInResponse */
-       case 'H':               /* CopyOutResponse */
        case '1':               /* ParseComplete */
        case 's':               /* PortalSuspended */
-       case 'C':               /* CommandComplete */
 
        /* data packets, there will be more coming */
        case 'd':               /* CopyData(F/B) */
@@ -495,11 +516,17 @@ bool server_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *data)
                        break;
                }
 
-               if (pool_pool_mode(pool)  != POOL_SESSION || server->state == SV_TESTED || server->resetting) {
+               if (pool_pool_mode(pool) != POOL_SESSION || server->state == SV_TESTED || server->resetting) {
                        server->resetting = false;
                        switch (server->state) {
                        case SV_ACTIVE:
                        case SV_TESTED:
+                               /* keep link if client expects more Syncs */
+                               if (server->link) {
+                                       if (server->link->expect_rfq_count > 0)
+                                               break;
+                               }
+
                                /* retval does not matter here */
                                release_server(server);
                                break;