From 3e6133df826b1492617ae6afbd0935ead85dec40 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Wed, 11 Apr 2007 07:39:49 +0000 Subject: [PATCH] version 1.0.3 = Fixes = * Some error handling was missing in login path, so dying connection there could trigger asserts. * Cleanup of asserts in sbuf.c to catch problems earlier. * Create core when Assert() triggers. = New stuff = * New config vars: log_connections, log_disconnections, log_pooler_errors to turn on/off noise. * Config var: client_login_timeout to kill dead connections in login phase that could stall SUSPEND and thus online restart. --- NEWS | 16 ++++ configure.ac | 2 +- debian/changelog | 8 +- doc/config.txt | 39 ++++++++- doc/overview.txt | 2 +- doc/todo.txt | 2 - etc/pgbouncer.ini | 7 ++ etc/test.ini | 26 ++++-- etc/test.users | 2 +- src/admin.c | 21 ++--- src/bouncer.h | 5 ++ src/client.c | 16 +++- src/janitor.c | 22 +++++- src/main.c | 8 ++ src/objects.c | 39 ++++++--- src/objects.h | 2 +- src/pktbuf.c | 2 +- src/proto.c | 35 +++++---- src/sbuf.c | 196 +++++++++++++++++++++++++++++----------------- src/sbuf.h | 12 +-- src/server.c | 13 ++- src/system.h | 3 +- src/takeover.c | 15 ++-- src/takeover.h | 2 +- src/util.c | 9 +-- src/util.h | 6 +- 26 files changed, 357 insertions(+), 153 deletions(-) diff --git a/NEWS b/NEWS index 8ab22bc..d908aca 100644 --- a/NEWS +++ b/NEWS @@ -1,4 +1,20 @@ +2007-04-11 - PgBouncer 1.0.3 - "Fearless Fork" + + = Fixes = + + * Some error handling was missing in login path, so dying + connection there could trigger asserts. + * Cleanup of asserts in sbuf.c to catch problems earlier. + * Create core when Assert() triggers. + + = New stuff = + + * New config vars: log_connections, log_disconnections, + log_pooler_errors to turn on/off noise. + * Config var: client_login_timeout to kill dead connections + in login phase that could stall SUSPEND and thus online restart. + 2007-03-28 - PgBouncer 1.0.2 - "Supersonic Spoon" * libevent may report a deleted event inside same loop. 
diff --git a/configure.ac b/configure.ac index c74ab49..a4a4146 100644 --- a/configure.ac +++ b/configure.ac @@ -1,6 +1,6 @@ dnl Process this file with autoconf to produce a configure script. -AC_INIT(pgbouncer, 1.0.2) +AC_INIT(pgbouncer, 1.0.3) AC_CONFIG_SRCDIR(src/bouncer.h) AC_CONFIG_HEADER(config.h) diff --git a/debian/changelog b/debian/changelog index 0eb83bf..b7e94a1 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,4 +1,10 @@ -pgbouncer (1.0.2) unstable; urgency=low +pgbouncer (1.0.3-1) unstable; urgency=low + + * more error handling fixes. + + -- Marko Kreen Tue, 10 Apr 2007 17:22:49 +0300 + +pgbouncer (1.0.2-1) unstable; urgency=low * 2 more bugs. diff --git a/doc/config.txt b/doc/config.txt index b842ae4..09ccf03 100644 --- a/doc/config.txt +++ b/doc/config.txt @@ -100,6 +100,28 @@ Maximin number of client connections allowed. How many server connection to allow per user/database pair. Can be overrided in per-database config. + + +=== Log settings === + +==== log_connections ==== + +Log successful logins. + +Default: 1 + +==== log_disconnections ==== + +Log disconnections with reasons. + +Default: 1 + +==== log_pooler_errors ==== + +Log error messages pooler sends to clients. + +Default: 1 + === Console access control === ==== admin_users ==== @@ -110,6 +132,8 @@ List of users that are allowed to run all commands on console. List of users that are allowed to run read-only queries on console. Thats means all SHOW commands except SHOW FDS. + + === Connection sanity checks, timeouts === ==== server_check_delay ==== @@ -155,10 +179,21 @@ Default: 0 (disabled) ==== client_idle_timeout ==== -Client connections idling longer than that are closed. +Client connections idling longer than that are closed. This should be +larger than client-side connection lifetime settings, to apply only for +network problems. + +Default: 0 (disabled) + +==== client_login_timeout ==== + +If client connects but does not manage to login in this time, +it will be disconnected. 
Mainly needed to avoid dead connections +stalling SUSPEND and thus online restart. Default: 0 (disabled) + === Low-level network settings === ==== pkt_buf ==== @@ -191,6 +226,8 @@ Default: not set ==== tcp_keepintvl ==== Default: not set + + == Section [databases] == This contains key=value pairs where key will be taken as database name and value as diff --git a/doc/overview.txt b/doc/overview.txt index 9c04630..5e6ed35 100644 --- a/doc/overview.txt +++ b/doc/overview.txt @@ -40,5 +40,5 @@ Downloads, bugtracker, CVS: http://pgfoundry.org/projects/pgbouncer == Docs == * Detailed usage info: ./UsageInfo - * COnfig file help: ./ConfigFile + * Config file help: ./ConfigFile * TODO list: ./ToDo diff --git a/doc/todo.txt b/doc/todo.txt index e01560c..1a556e8 100644 --- a/doc/todo.txt +++ b/doc/todo.txt @@ -8,8 +8,6 @@ * SHUTDOWN cmd should print notice? * before loading users, disable all existing? - * log_connects, log_disconnects settings - * Split CL_IDLE from CL_ACTIVE. Does not give functional advantage, but makes monitoring easier. diff --git a/etc/pgbouncer.ini b/etc/pgbouncer.ini index 027ef28..13b4406 100644 --- a/etc/pgbouncer.ini +++ b/etc/pgbouncer.ini @@ -79,6 +79,12 @@ server_check_delay = 10 max_client_conn = 100 default_pool_size = 20 +log_connections = 1 +log_disconnections = 1 + +; log error messages pooler sends to clients +log_pooler_errors = 1 + ;;; ;;; Timeouts ;;; @@ -106,6 +112,7 @@ default_pool_size = 20 ;; Should be used to survive network problems. 
(default: 0) ;client_idle_timeout = 0 +client_login_timeout = 0 ;;; ;;; Low-level tuning options diff --git a/etc/test.ini b/etc/test.ini index 503c31b..a246ebc 100644 --- a/etc/test.ini +++ b/etc/test.ini @@ -1,16 +1,16 @@ [databases] -marko = host=127.0.0.1 +marko = host=127.0.0.1 port=7000 [pgbouncer] logfile = lib/pgbouncer.log pidfile = lib/pgbouncer.pid -#listen_addr = 127.0.0.1 +listen_addr = 127.0.0.1 listen_port = 6000 unix_socket_dir = /tmp ; any, trust, plain, crypt, md5 -auth_type = trust +auth_type = md5 auth_file = etc/test.users ; When server connection is released back to pool: @@ -20,12 +20,26 @@ auth_file = etc/test.users pool_mode = transaction server_check_query = select 1 -server_check_delay = 10 -max_client_conn = 2000 -default_pool_size = 80 +server_check_delay = 5 +max_client_conn = 100 +default_pool_size = 30 admin_users = plproxy stats_users = marko stats_period = 60 +log_connections = 0 +log_disconnections = 0 +log_pooler_errors = 0 + +; short timeouts +server_lifetime = 5 +server_idle_timeout = 3 +server_connect_timeout = 1 +server_login_retry = 1 +query_timeout = 10 +client_idle_timeout = 10 + +client_login_timeout = 3 + diff --git a/etc/test.users b/etc/test.users index 5dd5911..1b9c05a 100644 --- a/etc/test.users +++ b/etc/test.users @@ -9,4 +9,4 @@ "webstore" "" "" "wypbe" "md57e17e9c6cfde1c1f6f9155071d7d18a8" "" "wypfe" "md5e3b7c35f688032d97ab066210a33184b" "" -"marko" "funky" +"marko" "kama" diff --git a/src/admin.c b/src/admin.c index c3e1f16..323c467 100644 --- a/src/admin.c +++ b/src/admin.c @@ -382,8 +382,8 @@ static void socket_row(PktBuf *buf, PgSocket *sk, const char *state, bool debug) pktbuf_write_DataRow(buf, debug ? SKF_DBG : SKF_STD, is_server_socket(sk) ? "S" :"C", - sk->auth_user->name, - sk->pool->db->name, + sk->auth_user ? sk->auth_user->name : "(nouser)", + sk->pool ? 
sk->pool->db->name : "(nodb)", state, addr, sk->addr.port, sk->connect_time, sk->request_time, @@ -474,15 +474,16 @@ static bool admin_show_sockets(PgSocket *admin) socket_header(buf, true); statlist_for_each(item, &pool_list) { pool = container_of(item, PgPool, head); - show_socket_list(buf, &pool->active_client_list, "active", true); - show_socket_list(buf, &pool->waiting_client_list, "waiting", true); - - show_socket_list(buf, &pool->active_server_list, "active", true); - show_socket_list(buf, &pool->idle_server_list, "idle", true); - show_socket_list(buf, &pool->used_server_list, "used", true); - show_socket_list(buf, &pool->tested_server_list, "tested", true); - show_socket_list(buf, &pool->new_server_list, "login", true); + show_socket_list(buf, &pool->active_client_list, "cl_active", true); + show_socket_list(buf, &pool->waiting_client_list, "cl_waiting", true); + + show_socket_list(buf, &pool->active_server_list, "sv_active", true); + show_socket_list(buf, &pool->idle_server_list, "sv_idle", true); + show_socket_list(buf, &pool->used_server_list, "sv_used", true); + show_socket_list(buf, &pool->tested_server_list, "sv_tested", true); + show_socket_list(buf, &pool->new_server_list, "sv_login", true); } + show_socket_list(buf, &login_client_list, "cl_login", true); admin_flush(admin, buf, "SHOW"); return true; } diff --git a/src/bouncer.h b/src/bouncer.h index 4230e4a..f1fc742 100644 --- a/src/bouncer.h +++ b/src/bouncer.h @@ -257,6 +257,7 @@ extern usec_t cf_server_connect_timeout; extern usec_t cf_server_login_retry; extern usec_t cf_query_timeout; extern usec_t cf_client_idle_timeout; +extern usec_t cf_client_login_timeout; extern int cf_auth_type; extern char *cf_auth_file; @@ -280,6 +281,10 @@ extern int cf_tcp_keepintvl; extern int cf_tcp_socket_buffer; extern int cf_tcp_defer_accept; +extern int cf_log_connections; +extern int cf_log_disconnections; +extern int cf_log_pooler_errors; + extern ConfElem bouncer_params[]; diff --git a/src/client.c 
b/src/client.c index b1ad1f6..206052b 100644 --- a/src/client.c +++ b/src/client.c @@ -121,7 +121,9 @@ static bool decide_startup_pool(PgSocket *client, MBuf *pkt) disconnect_client(client, true, "No database supplied"); return false; } - slog_debug(client, "login request: db=%s user=%s", dbname, username); + + if (cf_log_connections) + slog_info(client, "login request: db=%s user=%s", dbname, username); /* check if limit allows, dont limit admin db nb: new incoming conn will be attached to PgSocket, thus @@ -191,7 +193,12 @@ static bool handle_client_startup(PgSocket *client, MBuf *pkt) case PKT_SSLREQ: log_noise("C: req SSL"); log_noise("P: nak"); - sbuf_answer(&client->sbuf, "N", 1); + + /* reject SSL attempt */ + if (!sbuf_answer(&client->sbuf, "N", 1)) { + disconnect_client(client, false, "failed to nak SSL"); + return false; + } break; case PKT_STARTUP: if (mbuf_avail(pkt) < pkt_len - 8) { @@ -215,7 +222,10 @@ static bool handle_client_startup(PgSocket *client, MBuf *pkt) if (!finish_client_login(client)) return false; } else { - send_client_authreq(client); + if (!send_client_authreq(client)) { + disconnect_client(client, false, "failed to send auth req"); + return false; + } } break; case 'p': /* PasswordMessage */ diff --git a/src/janitor.c b/src/janitor.c index 0a64503..a7c392c 100644 --- a/src/janitor.c +++ b/src/janitor.c @@ -284,7 +284,7 @@ static void pool_client_maint(PgPool *pool) Assert(client->state == CL_WAITING); if (client->query_start == 0) { age = now - client->request_time; - log_warning("query_start==0"); + //log_warning("query_start==0"); } else age = now - client->query_start; if (age > cf_query_timeout) @@ -394,6 +394,24 @@ static void pool_server_maint(PgPool *pool) check_pool_size(pool); } +static void cleanup_client_logins(void) +{ + List *item, *tmp; + PgSocket *client; + usec_t age; + usec_t now = get_cached_time(); + + if (cf_client_login_timeout <= 0) + return; + + statlist_for_each_safe(item, &login_client_list, tmp) { + client = 
container_of(item, PgSocket, head); + age = now - client->connect_time; + if (age > cf_client_login_timeout) + disconnect_client(client, true, "client_login_timeout"); + } +} + /* full-scale maintenenace, done only occasionally */ static void do_full_maint(int sock, short flags, void *arg) { @@ -408,6 +426,8 @@ static void do_full_maint(int sock, short flags, void *arg) pool_client_maint(pool); } + cleanup_client_logins(); + if (cf_shutdown && get_active_server_count() == 0) { log_info("server connections dropped, exiting"); exit(0); diff --git a/src/main.c b/src/main.c index 4f48eaa..15095a8 100644 --- a/src/main.c +++ b/src/main.c @@ -84,6 +84,7 @@ usec_t cf_server_connect_timeout = 15*USEC; usec_t cf_server_login_retry = 15*USEC; usec_t cf_query_timeout = 0*USEC; usec_t cf_client_idle_timeout = 0*USEC; +usec_t cf_client_login_timeout = 0*USEC; char *cf_logfile = NULL; char *cf_pidfile = NULL; @@ -93,6 +94,9 @@ char *cf_admin_users = ""; char *cf_stats_users = ""; int cf_stats_period = 60; +int cf_log_connections = 1; +int cf_log_disconnections = 1; +int cf_log_pooler_errors = 1; /* * config file description @@ -115,6 +119,7 @@ ConfElem bouncer_params[] = { {"server_check_delay", true, CF_TIME, &cf_server_check_delay}, {"query_timeout", true, CF_TIME, &cf_query_timeout}, {"client_idle_timeout", true, CF_TIME, &cf_client_idle_timeout}, +{"client_login_timeout",true, CF_TIME, &cf_client_login_timeout}, {"server_lifetime", true, CF_TIME, &cf_server_lifetime}, {"server_idle_timeout", true, CF_TIME, &cf_server_idle_timeout}, {"server_connect_timeout",true, CF_TIME, &cf_server_connect_timeout}, @@ -131,6 +136,9 @@ ConfElem bouncer_params[] = { {"admin_users", true, CF_STR, &cf_admin_users}, {"stats_users", true, CF_STR, &cf_stats_users}, {"stats_period", true, CF_INT, &cf_stats_period}, +{"log_connections", true, CF_INT, &cf_log_connections}, +{"log_disconnections", true, CF_INT, &cf_log_disconnections}, +{"log_pooler_errors", true, CF_INT, &cf_log_pooler_errors}, 
{NULL}, }; diff --git a/src/objects.c b/src/objects.c index d17cf93..ea4cf88 100644 --- a/src/objects.c +++ b/src/objects.c @@ -41,8 +41,8 @@ STATLIST(login_client_list); * is called from somewhere else. So hide just freed * PgSockets for one loop. */ -STATLIST(justfree_client_list); -STATLIST(justfree_server_list); +static STATLIST(justfree_client_list); +static STATLIST(justfree_server_list); /* how many client sockets are allocated */ static int absolute_client_count = 0; @@ -542,13 +542,11 @@ bool find_server(PgSocket *client) } /* connecting/active -> idle, unlink if needed */ -void release_server(PgSocket *server) +bool release_server(PgSocket *server) { PgPool *pool = server->pool; SocketState newstate = SV_IDLE; - /* btw, this function is not allowed to disconnect, - as there may be packet pending */ Assert(server->ready); /* remove from old list */ @@ -578,9 +576,24 @@ void release_server(PgSocket *server) /* immidiately process waiters, to give fair chance */ if (newstate == SV_IDLE) { PgSocket *client = first_socket(&pool->waiting_client_list); - if (client) + if (client) { activate_client(client); + + /* + * As the activate_client() does full read loop, + * then it may happen that linked client close + * couses server close. Report it. 
+ */ + switch (server->state) { + case SV_FREE: + case SV_JUSTFREE: + return false; + default: + break; + } + } } + return true; } /* drop server connection */ @@ -591,8 +604,8 @@ void disconnect_server(PgSocket *server, bool notify, const char *reason) static const uint8 pkt_term[] = {'X', 0,0,0,4}; int send_term = 1; - log_debug("disconnect_server"); - slog_info(server, "closing because: %s", reason); + if (cf_log_disconnections) + slog_info(server, "closing because: %s", reason); switch (server->state) { case SV_ACTIVE: @@ -634,7 +647,8 @@ void disconnect_server(PgSocket *server, bool notify, const char *reason) /* drop client connection */ void disconnect_client(PgSocket *client, bool notify, const char *reason) { - slog_debug(client, "closing because: %s", reason); + if (cf_log_disconnections) + slog_debug(client, "closing because: %s", reason); switch (client->state) { case CL_ACTIVE: @@ -715,8 +729,10 @@ void launch_new_connection(PgPool *pool) pool->last_connect_time = get_cached_time(); change_server_state(server, SV_LOGIN); + if (cf_log_connections) + slog_info(server, "new connection to server"); + /* start connecting */ - slog_info(server, "new connection to server"); sbuf_connect(&server->sbuf, &server->addr, cf_server_connect_timeout / USEC); } @@ -744,7 +760,8 @@ PgSocket * accept_client(int sock, client->addr.is_unix = is_unix; change_client_state(client, CL_LOGIN); - slog_debug(client, "got connection attempt"); + if (cf_log_connections) + slog_debug(client, "got connection attempt"); sbuf_accept(&client->sbuf, sock, is_unix); return client; diff --git a/src/objects.h b/src/objects.h index 317cf47..bf862fb 100644 --- a/src/objects.h +++ b/src/objects.h @@ -28,7 +28,7 @@ PgDatabase *find_database(const char *name); PgUser *find_user(const char *name); PgPool *get_pool(PgDatabase *, PgUser *); bool find_server(PgSocket *client); -void release_server(PgSocket *server); +bool release_server(PgSocket *server); bool finish_client_login(PgSocket *client); 
PgSocket * accept_client(int sock, const struct sockaddr_in *addr, bool is_unix); diff --git a/src/pktbuf.c b/src/pktbuf.c index ba65042..8ed635c 100644 --- a/src/pktbuf.c +++ b/src/pktbuf.c @@ -75,7 +75,7 @@ bool pktbuf_send_immidiate(PktBuf *buf, PgSocket *sk) return false; res = safe_send(fd, pos, amount, 0); if (res < 0) { - log_error("pktbuf_send_immidiate: %s", strerror(errno)); + log_debug("pktbuf_send_immidiate: %s", strerror(errno)); } return res == amount; } diff --git a/src/proto.c b/src/proto.c index aef48b0..fd5eef7 100644 --- a/src/proto.c +++ b/src/proto.c @@ -78,7 +78,8 @@ bool send_pooler_error(PgSocket *client, bool send_ready, const char *msg) uint8 tmpbuf[512]; PktBuf buf; - slog_error(client, "Pooler Error: %s", msg); + if (cf_log_pooler_errors) + slog_info(client, "Pooler Error: %s", msg); pktbuf_static(&buf, tmpbuf, sizeof(tmpbuf)); pktbuf_write_generic(&buf, 'E', "cscscsc", @@ -193,22 +194,20 @@ bool welcome_client(PgSocket *client) */ /* actual packet send */ -static void send_password(PgSocket *server, const char *enc_psw) +static bool send_password(PgSocket *server, const char *enc_psw) { bool res; SEND_PasswordMessage(res, server, enc_psw); - if (!res) - disconnect_server(server, true, - "partial send unhandled in send_password"); + return res; } -static void login_clear_psw(PgSocket *server) +static bool login_clear_psw(PgSocket *server) { log_debug("P: send clear password"); - send_password(server, server->pool->user->passwd); + return send_password(server, server->pool->user->passwd); } -static void login_crypt_psw(PgSocket *server, const uint8 *salt) +static bool login_crypt_psw(PgSocket *server, const uint8 *salt) { char saltbuf[3]; const char *enc; @@ -217,11 +216,11 @@ static void login_crypt_psw(PgSocket *server, const uint8 *salt) log_debug("P: send crypt password"); strncpy(saltbuf, (char *)salt, 2); enc = pg_crypt(user->passwd, saltbuf); - send_password(server, enc); + return send_password(server, enc); } -static void 
login_md5_psw(PgSocket *server, const uint8 *salt) +static bool login_md5_psw(PgSocket *server, const uint8 *salt) { char txt[MD5_PASSWD_LEN + 1], *src; PgUser *user = server->pool->user; @@ -234,7 +233,7 @@ static void login_md5_psw(PgSocket *server, const uint8 *salt) src = user->passwd + 3; pg_md5_encrypt(src, (char *)salt, 4, txt); - send_password(server, txt); + return send_password(server, txt); } /* answer server authentication request */ @@ -244,6 +243,7 @@ bool answer_authreq(PgSocket *server, { unsigned cmd; const uint8 *salt; + bool res = false; if (pkt_len < 5 + 4) return false; @@ -254,32 +254,37 @@ bool answer_authreq(PgSocket *server, switch (cmd) { case 0: log_debug("S: auth ok"); + res = true; break; case 3: log_debug("S: req cleartext password"); - login_clear_psw(server); + res = login_clear_psw(server); break; case 4: if (pkt_len < 5 + 4 + 2) return false; log_debug("S: req crypt psw"); salt = mbuf_get_bytes(pkt, 2); - login_crypt_psw(server, salt); + res = login_crypt_psw(server, salt); break; case 5: if (pkt_len < 5 + 4 + 4) return false; log_debug("S: req md5-crypted psw"); salt = mbuf_get_bytes(pkt, 4); - login_md5_psw(server, salt); + res = login_md5_psw(server, salt); break; case 2: /* kerberos */ case 6: /* scm something */ log_error("unsupported auth method: %d", cmd); + res = false; + break; default: log_error("unknown auth method: %d", cmd); + res = false; + break; } - return true; + return res; } bool send_startup_packet(PgSocket *server) diff --git a/src/sbuf.c b/src/sbuf.c index 74c319d..97ffe26 100644 --- a/src/sbuf.c +++ b/src/sbuf.c @@ -32,6 +32,20 @@ */ #define SMALL_PKT 16 +#define AssertSanity(sbuf) do { \ + Assert((sbuf)->send_pos >= 0); \ + Assert((sbuf)->send_pos <= (sbuf)->pkt_pos); \ + Assert((sbuf)->pkt_pos <= (sbuf)->recv_pos); \ + Assert((sbuf)->recv_pos <= cf_sbuf_len); \ + Assert((sbuf)->pkt_remain >= 0); \ + Assert((sbuf)->send_remain >= 0); \ +} while (0) + +#define AssertActive(sbuf) do { \ + Assert((sbuf)->sock > 
0); \ + AssertSanity(sbuf); \ +} while (0) + /* declare static stuff */ static void sbuf_queue_send(SBuf *sbuf); static bool sbuf_send_pending(SBuf *sbuf); @@ -41,37 +55,11 @@ static void sbuf_recv_cb(int sock, short flags, void *arg); static void sbuf_send_cb(int sock, short flags, void *arg); static void sbuf_try_resync(SBuf *sbuf); static void sbuf_wait_for_data(SBuf *sbuf); +static bool sbuf_call_proto(SBuf *sbuf, int event); -/* - * Call proto callback with proper MBuf. - * - * If callback returns true it used one of sbuf_prepare_* on sbuf, - * and processing can continue. - * - * If it returned false it used sbuf_pause(), sbuf_close() or simply - * wants to wait for next event loop (eg. too few data available). - * Callee should not touch sbuf in that case and just return to libevent. - */ -static inline bool sbuf_call_proto(SBuf *sbuf, int event) -{ - MBuf mbuf; - uint8 *pos = sbuf->buf + sbuf->pkt_pos; - int avail = sbuf->recv_pos - sbuf->pkt_pos; - - Assert(avail >= 0); - Assert(pos + avail <= sbuf->buf + cf_sbuf_len); - Assert(event != SBUF_EV_READ || avail > 0); - - mbuf_init(&mbuf, pos, avail); - return sbuf->proto_handler(sbuf, event, &mbuf, sbuf->arg); -} - -/* lets wait for new data */ -static void sbuf_wait_for_data(SBuf *sbuf) -{ - event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST, sbuf_recv_cb, sbuf); - event_add(&sbuf->ev, NULL); -} +/********************************* + * Public functions + *********************************/ /* initialize SBuf with proto handler */ void sbuf_init(SBuf *sbuf, sbuf_proto_cb_t proto_fn, void *arg) @@ -84,9 +72,8 @@ void sbuf_init(SBuf *sbuf, sbuf_proto_cb_t proto_fn, void *arg) /* got new socket from accept() */ void sbuf_accept(SBuf *sbuf, int sock, bool is_unix) { - Assert(sbuf->pkt_pos == 0); - Assert(sbuf->recv_pos == 0); - Assert(sbuf->send_pos == 0); + Assert(sbuf->recv_pos == 0 && sbuf->sock == 0); + AssertSanity(sbuf); tune_socket(sock, is_unix); sbuf->sock = sock; @@ -111,6 +98,9 @@ void 
sbuf_connect(SBuf *sbuf, const PgAddr *addr, int timeout_sec) socklen_t len; struct timeval timeout; + Assert(sbuf->recv_pos == 0 && sbuf->sock == 0); + AssertSanity(sbuf); + /* prepare sockaddr */ if (addr->is_unix) { sa = (void*)&sa_un; @@ -171,6 +161,7 @@ void sbuf_connect(SBuf *sbuf, const PgAddr *addr, int timeout_sec) /* dont wait for data on this socket */ void sbuf_pause(SBuf *sbuf) { + AssertActive(sbuf); Assert(sbuf->wait_send == 0); event_del(&sbuf->ev); @@ -179,6 +170,8 @@ void sbuf_pause(SBuf *sbuf) /* resume from pause, start waiting for data */ void sbuf_continue(SBuf *sbuf) { + AssertActive(sbuf); + sbuf_wait_for_data(sbuf); /* there is some data already received */ @@ -193,6 +186,8 @@ void sbuf_continue(SBuf *sbuf) */ void sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb user_cb) { + AssertActive(sbuf); + event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST, user_cb, sbuf->arg); event_add(&sbuf->ev, NULL); @@ -214,8 +209,9 @@ void sbuf_close(SBuf *sbuf) } /* proto_fn tells to send some bytes to socket */ -void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, unsigned amount, bool flush) +void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, int amount, bool flush) { + AssertActive(sbuf); Assert(sbuf->pkt_remain == 0); Assert(sbuf->pkt_skip == 0 || sbuf->send_remain == 0); Assert(!sbuf->pkt_flush || sbuf->send_remain == 0); @@ -230,6 +226,7 @@ void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, unsigned amount, bool flush) /* proto_fn tells to skip sone amount of bytes */ void sbuf_prepare_skip(SBuf *sbuf, int amount) { + AssertActive(sbuf); Assert(sbuf->pkt_remain == 0); Assert(sbuf->pkt_skip == 0 || sbuf->send_remain == 0); Assert(!sbuf->pkt_flush || sbuf->send_remain == 0); @@ -241,6 +238,47 @@ void sbuf_prepare_skip(SBuf *sbuf, int amount) sbuf->dst = NULL; } +/************************* + * Internal functions + *************************/ + +/* + * Call proto callback with proper MBuf. 
+ * + * If callback returns true it used one of sbuf_prepare_* on sbuf, + * and processing can continue. + * + * If it returned false it used sbuf_pause(), sbuf_close() or simply + * wants to wait for next event loop (eg. too few data available). + * Callee should not touch sbuf in that case and just return to libevent. + */ +static bool sbuf_call_proto(SBuf *sbuf, int event) +{ + MBuf mbuf; + uint8 *pos = sbuf->buf + sbuf->pkt_pos; + int avail = sbuf->recv_pos - sbuf->pkt_pos; + bool res; + + AssertSanity(sbuf); + Assert(event != SBUF_EV_READ || avail > 0); + + mbuf_init(&mbuf, pos, avail); + res = sbuf->proto_handler(sbuf, event, &mbuf, sbuf->arg); + + AssertSanity(sbuf); + if (event == SBUF_EV_READ && res) + Assert(sbuf->sock > 0); + + return res; +} + +/* lets wait for new data */ +static void sbuf_wait_for_data(SBuf *sbuf) +{ + event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST, sbuf_recv_cb, sbuf); + event_add(&sbuf->ev, NULL); +} + /* libevent EV_WRITE: called when dest socket is writable again */ static void sbuf_send_cb(int sock, short flags, void *arg) { @@ -250,6 +288,8 @@ static void sbuf_send_cb(int sock, short flags, void *arg) if (!sbuf->sock) return; + AssertSanity(sbuf); + /* prepare normal situation for sbuf_recv_cb() */ sbuf->wait_send = 0; sbuf_wait_for_data(sbuf); @@ -260,6 +300,8 @@ static void sbuf_send_cb(int sock, short flags, void *arg) /* socket is full, wait until its writable again */ static void sbuf_queue_send(SBuf *sbuf) { + AssertActive(sbuf); + sbuf->wait_send = 1; event_del(&sbuf->ev); event_set(&sbuf->ev, sbuf->dst->sock, EV_WRITE, sbuf_send_cb, sbuf); @@ -276,6 +318,7 @@ static bool sbuf_send_pending(SBuf *sbuf) int res, avail; uint8 *pos; + AssertActive(sbuf); Assert(sbuf->dst || !sbuf->send_remain); try_more: @@ -284,7 +327,7 @@ try_more: if (avail > sbuf->send_remain) avail = sbuf->send_remain; if (avail == 0) - return true; + goto all_sent; if (sbuf->dst->sock == 0) { log_error("sbuf_send_pending: no dst sock?"); @@ 
-294,26 +337,34 @@ try_more: /* actually send it */ pos = sbuf->buf + sbuf->send_pos; res = safe_send(sbuf->dst->sock, pos, avail, 0); - if (res >= 0) { - sbuf->send_remain -= res; - sbuf->send_pos += res; - - if (res < avail) { - /* - * Should do sbuf_queue_send() immidiately? - * - * To be sure, lets run into EAGAIN. - */ - goto try_more; - } - return true; - } else if (errno == EAGAIN) { - sbuf_queue_send(sbuf); - return false; - } else { - sbuf_call_proto(sbuf, SBUF_EV_SEND_FAILED); + if (res < 0) { + if (errno == EAGAIN) + sbuf_queue_send(sbuf); + else + sbuf_call_proto(sbuf, SBUF_EV_SEND_FAILED); return false; } + + sbuf->send_remain -= res; + sbuf->send_pos += res; + + AssertActive(sbuf); + + /* + * Should do sbuf_queue_send() immidiately? + * + * To be sure, lets run into EAGAIN. + */ + if (res < avail) + goto try_more; + +all_sent: + + /* send_pos may lag pkt_pos in case of skip packets, move it here */ + if (sbuf->send_remain == 0 && sbuf->send_pos < sbuf->pkt_pos) + sbuf->send_pos = sbuf->pkt_pos; + + return true; } /* process as much data as possible */ @@ -324,7 +375,7 @@ static bool sbuf_process_pending(SBuf *sbuf) bool res; while (1) { - Assert(sbuf->recv_pos >= sbuf->pkt_pos); + AssertActive(sbuf); /* * Enough for now? 
@@ -373,28 +424,20 @@ static void sbuf_try_resync(SBuf *sbuf) { int avail; - if (sbuf->pkt_pos == 0) + AssertActive(sbuf); + + if (sbuf->send_pos == 0) return; - if (sbuf->send_remain > 0) - avail = sbuf->recv_pos - sbuf->send_pos; - else - avail = sbuf->recv_pos - sbuf->pkt_pos; + avail = sbuf->recv_pos - sbuf->send_pos; if (avail == 0) { sbuf->recv_pos = sbuf->pkt_pos = sbuf->send_pos = 0; } else if (avail <= SMALL_PKT) { - if (sbuf->send_remain > 0) { - memmove(sbuf->buf, sbuf->buf + sbuf->send_pos, avail); - sbuf->pkt_pos -= sbuf->send_pos; - sbuf->send_pos = 0; - sbuf->recv_pos = avail; - } else { - memmove(sbuf->buf, sbuf->buf + sbuf->pkt_pos, avail); - sbuf->send_pos = 0; - sbuf->pkt_pos = 0; - sbuf->recv_pos = avail; - } + memmove(sbuf->buf, sbuf->buf + sbuf->send_pos, avail); + sbuf->pkt_pos -= sbuf->send_pos; + sbuf->send_pos = 0; + sbuf->recv_pos = avail; } } @@ -404,6 +447,10 @@ static bool sbuf_actual_recv(SBuf *sbuf, int len) int got; uint8 *pos; + AssertActive(sbuf); + Assert(len > 0); + Assert(sbuf->recv_pos + len <= cf_sbuf_len); + pos = sbuf->buf + sbuf->recv_pos; got = safe_recv(sbuf->sock, pos, len, 0); @@ -438,6 +485,7 @@ static void sbuf_recv_cb(int sock, short flags, void *arg) /* reading should be disabled when waiting */ Assert(sbuf->wait_send == 0); + AssertSanity(sbuf); try_more: /* make room in buffer */ @@ -481,12 +529,12 @@ static bool sbuf_after_connect_check(SBuf *sbuf) err = getsockopt(sbuf->sock, SOL_SOCKET, SO_ERROR, (void*)&optval, &optlen); if (err < 0) { log_error("sbuf_after_connect_check: getsockopt: %s", - strerror(errno)); + strerror(errno)); return false; } if (optval != 0) { log_error("sbuf_after_connect_check: pending error: %s", - strerror(optval)); + strerror(optval)); return false; } return true; @@ -517,9 +565,9 @@ bool sbuf_answer(SBuf *sbuf, const void *buf, int len) return false; res = safe_send(sbuf->sock, buf, len, 0); if (res < 0) - log_error("sbuf_answer: error sending: %s", strerror(errno)); + 
log_debug("sbuf_answer: error sending: %s", strerror(errno)); else if (res != len) - log_error("sbuf_answer: partial send: len=%d sent=%d", len, res); + log_debug("sbuf_answer: partial send: len=%d sent=%d", len, res); return res == len; } diff --git a/src/sbuf.h b/src/sbuf.h index bb39382..ea731ce 100644 --- a/src/sbuf.h +++ b/src/sbuf.h @@ -52,11 +52,11 @@ struct SBuf { /* dest SBuf for current packet */ SBuf *dst; - unsigned recv_pos; - unsigned pkt_pos; - unsigned pkt_remain; - unsigned send_pos; - unsigned send_remain; + int recv_pos; + int pkt_pos; + int pkt_remain; + int send_pos; + int send_remain; unsigned wait_send:1; unsigned pkt_skip:1; @@ -77,7 +77,7 @@ void sbuf_continue(SBuf *sbuf); void sbuf_close(SBuf *sbuf); /* proto_fn can use those functions to order behaviour */ -void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, unsigned amount, bool flush); +void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, int amount, bool flush); void sbuf_prepare_skip(SBuf *sbuf, int amount); bool sbuf_answer(SBuf *sbuf, const void *buf, int len); diff --git a/src/server.c b/src/server.c index 9f4ee34..8bae665 100644 --- a/src/server.c +++ b/src/server.c @@ -56,6 +56,8 @@ static bool handle_server_startup(PgSocket *server, MBuf *pkt) case 'R': /* AuthenticationXXX */ log_debug("calling login_answer"); res = answer_authreq(server, pkt_type, pkt_len, pkt); + if (!res) + disconnect_server(server, false, "failed to answer authreq"); break; case 'S': /* ParameterStatus */ res = add_welcome_parameter(server, pkt_type, pkt_len, pkt); @@ -65,13 +67,15 @@ static bool handle_server_startup(PgSocket *server, MBuf *pkt) log_debug("server login ok, start accepting queries"); server->ready = 1; + /* got all params */ finish_welcome_msg(server); - release_server(server); + + // FIXME: res check + res = release_server(server); /* let the takeover process handle it */ - if (server->pool->admin) - takeover_login(server); - res = true; + if (res && server->pool->admin) + res = takeover_login(server); 
break; /* ignorable packets */ @@ -282,6 +286,7 @@ bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *pkt, void *arg) switch (server->state) { case SV_ACTIVE: case SV_TESTED: + /* retval does not matter here */ release_server(server); break; default: diff --git a/src/system.h b/src/system.h index 8a813ba..35123d1 100644 --- a/src/system.h +++ b/src/system.h @@ -57,7 +57,8 @@ #endif #ifdef CASSERT -#define Assert(e) do { if (!(e)) fatal("Assert(%s) failed", #e); } while (0) +#define Assert(e) do { if (!(e)) { \ + fatal_noexit("Assert(%s) failed", #e); abort(); } } while (0) #else #define Assert(e) #endif diff --git a/src/takeover.c b/src/takeover.c index 94bb1b3..c8ca0d3 100644 --- a/src/takeover.c +++ b/src/takeover.c @@ -36,6 +36,7 @@ static void takeover_finish(PgSocket *bouncer) disconnect_server(bouncer, false, "disko over"); cf_reboot = 0; resume_all(); + log_info("disko over, resuming work"); } /* parse msg for fd and info */ @@ -257,16 +258,20 @@ static void takeover_recv_cb(int sock, short flags, void *arg) * login finished, send first command, * replace recv callback with custom recvmsg() based one. 
*/ -void takeover_login(PgSocket *bouncer) +bool takeover_login(PgSocket *bouncer) { bool res; slog_info(bouncer, "Login OK, sending SUSPEND"); SEND_generic(res, bouncer, 'Q', "s", "SUSPEND;"); - - /* use own callback */ - sbuf_pause(&bouncer->sbuf); - sbuf_continue_with_callback(&bouncer->sbuf, takeover_recv_cb); + if (res) { + /* use own callback */ + sbuf_pause(&bouncer->sbuf); + sbuf_continue_with_callback(&bouncer->sbuf, takeover_recv_cb); + } else { + disconnect_server(bouncer, false, "failed to send command"); + } + return res; } /* launch connection to running process */ diff --git a/src/takeover.h b/src/takeover.h index 08ca095..9011c8d 100644 --- a/src/takeover.h +++ b/src/takeover.h @@ -17,5 +17,5 @@ */ void takeover_init(void); -void takeover_login(PgSocket *bouncer); +bool takeover_login(PgSocket *bouncer); diff --git a/src/util.c b/src/util.c index a7a2637..65f06a0 100644 --- a/src/util.c +++ b/src/util.c @@ -104,7 +104,7 @@ static void _log(const char *pfx, const char *fmt, va_list ap) } void _fatal(const char *file, int line, const char *func, - const char *fmt, ...) + bool do_exit, const char *fmt, ...) 
{ va_list ap; char buf[1024]; @@ -116,9 +116,8 @@ void _fatal(const char *file, int line, const char *func, va_start(ap, fmt); _log("FATAL", buf, ap); va_end(ap); - if (cf_verbose > 2) - abort(); - exit(1); + if (do_exit) + exit(1); } void _fatal_perror(const char *file, int line, const char *func, @@ -129,7 +128,7 @@ void _fatal_perror(const char *file, int line, const char *func, va_start(ap, fmt); vsnprintf(buf, sizeof(buf), fmt, ap); va_end(ap); - _fatal(file, line, func, "%s: %s", buf, strerror(errno)); + _fatal(file, line, func, false, "%s: %s", buf, strerror(errno)); } /* diff --git a/src/util.h b/src/util.h index 89ddd45..fee4d66 100644 --- a/src/util.h +++ b/src/util.h @@ -62,10 +62,12 @@ void slog_level(const char *level, const PgSocket *sock, const char *fmt, ...); /* * log and exit */ -void _fatal(const char *file, int line, const char *func, const char *s, ...); +void _fatal(const char *file, int line, const char *func, bool do_exit, const char *s, ...); void _fatal_perror(const char *file, int line, const char *func, const char *s, ...); #define fatal(args...) \ - _fatal(__FILE__, __LINE__, __FUNCTION__, ## args) + _fatal(__FILE__, __LINE__, __FUNCTION__, true, ## args) +#define fatal_noexit(args...) \ + _fatal(__FILE__, __LINE__, __FUNCTION__, false, ## args) #define fatal_perror(args...) \ _fatal_perror(__FILE__, __LINE__, __FUNCTION__, ## args) -- 2.40.0