From 12338c7ee615039fafb24bf285ef35c6f5387324 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Sat, 8 Aug 2015 12:54:38 +0300 Subject: [PATCH] Support pipelining - count expected ReadyForQuery packets. This avoids releasing server too early. Should fix #44 and #52. --- include/bouncer.h | 3 +++ src/client.c | 6 +++++- src/server.c | 35 +++++++++++++++++++++++++++++++---- 3 files changed, 39 insertions(+), 5 deletions(-) diff --git a/include/bouncer.h b/include/bouncer.h index 2da384e..51cfc88 100644 --- a/include/bouncer.h +++ b/include/bouncer.h @@ -336,6 +336,7 @@ struct PgSocket { bool setting_vars:1; /* server: setting client vars */ bool exec_on_connect:1; /* server: executing connect_query */ bool resetting:1; /* server: executing reset query from auth login; don't release on flush */ + bool copy_mode:1; /* server: in copy stream, ignores any Sync packets */ bool wait_for_welcome:1;/* client: no server yet in pool, cannot send welcome msg */ bool wait_for_user_conn:1;/* client: waiting for auth_conn server connection */ @@ -349,6 +350,8 @@ struct PgSocket { bool wait_sslchar:1; /* server: waiting for ssl response: S/N */ + int expect_rfq_count; /* client: count of ReadyForQuery packets client should see */ + usec_t connect_time; /* when connection was made */ usec_t request_time; /* last activity time */ usec_t query_start; /* query start moment */ diff --git a/src/client.c b/src/client.c index 973785b..a0ca558 100644 --- a/src/client.c +++ b/src/client.c @@ -611,12 +611,16 @@ static bool handle_client_work(PgSocket *client, PktHdr *pkt) case 'F': /* FunctionCall */ /* request immediate response from server */ - case 'H': /* Flush */ case 'S': /* Sync */ + client->expect_rfq_count++; + break; + case 'H': /* Flush */ + break; /* copy end markers */ case 'c': /* CopyDone(F/B) */ case 'f': /* CopyFail(F/B) */ + break; /* * extended protocol allows server (and thus pooler) diff --git a/src/server.c b/src/server.c index cb78224..e06bdf4 100644 --- a/src/server.c +++ b/src/server.c @@ -246,6 +246,14 @@ static bool handle_server_work(PgSocket *server, PktHdr *pkt) } else if (state == 'T' || state == 'E') { idle_tx = true; } + + if (client && !server->setting_vars) { + if (client->expect_rfq_count > 0) { + client->expect_rfq_count--; + } else if (server->state == SV_ACTIVE) { + slog_debug(client, "unexpected ReadyForQuery - expect_rfq_count=%d", client->expect_rfq_count); + } + } break; case 'S': /* ParameterStatus */ @@ -279,6 +287,17 @@ static bool handle_server_work(PgSocket *server, PktHdr *pkt) disconnect_server(server, true, "invalid server parameter"); return false; } + case 'C': /* CommandComplete */ + + /* ErrorResponse and CommandComplete show end of copy mode */ + if (server->copy_mode) { + server->copy_mode = false; + + /* it's impossible to track sync count over copy */ + if (client) + client->expect_rfq_count = 0; + } + break; case 'N': /* NoticeResponse */ break; @@ -289,6 +308,11 @@ static bool handle_server_work(PgSocket *server, PktHdr *pkt) ready = server->ready; break; + /* copy mode */ + case 'G': /* CopyInResponse */ + case 'H': /* CopyOutResponse */ + server->copy_mode = true; + break; /* chat packets */ case '2': /* BindComplete */ case '3': /* CloseComplete */ @@ -297,11 +321,8 @@ static bool handle_server_work(PgSocket *server, PktHdr *pkt) case 'I': /* EmptyQueryResponse == CommandComplete */ case 'V': /* FunctionCallResponse */ case 'n': /* NoData */ - case 'G': /* CopyInResponse */ - case 'H': /* CopyOutResponse */ case '1': /* ParseComplete */ case 's': /* PortalSuspended */ - case 'C': /* CommandComplete */ /* data packets, there will be more coming */ case 'd': /* CopyData(F/B) */ @@ -495,11 +516,17 @@ bool server_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *data) break; } - if (pool_pool_mode(pool) != POOL_SESSION || server->state == SV_TESTED || server->resetting) { + if (pool_pool_mode(pool) != POOL_SESSION || server->state == SV_TESTED || server->resetting) { server->resetting = false; switch (server->state) { case SV_ACTIVE: case SV_TESTED: + /* keep link if client expects more Syncs */ + if (server->link) { + if (server->link->expect_rfq_count > 0) + break; + } + /* retval does not matter here */ release_server(server); break; -- 2.40.0