]> granicus.if.org Git - pgbouncer/commitdiff
version 1.0.3 pgbouncer_1_0_3
authorMarko Kreen <markokr@gmail.com>
Wed, 11 Apr 2007 07:39:49 +0000 (07:39 +0000)
committerMarko Kreen <markokr@gmail.com>
Wed, 11 Apr 2007 07:39:49 +0000 (07:39 +0000)
  = Fixes =

  * Some error handling was missing in login path, so dying
    connection there could trigger asserts.
  * Cleanup of asserts in sbuf.c to catch problems earlier.
  * Create core when Assert() triggers.

  = New stuff =

  * New config vars: log_connections, log_disconnections,
    log_pooler_errors to turn on/off noise.
  * Config var: client_login_timeout to kill dead connections
    in login phase that could stall SUSPEND and thus online restart.

26 files changed:
NEWS
configure.ac
debian/changelog
doc/config.txt
doc/overview.txt
doc/todo.txt
etc/pgbouncer.ini
etc/test.ini
etc/test.users
src/admin.c
src/bouncer.h
src/client.c
src/janitor.c
src/main.c
src/objects.c
src/objects.h
src/pktbuf.c
src/proto.c
src/sbuf.c
src/sbuf.h
src/server.c
src/system.h
src/takeover.c
src/takeover.h
src/util.c
src/util.h

diff --git a/NEWS b/NEWS
index 8ab22bc64e32d9aba047b19bb147746adad20464..d908aca31f177c724f64e10d7bd5dbcd72648933 100644 (file)
--- a/NEWS
+++ b/NEWS
@@ -1,4 +1,20 @@
 
+2007-04-11  -  PgBouncer 1.0.3  -  "Fearless Fork"
+
+  = Fixes =
+
+  * Some error handling was missing in login path, so dying
+    connection there could trigger asserts.
+  * Cleanup of asserts in sbuf.c to catch problems earlier.
+  * Create core when Assert() triggers.
+
+  = New stuff =
+
+  * New config vars: log_connections, log_disconnections,
+    log_pooler_errors to turn on/off noise.
+  * Config var: client_login_timeout to kill dead connections
+    in login phase that could stall SUSPEND and thus online restart.
+
 2007-03-28  -  PgBouncer 1.0.2  -  "Supersonic Spoon"
 
   * libevent may report a deleted event inside same loop.
index c74ab491282a0e1107c5baeed60c49b787522a1b..a4a41465412b1cefb0c801d0fcd662280cc9369b 100644 (file)
@@ -1,6 +1,6 @@
 dnl Process this file with autoconf to produce a configure script.
 
-AC_INIT(pgbouncer, 1.0.2)
+AC_INIT(pgbouncer, 1.0.3)
 AC_CONFIG_SRCDIR(src/bouncer.h)
 AC_CONFIG_HEADER(config.h)
 
index 0eb83bf9978cee8af2e7338e646b1499b2adf0db..b7e94a18b02f584c4536c3f9da504b1db5334280 100644 (file)
@@ -1,4 +1,10 @@
-pgbouncer (1.0.2) unstable; urgency=low
+pgbouncer (1.0.3-1) unstable; urgency=low
+
+  * more error handling fixes.
+
+ -- Marko Kreen <marko.kreen@skype.net>  Tue, 10 Apr 2007 17:22:49 +0300
+
+pgbouncer (1.0.2-1) unstable; urgency=low
 
   * 2 more bugs.
 
index b842ae423b1fec5d8ebca9c94e58dedc11bb3d49..09ccf03e874d72ab4c75779567942be1755ced9d 100644 (file)
@@ -100,6 +100,28 @@ Maximin number of client connections allowed.
 How many server connection to allow per user/database pair.
 Can be overrided in per-database config.
 
+
+
+=== Log settings ===
+
+==== log_connections ====
+
+Log successful logins.
+
+Default: 1
+
+==== log_disconnections ====
+
+Log disconnections with reasons.
+
+Default: 1
+
+==== log_pooler_errors ====
+
+Log error messaged pooler sends to clients.
+
+Default: 1
+
 === Console access control ===
 
 ==== admin_users ====
@@ -110,6 +132,8 @@ List of users that are allowed to run all commands on console.
 List of users that are allowed to run read-only queries on console.
 Thats means all SHOW commands except SHOW FDS.
 
+
+
 === Connection sanity checks, timeouts ===
 
 ==== server_check_delay ====
@@ -155,10 +179,21 @@ Default: 0 (disabled)
 
 ==== client_idle_timeout ====
 
-Client connections idling longer than that are closed.
+Client connections idling longer than that are closed.  This should be
+larger then client-side connection lifetime settings, to apply only for
+network problems.
+
+Default: 0 (disabled)
+
+==== client_login_timeout ====
+
+If client connect but does not manage to login in this time,
+it will be disconnected.  Mainly needed to avoid dead connections
+stalling SUSPEND and thus online restart.
 
 Default: 0 (disabled)
 
+
 === Low-level network settings ===
 
 ==== pkt_buf ====
@@ -191,6 +226,8 @@ Default: not set
 ==== tcp_keepintvl ====
 Default: not set
 
+
+
 == Section [databases] ==
 
 This contains key=value pairs where key will be taken as database name and value as
index 9c046300669fec22dbc84a524c63c3decbdaed40..5e6ed3575f02740575dd4ed659f7790d150bccc3 100644 (file)
@@ -40,5 +40,5 @@ Downloads, bugtracker, CVS: http://pgfoundry.org/projects/pgbouncer
 == Docs ==
 
  * Detailed usage info: ./UsageInfo
- * COnfig file help: ./ConfigFile
+ * Config file help: ./ConfigFile
  * TODO list: ./ToDo
index e01560cc9231d2535615f643972b5efe7da0b88c..1a556e8eaf16dc32cea012a8b0769c9da73d2ccd 100644 (file)
@@ -8,8 +8,6 @@
  * SHUTDOWN cmd should print notice?
  * before loading users, disable all existing?
 
- * log_connects, log_disconnects settings
-
  * Split CL_IDLE from CL_ACTIVE.  Does not give functional
  advantage, but makes monitoring easier.
 
index 027ef2876311963c2eaed90d0d2d51819673966f..13b4406fb6289dc89f62aa860e5ff0d0cdac4af1 100644 (file)
@@ -79,6 +79,12 @@ server_check_delay = 10
 max_client_conn = 100
 default_pool_size = 20
 
+log_connections = 1
+log_disconnections = 1
+
+; log error messages pooler sends to clients
+log_pooler_errors = 1
+
 ;;;
 ;;; Timeouts
 ;;;
@@ -106,6 +112,7 @@ default_pool_size = 20
 ;; Should be used to survive network problems. (default: 0)
 ;client_idle_timeout = 0
 
+client_login_timeout = 0
 
 ;;;
 ;;; Low-level tuning options
index 503c31b054a5fd32dab1851973ebf1cac1674b79..a246ebc9dffa4830239a7bf4cc33caed92dd9a24 100644 (file)
@@ -1,16 +1,16 @@
 [databases]
-marko = host=127.0.0.1
+marko = host=127.0.0.1 port=7000
 
 [pgbouncer]
 logfile = lib/pgbouncer.log
 pidfile = lib/pgbouncer.pid
 
-#listen_addr = 127.0.0.1
+listen_addr = 127.0.0.1
 listen_port = 6000
 unix_socket_dir = /tmp
 
 ; any, trust, plain, crypt, md5
-auth_type = trust
+auth_type = md5
 auth_file = etc/test.users
 
 ; When server connection is released back to pool:
@@ -20,12 +20,26 @@ auth_file = etc/test.users
 pool_mode = transaction
 
 server_check_query = select 1
-server_check_delay = 10
-max_client_conn = 2000
-default_pool_size = 80
+server_check_delay = 5
+max_client_conn = 100
+default_pool_size = 30
 
 admin_users = plproxy
 stats_users = marko
 
 stats_period = 60
 
+log_connections = 0
+log_disconnections = 0
+log_pooler_errors = 0
+
+; short timeouts
+server_lifetime = 5
+server_idle_timeout = 3
+server_connect_timeout = 1
+server_login_retry = 1
+query_timeout = 10
+client_idle_timeout = 10
+
+client_login_timeout = 3
+
index 5dd5911f7bd7e7ec322e3f7d1f232a34e004258e..1b9c05a2212bbf4bdac2e27bce55d4556d689ce4 100644 (file)
@@ -9,4 +9,4 @@
 "webstore" "" ""
 "wypbe" "md57e17e9c6cfde1c1f6f9155071d7d18a8" ""
 "wypfe" "md5e3b7c35f688032d97ab066210a33184b" ""
-"marko" "funky"
+"marko" "kama"
index c3e1f16784a9063d5f9ccd46952d10cde898d66e..323c4677a7fad72c877d0a0ec21acc869301d5db 100644 (file)
@@ -382,8 +382,8 @@ static void socket_row(PktBuf *buf, PgSocket *sk, const char *state, bool debug)
 
        pktbuf_write_DataRow(buf, debug ? SKF_DBG : SKF_STD,
                             is_server_socket(sk) ? "S" :"C",
-                            sk->auth_user->name,
-                            sk->pool->db->name,
+                            sk->auth_user ? sk->auth_user->name : "(nouser)",
+                            sk->pool ? sk->pool->db->name : "(nodb)",
                             state, addr, sk->addr.port,
                             sk->connect_time,
                             sk->request_time,
@@ -474,15 +474,16 @@ static bool admin_show_sockets(PgSocket *admin)
        socket_header(buf, true);
        statlist_for_each(item, &pool_list) {
                pool = container_of(item, PgPool, head);
-               show_socket_list(buf, &pool->active_client_list, "active", true);
-               show_socket_list(buf, &pool->waiting_client_list, "waiting", true);
-
-               show_socket_list(buf, &pool->active_server_list, "active", true);
-               show_socket_list(buf, &pool->idle_server_list, "idle", true);
-               show_socket_list(buf, &pool->used_server_list, "used", true);
-               show_socket_list(buf, &pool->tested_server_list, "tested", true);
-               show_socket_list(buf, &pool->new_server_list, "login", true);
+               show_socket_list(buf, &pool->active_client_list, "cl_active", true);
+               show_socket_list(buf, &pool->waiting_client_list, "cl_waiting", true);
+
+               show_socket_list(buf, &pool->active_server_list, "sv_active", true);
+               show_socket_list(buf, &pool->idle_server_list, "sv_idle", true);
+               show_socket_list(buf, &pool->used_server_list, "sv_used", true);
+               show_socket_list(buf, &pool->tested_server_list, "sv_tested", true);
+               show_socket_list(buf, &pool->new_server_list, "sv_login", true);
        }
+       show_socket_list(buf, &login_client_list, "cl_login", true);
        admin_flush(admin, buf, "SHOW");
        return true;
 }
index 4230e4ad4c26bd3d7e8d3d389ec42ca859c71624..f1fc7423a9c7420b3701b896920095354cce069b 100644 (file)
@@ -257,6 +257,7 @@ extern usec_t cf_server_connect_timeout;
 extern usec_t cf_server_login_retry;
 extern usec_t cf_query_timeout;
 extern usec_t cf_client_idle_timeout;
+extern usec_t cf_client_login_timeout;
 
 extern int cf_auth_type;
 extern char *cf_auth_file;
@@ -280,6 +281,10 @@ extern int cf_tcp_keepintvl;
 extern int cf_tcp_socket_buffer;
 extern int cf_tcp_defer_accept;
 
+extern int cf_log_connections;
+extern int cf_log_disconnections;
+extern int cf_log_pooler_errors;
+
 extern ConfElem bouncer_params[];
 
 
index b1ad1f604a57c5b2c059d628de8747bcb58904fe..206052b7017c40f094c38c508274a9bc6917c8f8 100644 (file)
@@ -121,7 +121,9 @@ static bool decide_startup_pool(PgSocket *client, MBuf *pkt)
                disconnect_client(client, true, "No database supplied");
                return false;
        }
-       slog_debug(client, "login request: db=%s user=%s", dbname, username);
+
+       if (cf_log_connections)
+               slog_info(client, "login request: db=%s user=%s", dbname, username);
 
        /* check if limit allows, dont limit admin db
           nb: new incoming conn will be attached to PgSocket, thus
@@ -191,7 +193,12 @@ static bool handle_client_startup(PgSocket *client, MBuf *pkt)
        case PKT_SSLREQ:
                log_noise("C: req SSL");
                log_noise("P: nak");
-               sbuf_answer(&client->sbuf, "N", 1);
+
+               /* reject SSL attempt */
+               if (!sbuf_answer(&client->sbuf, "N", 1)) {
+                       disconnect_client(client, false, "failed to nak SSL");
+                       return false;
+               }
                break;
        case PKT_STARTUP:
                if (mbuf_avail(pkt) < pkt_len - 8) {
@@ -215,7 +222,10 @@ static bool handle_client_startup(PgSocket *client, MBuf *pkt)
                        if (!finish_client_login(client))
                                return false;
                } else {
-                       send_client_authreq(client);
+                       if (!send_client_authreq(client)) {
+                               disconnect_client(client, false, "failed to send auth req");
+                               return false;
+                       }
                }
                break;
        case 'p':               /* PasswordMessage */
index 0a645039bec4afd4af4d6f386c1f89508a08775b..a7c392cf8e18a6d887cd5e3308c728d66fc7d57e 100644 (file)
@@ -284,7 +284,7 @@ static void pool_client_maint(PgPool *pool)
                        Assert(client->state == CL_WAITING);
                        if (client->query_start == 0) {
                                age = now - client->request_time;
-                               log_warning("query_start==0");
+                               //log_warning("query_start==0");
                        } else
                                age = now - client->query_start;
                        if (age > cf_query_timeout)
@@ -394,6 +394,24 @@ static void pool_server_maint(PgPool *pool)
        check_pool_size(pool);
 }
 
+static void cleanup_client_logins(void)
+{
+       List *item, *tmp;
+       PgSocket *client;
+       usec_t age;
+       usec_t now = get_cached_time();
+
+       if (cf_client_login_timeout <= 0)
+               return;
+
+       statlist_for_each_safe(item, &login_client_list, tmp) {
+               client = container_of(item, PgSocket, head);
+               age = now - client->connect_time;
+               if (age > cf_client_login_timeout)
+                       disconnect_client(client, true, "client_login_timeout");
+       }
+}
+
 /* full-scale maintenenace, done only occasionally */
 static void do_full_maint(int sock, short flags, void *arg)
 {
@@ -408,6 +426,8 @@ static void do_full_maint(int sock, short flags, void *arg)
                pool_client_maint(pool);
        }
 
+       cleanup_client_logins();
+
        if (cf_shutdown && get_active_server_count() == 0) {
                log_info("server connections dropped, exiting");
                exit(0);
index 4f48eaa070f4f3a424c11b17040a9ee101c8d622..15095a8b088ac48780e8a988df18881858200113 100644 (file)
@@ -84,6 +84,7 @@ usec_t cf_server_connect_timeout = 15*USEC;
 usec_t cf_server_login_retry = 15*USEC;
 usec_t cf_query_timeout = 0*USEC;
 usec_t cf_client_idle_timeout = 0*USEC;
+usec_t cf_client_login_timeout = 0*USEC;
 
 char *cf_logfile = NULL;
 char *cf_pidfile = NULL;
@@ -93,6 +94,9 @@ char *cf_admin_users = "";
 char *cf_stats_users = "";
 int cf_stats_period = 60;
 
+int cf_log_connections = 1;
+int cf_log_disconnections = 1;
+int cf_log_pooler_errors = 1;
 
 /*
  * config file description
@@ -115,6 +119,7 @@ ConfElem bouncer_params[] = {
 {"server_check_delay", true, CF_TIME, &cf_server_check_delay},
 {"query_timeout",      true, CF_TIME, &cf_query_timeout},
 {"client_idle_timeout",        true, CF_TIME, &cf_client_idle_timeout},
+{"client_login_timeout",true, CF_TIME, &cf_client_login_timeout},
 {"server_lifetime",    true, CF_TIME, &cf_server_lifetime},
 {"server_idle_timeout",        true, CF_TIME, &cf_server_idle_timeout},
 {"server_connect_timeout",true, CF_TIME, &cf_server_connect_timeout},
@@ -131,6 +136,9 @@ ConfElem bouncer_params[] = {
 {"admin_users",                true, CF_STR, &cf_admin_users},
 {"stats_users",                true, CF_STR, &cf_stats_users},
 {"stats_period",       true, CF_INT, &cf_stats_period},
+{"log_connections",    true, CF_INT, &cf_log_connections},
+{"log_disconnections", true, CF_INT, &cf_log_disconnections},
+{"log_pooler_errors",  true, CF_INT, &cf_log_pooler_errors},
 {NULL},
 };
 
index d17cf93fc842a08d66b8e174cafcd617f757b656..ea4cf884262fa5c3709adde8dc29fea3a38be8f7 100644 (file)
@@ -41,8 +41,8 @@ STATLIST(login_client_list);
  * is called from somewhere else.  So hide just freed
  * PgSockets for one loop.
  */
-STATLIST(justfree_client_list);
-STATLIST(justfree_server_list);
+static STATLIST(justfree_client_list);
+static STATLIST(justfree_server_list);
 
 /* how many client sockets are allocated */
 static int absolute_client_count = 0;
@@ -542,13 +542,11 @@ bool find_server(PgSocket *client)
 }
 
 /* connecting/active -> idle, unlink if needed */
-void release_server(PgSocket *server)
+bool release_server(PgSocket *server)
 {
        PgPool *pool = server->pool;
        SocketState newstate = SV_IDLE;
 
-       /* btw, this function is not allowed to disconnect,
-          as there may be packet pending */
        Assert(server->ready);
 
        /* remove from old list */
@@ -578,9 +576,24 @@ void release_server(PgSocket *server)
        /* immidiately process waiters, to give fair chance */
        if (newstate == SV_IDLE) {
                PgSocket *client = first_socket(&pool->waiting_client_list);
-               if (client)
+               if (client) {
                        activate_client(client);
+
+                       /*
+                        * As the activate_client() does full read loop,
+                        * then it may happen that linked client close
+                        * couses server close.  Report it.
+                        */
+                       switch (server->state) {
+                       case SV_FREE:
+                       case SV_JUSTFREE:
+                               return false;
+                       default:
+                               break;
+                       }
+               }
        }
+       return true;
 }
 
 /* drop server connection */
@@ -591,8 +604,8 @@ void disconnect_server(PgSocket *server, bool notify, const char *reason)
        static const uint8 pkt_term[] = {'X', 0,0,0,4};
        int send_term = 1;
 
-       log_debug("disconnect_server");
-       slog_info(server, "closing because: %s", reason);
+       if (cf_log_disconnections)
+               slog_info(server, "closing because: %s", reason);
 
        switch (server->state) {
        case SV_ACTIVE:
@@ -634,7 +647,8 @@ void disconnect_server(PgSocket *server, bool notify, const char *reason)
 /* drop client connection */
 void disconnect_client(PgSocket *client, bool notify, const char *reason)
 {
-       slog_debug(client, "closing because: %s", reason);
+       if (cf_log_disconnections)
+               slog_debug(client, "closing because: %s", reason);
 
        switch (client->state) {
        case CL_ACTIVE:
@@ -715,8 +729,10 @@ void launch_new_connection(PgPool *pool)
        pool->last_connect_time = get_cached_time();
        change_server_state(server, SV_LOGIN);
 
+       if (cf_log_connections)
+               slog_info(server, "new connection to server");
+
        /* start connecting */
-       slog_info(server, "new connection to server");
        sbuf_connect(&server->sbuf, &server->addr, cf_server_connect_timeout / USEC);
 }
 
@@ -744,7 +760,8 @@ PgSocket * accept_client(int sock,
        client->addr.is_unix = is_unix;
        change_client_state(client, CL_LOGIN);
 
-       slog_debug(client, "got connection attempt");
+       if (cf_log_connections)
+               slog_debug(client, "got connection attempt");
        sbuf_accept(&client->sbuf, sock, is_unix);
 
        return client;
index 317cf47ea82a726ad5851a2ce492c9e874547ac2..bf862fbcb083ed6e61a2417a1afe39eac7824c77 100644 (file)
@@ -28,7 +28,7 @@ PgDatabase *find_database(const char *name);
 PgUser *find_user(const char *name);
 PgPool *get_pool(PgDatabase *, PgUser *);
 bool find_server(PgSocket *client);
-void release_server(PgSocket *server);
+bool release_server(PgSocket *server);
 bool finish_client_login(PgSocket *client);
 
 PgSocket * accept_client(int sock, const struct sockaddr_in *addr, bool is_unix);
index ba650425c86e2314863dfc21d4687e77c3992848..8ed635c14ccd26b3a944b6ad6bb8e1a8fdfa6385 100644 (file)
@@ -75,7 +75,7 @@ bool pktbuf_send_immidiate(PktBuf *buf, PgSocket *sk)
                return false;
        res = safe_send(fd, pos, amount, 0);
        if (res < 0) {
-               log_error("pktbuf_send_immidiate: %s", strerror(errno));
+               log_debug("pktbuf_send_immidiate: %s", strerror(errno));
        }
        return res == amount;
 }
index aef48b092fee93de2dc25078adc6db29bcfd721f..fd5eef7bff0fcb1aa7022088f7cd1a8c7263475d 100644 (file)
@@ -78,7 +78,8 @@ bool send_pooler_error(PgSocket *client, bool send_ready, const char *msg)
        uint8 tmpbuf[512];
        PktBuf buf;
 
-       slog_error(client, "Pooler Error: %s", msg);
+       if (cf_log_pooler_errors)
+               slog_info(client, "Pooler Error: %s", msg);
 
        pktbuf_static(&buf, tmpbuf, sizeof(tmpbuf));
        pktbuf_write_generic(&buf, 'E', "cscscsc",
@@ -193,22 +194,20 @@ bool welcome_client(PgSocket *client)
  */
 
 /* actual packet send */
-static void send_password(PgSocket *server, const char *enc_psw)
+static bool send_password(PgSocket *server, const char *enc_psw)
 {
        bool res;
        SEND_PasswordMessage(res, server, enc_psw);
-       if (!res)
-               disconnect_server(server, true,
-                                 "partial send unhandled in send_password");
+       return res;
 }
 
-static void login_clear_psw(PgSocket *server)
+static bool login_clear_psw(PgSocket *server)
 {
        log_debug("P: send clear password");
-       send_password(server, server->pool->user->passwd);
+       return send_password(server, server->pool->user->passwd);
 }
 
-static void login_crypt_psw(PgSocket *server, const uint8 *salt)
+static bool login_crypt_psw(PgSocket *server, const uint8 *salt)
 {
        char saltbuf[3];
        const char *enc;
@@ -217,11 +216,11 @@ static void login_crypt_psw(PgSocket *server, const uint8 *salt)
        log_debug("P: send crypt password");
        strncpy(saltbuf, (char *)salt, 2);
        enc = pg_crypt(user->passwd, saltbuf);
-       send_password(server, enc);
+       return send_password(server, enc);
 }
 
 
-static void login_md5_psw(PgSocket *server, const uint8 *salt)
+static bool login_md5_psw(PgSocket *server, const uint8 *salt)
 {
        char txt[MD5_PASSWD_LEN + 1], *src;
        PgUser *user = server->pool->user;
@@ -234,7 +233,7 @@ static void login_md5_psw(PgSocket *server, const uint8 *salt)
                src = user->passwd + 3;
        pg_md5_encrypt(src, (char *)salt, 4, txt);
 
-       send_password(server, txt);
+       return send_password(server, txt);
 }
 
 /* answer server authentication request */
@@ -244,6 +243,7 @@ bool answer_authreq(PgSocket *server,
 {
        unsigned cmd;
        const uint8 *salt;
+       bool res = false;
 
        if (pkt_len < 5 + 4)
                return false;
@@ -254,32 +254,37 @@ bool answer_authreq(PgSocket *server,
        switch (cmd) {
        case 0:
                log_debug("S: auth ok");
+               res = true;
                break;
        case 3:
                log_debug("S: req cleartext password");
-               login_clear_psw(server);
+               res = login_clear_psw(server);
                break;
        case 4:
                if (pkt_len < 5 + 4 + 2)
                        return false;
                log_debug("S: req crypt psw");
                salt = mbuf_get_bytes(pkt, 2);
-               login_crypt_psw(server, salt);
+               res = login_crypt_psw(server, salt);
                break;
        case 5:
                if (pkt_len < 5 + 4 + 4)
                        return false;
                log_debug("S: req md5-crypted psw");
                salt = mbuf_get_bytes(pkt, 4);
-               login_md5_psw(server, salt);
+               res = login_md5_psw(server, salt);
                break;
        case 2: /* kerberos */
        case 6: /* scm something */
                log_error("unsupported auth method: %d", cmd);
+               res = false;
+               break;
        default:
                log_error("unknown auth method: %d", cmd);
+               res = false;
+               break;
        }
-       return true;
+       return res;
 }
 
 bool send_startup_packet(PgSocket *server)
index 74c319d7e6194c0b295c3fdfbfb964fc20b4cb54..97ffe2681e7fba86b3b342793c7d2bea678c6db2 100644 (file)
  */
 #define SMALL_PKT      16
 
+#define AssertSanity(sbuf) do { \
+       Assert((sbuf)->send_pos >= 0); \
+       Assert((sbuf)->send_pos <= (sbuf)->pkt_pos); \
+       Assert((sbuf)->pkt_pos <= (sbuf)->recv_pos); \
+       Assert((sbuf)->recv_pos <= cf_sbuf_len); \
+       Assert((sbuf)->pkt_remain >= 0); \
+       Assert((sbuf)->send_remain >= 0); \
+} while (0)
+
+#define AssertActive(sbuf) do { \
+       Assert((sbuf)->sock > 0); \
+       AssertSanity(sbuf); \
+} while (0)
+
 /* declare static stuff */
 static void sbuf_queue_send(SBuf *sbuf);
 static bool sbuf_send_pending(SBuf *sbuf);
@@ -41,37 +55,11 @@ static void sbuf_recv_cb(int sock, short flags, void *arg);
 static void sbuf_send_cb(int sock, short flags, void *arg);
 static void sbuf_try_resync(SBuf *sbuf);
 static void sbuf_wait_for_data(SBuf *sbuf);
+static bool sbuf_call_proto(SBuf *sbuf, int event);
 
-/*
- * Call proto callback with proper MBuf.
- *
- * If callback returns true it used one of sbuf_prepare_* on sbuf,
- * and processing can continue.
- *
- * If it returned false it used sbuf_pause(), sbuf_close() or simply
- * wants to wait for next event loop (eg. too few data available).
- * Callee should not touch sbuf in that case and just return to libevent.
- */
-static inline bool sbuf_call_proto(SBuf *sbuf, int event)
-{
-       MBuf mbuf;
-       uint8 *pos = sbuf->buf + sbuf->pkt_pos;
-       int avail = sbuf->recv_pos - sbuf->pkt_pos;
-
-       Assert(avail >= 0);
-       Assert(pos + avail <= sbuf->buf + cf_sbuf_len);
-       Assert(event != SBUF_EV_READ || avail > 0);
-
-       mbuf_init(&mbuf, pos, avail);
-       return sbuf->proto_handler(sbuf, event, &mbuf, sbuf->arg);
-}
-
-/* lets wait for new data */
-static void sbuf_wait_for_data(SBuf *sbuf)
-{
-       event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST, sbuf_recv_cb, sbuf);
-       event_add(&sbuf->ev, NULL);
-}
+/*********************************
+ * Public functions
+ *********************************/
 
 /* initialize SBuf with proto handler */
 void sbuf_init(SBuf *sbuf, sbuf_proto_cb_t proto_fn, void *arg)
@@ -84,9 +72,8 @@ void sbuf_init(SBuf *sbuf, sbuf_proto_cb_t proto_fn, void *arg)
 /* got new socket from accept() */
 void sbuf_accept(SBuf *sbuf, int sock, bool is_unix)
 {
-       Assert(sbuf->pkt_pos == 0);
-       Assert(sbuf->recv_pos == 0);
-       Assert(sbuf->send_pos == 0);
+       Assert(sbuf->recv_pos == 0 && sbuf->sock == 0);
+       AssertSanity(sbuf);
 
        tune_socket(sock, is_unix);
        sbuf->sock = sock;
@@ -111,6 +98,9 @@ void sbuf_connect(SBuf *sbuf, const PgAddr *addr, int timeout_sec)
        socklen_t len;
        struct timeval timeout;
 
+       Assert(sbuf->recv_pos == 0 && sbuf->sock == 0);
+       AssertSanity(sbuf);
+
        /* prepare sockaddr */
        if (addr->is_unix) {
                sa = (void*)&sa_un;
@@ -171,6 +161,7 @@ void sbuf_connect(SBuf *sbuf, const PgAddr *addr, int timeout_sec)
 /* dont wait for data on this socket */
 void sbuf_pause(SBuf *sbuf)
 {
+       AssertActive(sbuf);
        Assert(sbuf->wait_send == 0);
 
        event_del(&sbuf->ev);
@@ -179,6 +170,8 @@ void sbuf_pause(SBuf *sbuf)
 /* resume from pause, start waiting for data */
 void sbuf_continue(SBuf *sbuf)
 {
+       AssertActive(sbuf);
+
        sbuf_wait_for_data(sbuf);
 
        /* there is some data already received */
@@ -193,6 +186,8 @@ void sbuf_continue(SBuf *sbuf)
  */
 void sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb user_cb)
 {
+       AssertActive(sbuf);
+
        event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST,
                  user_cb, sbuf->arg);
        event_add(&sbuf->ev, NULL);
@@ -214,8 +209,9 @@ void sbuf_close(SBuf *sbuf)
 }
 
 /* proto_fn tells to send some bytes to socket */
-void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, unsigned amount, bool flush)
+void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, int amount, bool flush)
 {
+       AssertActive(sbuf);
        Assert(sbuf->pkt_remain == 0);
        Assert(sbuf->pkt_skip == 0 || sbuf->send_remain == 0);
        Assert(!sbuf->pkt_flush || sbuf->send_remain == 0);
@@ -230,6 +226,7 @@ void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, unsigned amount, bool flush)
 /* proto_fn tells to skip sone amount of bytes */
 void sbuf_prepare_skip(SBuf *sbuf, int amount)
 {
+       AssertActive(sbuf);
        Assert(sbuf->pkt_remain == 0);
        Assert(sbuf->pkt_skip == 0 || sbuf->send_remain == 0);
        Assert(!sbuf->pkt_flush || sbuf->send_remain == 0);
@@ -241,6 +238,47 @@ void sbuf_prepare_skip(SBuf *sbuf, int amount)
        sbuf->dst = NULL;
 }
 
+/*************************
+ * Internal functions
+ *************************/
+
+/*
+ * Call proto callback with proper MBuf.
+ *
+ * If callback returns true it used one of sbuf_prepare_* on sbuf,
+ * and processing can continue.
+ *
+ * If it returned false it used sbuf_pause(), sbuf_close() or simply
+ * wants to wait for next event loop (eg. too few data available).
+ * Callee should not touch sbuf in that case and just return to libevent.
+ */
+static bool sbuf_call_proto(SBuf *sbuf, int event)
+{
+       MBuf mbuf;
+       uint8 *pos = sbuf->buf + sbuf->pkt_pos;
+       int avail = sbuf->recv_pos - sbuf->pkt_pos;
+       bool res;
+
+       AssertSanity(sbuf);
+       Assert(event != SBUF_EV_READ || avail > 0);
+
+       mbuf_init(&mbuf, pos, avail);
+       res = sbuf->proto_handler(sbuf, event, &mbuf, sbuf->arg);
+
+       AssertSanity(sbuf);
+       if (event == SBUF_EV_READ && res)
+               Assert(sbuf->sock > 0);
+
+       return res;
+}
+
+/* lets wait for new data */
+static void sbuf_wait_for_data(SBuf *sbuf)
+{
+       event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST, sbuf_recv_cb, sbuf);
+       event_add(&sbuf->ev, NULL);
+}
+
 /* libevent EV_WRITE: called when dest socket is writable again */
 static void sbuf_send_cb(int sock, short flags, void *arg)
 {
@@ -250,6 +288,8 @@ static void sbuf_send_cb(int sock, short flags, void *arg)
        if (!sbuf->sock)
                return;
 
+       AssertSanity(sbuf);
+
        /* prepare normal situation for sbuf_recv_cb() */
        sbuf->wait_send = 0;
        sbuf_wait_for_data(sbuf);
@@ -260,6 +300,8 @@ static void sbuf_send_cb(int sock, short flags, void *arg)
 /* socket is full, wait until its writable again */
 static void sbuf_queue_send(SBuf *sbuf)
 {
+       AssertActive(sbuf);
+
        sbuf->wait_send = 1;
        event_del(&sbuf->ev);
        event_set(&sbuf->ev, sbuf->dst->sock, EV_WRITE, sbuf_send_cb, sbuf);
@@ -276,6 +318,7 @@ static bool sbuf_send_pending(SBuf *sbuf)
        int res, avail;
        uint8 *pos;
 
+       AssertActive(sbuf);
        Assert(sbuf->dst || !sbuf->send_remain);
 
 try_more:
@@ -284,7 +327,7 @@ try_more:
        if (avail > sbuf->send_remain)
                avail = sbuf->send_remain;
        if (avail == 0)
-               return true;
+               goto all_sent;
 
        if (sbuf->dst->sock == 0) {
                log_error("sbuf_send_pending: no dst sock?");
@@ -294,26 +337,34 @@ try_more:
        /* actually send it */
        pos = sbuf->buf + sbuf->send_pos;
        res = safe_send(sbuf->dst->sock, pos, avail, 0);
-       if (res >= 0) {
-               sbuf->send_remain -= res;
-               sbuf->send_pos += res;
-
-               if (res < avail) {
-                       /*
-                        * Should do sbuf_queue_send() immidiately?
-                        *
-                        * To be sure, lets run into EAGAIN.
-                        */
-                       goto try_more;
-               }
-               return true;
-       } else if (errno == EAGAIN) {
-               sbuf_queue_send(sbuf);
-               return false;
-       } else {
-               sbuf_call_proto(sbuf, SBUF_EV_SEND_FAILED);
+       if (res < 0) {
+               if (errno == EAGAIN)
+                       sbuf_queue_send(sbuf);
+               else
+                       sbuf_call_proto(sbuf, SBUF_EV_SEND_FAILED);
                return false;
        }
+
+       sbuf->send_remain -= res;
+       sbuf->send_pos += res;
+
+       AssertActive(sbuf);
+
+       /*
+        * Should do sbuf_queue_send() immidiately?
+        *
+        * To be sure, lets run into EAGAIN.
+        */
+       if (res < avail)
+               goto try_more;
+
+all_sent:
+
+       /* send_pos may lag pkt_pos in case of skip packets, move it here */
+       if (sbuf->send_remain == 0 && sbuf->send_pos < sbuf->pkt_pos)
+               sbuf->send_pos = sbuf->pkt_pos;
+
+       return true;
 }
 
 /* process as much data as possible */
@@ -324,7 +375,7 @@ static bool sbuf_process_pending(SBuf *sbuf)
        bool res;
 
        while (1) {
-               Assert(sbuf->recv_pos >= sbuf->pkt_pos);
+               AssertActive(sbuf);
 
                /*
                 * Enough for now?
@@ -373,28 +424,20 @@ static void sbuf_try_resync(SBuf *sbuf)
 {
        int avail;
 
-       if (sbuf->pkt_pos == 0)
+       AssertActive(sbuf);
+
+       if (sbuf->send_pos == 0)
                return;
 
-       if (sbuf->send_remain > 0)
-               avail = sbuf->recv_pos - sbuf->send_pos;
-       else
-               avail = sbuf->recv_pos - sbuf->pkt_pos;
+       avail = sbuf->recv_pos - sbuf->send_pos;
 
        if (avail == 0) {
                sbuf->recv_pos = sbuf->pkt_pos = sbuf->send_pos = 0;
        } else if (avail <= SMALL_PKT) {
-               if (sbuf->send_remain > 0) {
-                       memmove(sbuf->buf, sbuf->buf + sbuf->send_pos, avail);
-                       sbuf->pkt_pos -= sbuf->send_pos;
-                       sbuf->send_pos = 0;
-                       sbuf->recv_pos = avail;
-               } else {
-                       memmove(sbuf->buf, sbuf->buf + sbuf->pkt_pos, avail);
-                       sbuf->send_pos = 0;
-                       sbuf->pkt_pos = 0;
-                       sbuf->recv_pos = avail;
-               }
+               memmove(sbuf->buf, sbuf->buf + sbuf->send_pos, avail);
+               sbuf->pkt_pos -= sbuf->send_pos;
+               sbuf->send_pos = 0;
+               sbuf->recv_pos = avail;
        }
 }
 
@@ -404,6 +447,10 @@ static bool sbuf_actual_recv(SBuf *sbuf, int len)
        int got;
        uint8 *pos;
 
+       AssertActive(sbuf);
+       Assert(len > 0);
+       Assert(sbuf->recv_pos + len <= cf_sbuf_len);
+
        pos = sbuf->buf + sbuf->recv_pos;
        got = safe_recv(sbuf->sock, pos, len, 0);
 
@@ -438,6 +485,7 @@ static void sbuf_recv_cb(int sock, short flags, void *arg)
 
        /* reading should be disabled when waiting */
        Assert(sbuf->wait_send == 0);
+       AssertSanity(sbuf);
 
 try_more:
        /* make room in buffer */
@@ -481,12 +529,12 @@ static bool sbuf_after_connect_check(SBuf *sbuf)
        err = getsockopt(sbuf->sock, SOL_SOCKET, SO_ERROR, (void*)&optval, &optlen);
        if (err < 0) {
                log_error("sbuf_after_connect_check: getsockopt: %s",
-                               strerror(errno));
+                         strerror(errno));
                return false;
        }
        if (optval != 0) {
                log_error("sbuf_after_connect_check: pending error: %s",
-                               strerror(optval));
+                         strerror(optval));
                return false;
        }
        return true;
@@ -517,9 +565,9 @@ bool sbuf_answer(SBuf *sbuf, const void *buf, int len)
                return false;
        res = safe_send(sbuf->sock, buf, len, 0);
        if (res < 0)
-               log_error("sbuf_answer: error sending: %s", strerror(errno));
+               log_debug("sbuf_answer: error sending: %s", strerror(errno));
        else if (res != len)
-               log_error("sbuf_answer: partial send: len=%d sent=%d", len, res);
+               log_debug("sbuf_answer: partial send: len=%d sent=%d", len, res);
        return res == len;
 }
 
index bb39382d5c89ac9a1950543849c4bd49282f053d..ea731ce27a61ea04b1fa633274e9c893db84b770 100644 (file)
@@ -52,11 +52,11 @@ struct SBuf {
        /* dest SBuf for current packet */
        SBuf *dst;
 
-       unsigned recv_pos;
-       unsigned pkt_pos;
-       unsigned pkt_remain;
-       unsigned send_pos;
-       unsigned send_remain;
+       int recv_pos;
+       int pkt_pos;
+       int pkt_remain;
+       int send_pos;
+       int send_remain;
 
        unsigned wait_send:1;
        unsigned pkt_skip:1;
@@ -77,7 +77,7 @@ void sbuf_continue(SBuf *sbuf);
 void sbuf_close(SBuf *sbuf);
 
 /* proto_fn can use those functions to order behaviour */
-void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, unsigned amount, bool flush);
+void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, int amount, bool flush);
 void sbuf_prepare_skip(SBuf *sbuf, int amount);
 
 bool sbuf_answer(SBuf *sbuf, const void *buf, int len);
index 9f4ee346f6c1d2f0a84538ec68eca30c5a5da440..8bae665ff99aabbdd14d5954ae43b9ad960c04e8 100644 (file)
@@ -56,6 +56,8 @@ static bool handle_server_startup(PgSocket *server, MBuf *pkt)
        case 'R':               /* AuthenticationXXX */
                log_debug("calling login_answer");
                res = answer_authreq(server, pkt_type, pkt_len, pkt);
+               if (!res)
+                       disconnect_server(server, false, "failed to answer authreq");
                break;
        case 'S':               /* ParameterStatus */
                res = add_welcome_parameter(server, pkt_type, pkt_len, pkt);
@@ -65,13 +67,15 @@ static bool handle_server_startup(PgSocket *server, MBuf *pkt)
                log_debug("server login ok, start accepting queries");
                server->ready = 1;
 
+               /* got all params */
                finish_welcome_msg(server);
-               release_server(server);
+
+               // FIXME: res check
+               res = release_server(server);
 
                /* let the takeover process handle it */
-               if (server->pool->admin)
-                       takeover_login(server);
-               res = true;
+               if (res && server->pool->admin)
+                       res = takeover_login(server);
                break;
 
        /* ignorable packets */
@@ -282,6 +286,7 @@ bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *pkt, void *arg)
                        switch (server->state) {
                        case SV_ACTIVE:
                        case SV_TESTED:
+                               /* retval does not matter here */
                                release_server(server);
                                break;
                        default:
index 8a813ba9a21a613a5d944ca11df981abfc559771..35123d192e9e3e918df0d644aa999b79beced890 100644 (file)
@@ -57,7 +57,8 @@
 #endif
 
 #ifdef CASSERT
-#define Assert(e) do { if (!(e)) fatal("Assert(%s) failed", #e); } while (0)
+#define Assert(e) do { if (!(e)) { \
+       fatal_noexit("Assert(%s) failed", #e); abort(); } } while (0)
 #else
 #define Assert(e)
 #endif
index 94bb1b330ac6952acb3ca841892c4c78144d81ee..c8ca0d37614af2145f85f4a3572e6cf0ca04db5a 100644 (file)
@@ -36,6 +36,7 @@ static void takeover_finish(PgSocket *bouncer)
        disconnect_server(bouncer, false, "disko over");
        cf_reboot = 0;
        resume_all();
+       log_info("disko over, resuming work");
 }
 
 /* parse msg for fd and info */
@@ -257,16 +258,20 @@ static void takeover_recv_cb(int sock, short flags, void *arg)
  * login finished, send first command,
  * replace recv callback with custom recvmsg() based one.
  */
-void takeover_login(PgSocket *bouncer)
+bool takeover_login(PgSocket *bouncer)
 {
        bool res;
 
        slog_info(bouncer, "Login OK, sending SUSPEND");
        SEND_generic(res, bouncer, 'Q', "s", "SUSPEND;");
-
-       /* use own callback */
-       sbuf_pause(&bouncer->sbuf);
-       sbuf_continue_with_callback(&bouncer->sbuf, takeover_recv_cb);
+       if (res) {
+               /* use own callback */
+               sbuf_pause(&bouncer->sbuf);
+               sbuf_continue_with_callback(&bouncer->sbuf, takeover_recv_cb);
+       } else {
+               disconnect_server(bouncer, false, "failed to send command");
+       }
+       return res;
 }
 
 /* launch connection to running process */
index 08ca09599a9a9e96997c17387d04259af5523b23..9011c8df2a19d3e8d290aa0fb2972a7d7489b22b 100644 (file)
@@ -17,5 +17,5 @@
  */
 
 void takeover_init(void);
-void takeover_login(PgSocket *bouncer);
+bool takeover_login(PgSocket *bouncer);
 
index a7a26370b02f5722cad7245cb19d795802eca018..65f06a06415db516f300bfac2245f35c286924b0 100644 (file)
@@ -104,7 +104,7 @@ static void _log(const char *pfx, const char *fmt, va_list ap)
 }
 
 void _fatal(const char *file, int line, const char *func,
-           const char *fmt, ...)
+           bool do_exit, const char *fmt, ...)
 {
        va_list ap;
        char buf[1024];
@@ -116,9 +116,8 @@ void _fatal(const char *file, int line, const char *func,
        va_start(ap, fmt);
        _log("FATAL", buf, ap);
        va_end(ap);
-       if (cf_verbose > 2)
-               abort();
-       exit(1);
+       if (do_exit)
+               exit(1);
 }
 
 void _fatal_perror(const char *file, int line, const char *func,
@@ -129,7 +128,7 @@ void _fatal_perror(const char *file, int line, const char *func,
        va_start(ap, fmt);
        vsnprintf(buf, sizeof(buf), fmt, ap);
        va_end(ap);
-       _fatal(file, line, func, "%s: %s", buf, strerror(errno));
+       _fatal(file, line, func, false, "%s: %s", buf, strerror(errno));
 }
 
 /*
index 89ddd4536b0edc749d149f27a1d00de3f976ace6..fee4d6656a682ebd152b732b6d91c089847c1a6b 100644 (file)
@@ -62,10 +62,12 @@ void slog_level(const char *level, const PgSocket *sock, const char *fmt, ...);
 /*
  * log and exit
  */
-void _fatal(const char *file, int line, const char *func, const char *s, ...);
+void _fatal(const char *file, int line, const char *func, bool do_exit, const char *s, ...);
 void _fatal_perror(const char *file, int line, const char *func, const char *s, ...);
 #define fatal(args...) \
-       _fatal(__FILE__, __LINE__, __FUNCTION__, ## args)
+       _fatal(__FILE__, __LINE__, __FUNCTION__, true, ## args)
+#define fatal_noexit(args...) \
+       _fatal(__FILE__, __LINE__, __FUNCTION__, false, ## args)
 #define fatal_perror(args...) \
        _fatal_perror(__FILE__, __LINE__, __FUNCTION__, ## args)