From 3aa6978bd0ddd0fc8cb9d9bdbbfe7af6647f2032 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Fri, 11 Jan 2008 13:22:49 +0000 Subject: [PATCH] make failure from event_del() non-fatal - pgsocket: keep open sbufs in justfree lists, retry close later - pooler: keep track socket states, retry in per-loop main function --- include/objects.h | 1 - include/pooler.h | 1 + include/sbuf.h | 9 ++++-- src/janitor.c | 27 +++++++++--------- src/main.c | 1 + src/objects.c | 42 +++++++++++++++++---------- src/pooler.c | 73 +++++++++++++++++++++++++++++++++-------------- src/sbuf.c | 27 ++++++++++++------ src/takeover.c | 6 ++-- 9 files changed, 125 insertions(+), 62 deletions(-) diff --git a/include/objects.h b/include/objects.h index 2bf3496..3e241cb 100644 --- a/include/objects.h +++ b/include/objects.h @@ -51,7 +51,6 @@ bool use_server_socket(int fd, PgAddr *addr, const char *dbname, const char *use const char *client_end, const char *std_string, const char *datestyle, const char *timezone) _MUSTCHECK; -void pause_client(PgSocket *client); void activate_client(PgSocket *client); void change_client_state(PgSocket *client, SocketState newstate); diff --git a/include/pooler.h b/include/pooler.h index b43e47b..5775d22 100644 --- a/include/pooler.h +++ b/include/pooler.h @@ -21,4 +21,5 @@ bool use_pooler_socket(int fd, bool is_unix) _MUSTCHECK; void resume_pooler(void); void suspend_pooler(void); void get_pooler_fds(int *p_net, int *p_unix); +void per_loop_pooler_maint(void); diff --git a/include/sbuf.h b/include/sbuf.h index b21e5ba..871a900 100644 --- a/include/sbuf.h +++ b/include/sbuf.h @@ -96,9 +96,9 @@ void sbuf_init(SBuf *sbuf, sbuf_cb_t proto_fn, void *arg); bool sbuf_accept(SBuf *sbuf, int read_sock, bool is_unix) _MUSTCHECK; bool sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int timeout_sec) _MUSTCHECK; -void sbuf_pause(SBuf *sbuf); +bool sbuf_pause(SBuf *sbuf) _MUSTCHECK; void sbuf_continue(SBuf *sbuf); -void sbuf_close(SBuf *sbuf); +bool sbuf_close(SBuf *sbuf) _MUSTCHECK; /* proto_fn can use those functions to order behaviour */ void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, int amount); @@ -119,6 +119,11 @@ static inline bool sbuf_is_empty(SBuf *sbuf) && sbuf->pkt_remain == 0; } +static inline bool sbuf_is_closed(SBuf *sbuf) +{ + return sbuf->sock == 0; +} + bool sbuf_rewrite_header(SBuf *sbuf, int old_len, const uint8_t *new_hdr, int new_len) _MUSTCHECK; diff --git a/src/janitor.c b/src/janitor.c index f6685b4..4c4d100 100644 --- a/src/janitor.c +++ b/src/janitor.c @@ -51,21 +51,22 @@ static void close_client_list(StatList *sk_list, const char *reason) bool suspend_socket(PgSocket *sk, bool force_suspend) { - bool done = true; - if (!sk->suspended) { - if (sbuf_is_empty(&sk->sbuf)) { - sbuf_pause(&sk->sbuf); + if (sk->suspended) + return true; + + if (sbuf_is_empty(&sk->sbuf)) { + if (sbuf_pause(&sk->sbuf)) sk->suspended = 1; - } else - done = false; - } - if (!done && force_suspend) { - if (is_server_socket(sk)) - disconnect_server(sk, true, "suspend_timeout"); - else - disconnect_client(sk, true, "suspend_timeout"); } - return done; + + if (sk->suspended || !force_suspend) + return sk->suspended; + + if (is_server_socket(sk)) + disconnect_server(sk, true, "suspend_timeout"); + else + disconnect_client(sk, true, "suspend_timeout"); + return true; } /* suspend all sockets in socket list */ diff --git a/src/main.c b/src/main.c index 1239033..9d1c3f7 100644 --- a/src/main.c +++ b/src/main.c @@ -502,6 +502,7 @@ static void main_loop_once(void) per_loop_maint(); reuse_just_freed_objects(); rescue_timers(); + per_loop_pooler_maint(); } static void takeover_part1(void) diff --git a/src/objects.c b/src/objects.c index f29c376..e136110 100644 --- a/src/objects.c +++ b/src/objects.c @@ -425,13 +425,14 @@ PgPool *get_pool(PgDatabase *db, PgUser *user) } /* deactivate socket and put into wait queue */ -void pause_client(PgSocket *client) +static void pause_client(PgSocket *client) { Assert(client->state == CL_ACTIVE); slog_debug(client, "pause_client"); change_client_state(client, CL_WAITING); - sbuf_pause(&client->sbuf); + if (!sbuf_pause(&client->sbuf)) + disconnect_client(client, true, "pause failed"); } /* wake client from wait */ @@ -485,15 +486,15 @@ bool find_server(PgSocket *client) server->link = client; change_server_state(server, SV_ACTIVE); if (varchange) { - sbuf_pause(&client->sbuf); - res = false; /* don't process client data yet */ server->setting_vars = 1; server->ready = 0; + res = false; /* don't process client data yet */ + if (!sbuf_pause(&client->sbuf)) + disconnect_client(client, true, "pause failed"); } else res = true; } else { pause_client(client); - Assert(client->state == CL_WAITING); res = false; } return res; @@ -628,9 +629,10 @@ void disconnect_server(PgSocket *server, bool notify, const char *reason) /* ignore result */ notify = false; } - sbuf_close(&server->sbuf); change_server_state(server, SV_JUSTFREE); + if (!sbuf_close(&server->sbuf)) + log_noise("sbuf_close failed, retry later"); } /* drop client connection */ @@ -673,9 +675,9 @@ void disconnect_client(PgSocket *client, bool notify, const char *reason) send_pooler_error(client, false, reason); } - sbuf_close(&client->sbuf); - change_client_state(client, CL_JUSTFREE); + if (!sbuf_close(&client->sbuf)) + log_noise("sbuf_close failed, retry later"); } /* the pool needs new connection, if possible */ @@ -846,8 +848,9 @@ found: return; } - /* drop the connection silently */ - sbuf_close(&req->sbuf); + /* drop the connection, if fails, retry later in justfree list */ + if (!sbuf_close(&req->sbuf)) + log_noise("sbuf_close failed, retry later"); /* remember server key */ server = main_client->link; @@ -1014,19 +1017,28 @@ void reuse_just_freed_objects(void) { List *tmp, *item; PgSocket *sk; + bool close_works = true; /* - * Obviously, if state would be set to *_FREE, - * they could be moved in one go. + * event_del() may fail because of ENOMEM for event handlers + * that need only changes sent to kernel on each loop. + * + * Keep open sbufs in justfree lists until successful. */ + statlist_for_each_safe(item, &justfree_client_list, tmp) { sk = container_of(item, PgSocket, head); - change_client_state(sk, CL_FREE); + if (sbuf_is_closed(&sk->sbuf)) + change_client_state(sk, CL_FREE); + else if (close_works) + close_works = sbuf_close(&sk->sbuf); } statlist_for_each_safe(item, &justfree_server_list, tmp) { sk = container_of(item, PgSocket, head); - change_server_state(sk, SV_FREE); + if (sbuf_is_closed(&sk->sbuf)) + change_server_state(sk, SV_FREE); + else if (close_works) + close_works = sbuf_close(&sk->sbuf); } } - diff --git a/src/pooler.c b/src/pooler.c index fcbc659..c37033d 100644 --- a/src/pooler.c +++ b/src/pooler.c @@ -24,19 +24,30 @@ static int fd_net = 0; static int fd_unix = 0; + static struct event ev_net; static struct event ev_unix; -static int suspended = 0; + +/* if sockets are registered in libevent */ +static bool reg_net = false; +static bool reg_unix = false; + +/* should listening sockets be active or suspended? */ +static bool pooler_active = false; /* on accept() failure sleep 5 seconds */ static struct event ev_err; static struct timeval err_timeout = {5, 0}; +/* atexit() cleanup func */ static void cleanup_unix_socket(void) { char fn[256]; - if (!cf_unix_socket_dir || suspended) + + /* avoid cleanup if exit() while suspended */ + if (!reg_unix) return; + snprintf(fn, sizeof(fn), "%s/.s.PGSQL.%d", cf_unix_socket_dir, cf_listen_port); unlink(fn); @@ -166,7 +177,8 @@ static int create_net_socket(const char *listen_addr, int listen_port) static void err_wait_func(int sock, short flags, void *arg) { - resume_pooler(); + if (cf_pause_mode != P_SUSPEND) + resume_pooler(); } /* got new connection, associate it with client struct */ @@ -244,36 +256,43 @@ bool use_pooler_socket(int sock, bool is_unix) void suspend_pooler(void) { - suspended = 1; + pooler_active = false; - if (fd_net) { - if (event_del(&ev_net) < 0) - /* fixme */ - fatal_perror("event_del(ev_net)"); + if (fd_net && reg_net) { + if (event_del(&ev_net) < 0) { + log_warning("suspend_pooler, event_del: %s", strerror(errno)); + return; + } + reg_net = false; } - if (fd_unix) { - if (event_del(&ev_unix) < 0) - /* fixme */ - fatal_perror("event_del(ev_unix)"); + if (fd_unix && reg_unix) { + if (event_del(&ev_unix) < 0) { + log_warning("suspend_pooler, event_del: %s", strerror(errno)); + return; + } + reg_unix = false; } } void resume_pooler(void) { - suspended = 0; + pooler_active = true; - if (fd_unix) { + if (fd_unix && !reg_unix) { event_set(&ev_unix, fd_unix, EV_READ | EV_PERSIST, pool_accept, "1"); - if (event_add(&ev_unix, NULL) < 0) - /* fixme: less serious approach? */ - fatal_perror("event_add(ev_unix)"); + if (event_add(&ev_unix, NULL) < 0) { + log_warning("event_add failed: %s", strerror(errno)); + return; + } + reg_unix = true; } - if (fd_net) { + if (fd_net && !reg_net) { event_set(&ev_net, fd_net, EV_READ | EV_PERSIST, pool_accept, NULL); - if (event_add(&ev_net, NULL) < 0) - /* fixme: less serious approach? */ - fatal_perror("event_add(ev_net)"); + if (event_add(&ev_net, NULL) < 0) { + log_warning("event_add failed: %s", strerror(errno)); + } + reg_net = true; } } @@ -292,3 +311,15 @@ void pooler_setup(void) resume_pooler(); } +/* retry previously failed suspend_pooler() / resume_pooler() */ +void per_loop_pooler_maint(void) +{ + if (pooler_active) { + if ((fd_unix && !reg_unix) || (fd_net && !reg_net)) + resume_pooler(); + } else { + if ((fd_unix && reg_unix) || (fd_net && reg_net)) + suspend_pooler(); + } +} + diff --git a/src/sbuf.c b/src/sbuf.c index b7a598e..35c72b9 100644 --- a/src/sbuf.c +++ b/src/sbuf.c @@ -177,14 +177,16 @@ failed: } /* don't wait for data on this socket */ -void sbuf_pause(SBuf *sbuf) +bool sbuf_pause(SBuf *sbuf) { AssertActive(sbuf); Assert(sbuf->wait_send == 0); - if (event_del(&sbuf->ev) < 0) - /* fixme */ - fatal_perror("event_del"); + if (event_del(&sbuf->ev) < 0) { + log_warning("event_del: %s", strerror(errno)); + return false; + } + return true; } /* resume from pause, start waiting for data */ @@ -239,12 +241,14 @@ bool sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb user_cb) } /* socket cleanup & close */ -void sbuf_close(SBuf *sbuf) +bool sbuf_close(SBuf *sbuf) { /* keep handler & arg values */ if (sbuf->sock > 0) { - if (event_del(&sbuf->ev) < 0) - fatal_perror("event_del"); + if (event_del(&sbuf->ev) < 0) { + log_warning("event_del: %s", strerror(errno)); + return false; + } safe_close(sbuf->sock); } sbuf->dst = NULL; @@ -252,6 +256,7 @@ void sbuf_close(SBuf *sbuf) sbuf->pkt_pos = sbuf->pkt_remain = sbuf->recv_pos = 0; sbuf->pkt_action = sbuf->wait_send = 0; sbuf->send_pos = sbuf->send_remain = 0; + return true; } /* proto_fn tells to send some bytes to socket */ @@ -376,18 +381,24 @@ static bool sbuf_queue_send(SBuf *sbuf) int err; AssertActive(sbuf); - sbuf->wait_send = 1; + /* if false is returned, the socket will be closed later */ + + /* stop waiting for read events */ err = event_del(&sbuf->ev); if (err < 0) { log_warning("sbuf_queue_send: event_del failed: %s", strerror(errno)); return false; } + + /* instead wait for EV_WRITE on destination socket */ event_set(&sbuf->ev, sbuf->dst->sock, EV_WRITE, sbuf_send_cb, sbuf); err = event_add(&sbuf->ev, NULL); if (err < 0) { log_warning("sbuf_queue_send: event_add failed: %s", strerror(errno)); return false; } + + sbuf->wait_send = 1; return true; } diff --git a/src/takeover.c b/src/takeover.c index f11b49b..ea7cad0 100644 --- a/src/takeover.c +++ b/src/takeover.c @@ -67,7 +67,8 @@ static void takeover_finish_part1(PgSocket *bouncer) Assert(old_bouncer == NULL); /* unregister bouncer from libevent */ - sbuf_pause(&bouncer->sbuf); + if (!sbuf_pause(&bouncer->sbuf)) + fatal_perror("sbuf_pause failed"); old_bouncer = bouncer; cf_reboot = 0; log_info("disko over, going background"); @@ -305,7 +306,8 @@ bool takeover_login(PgSocket *bouncer) SEND_generic(res, bouncer, 'Q', "s", "SUSPEND;"); if (res) { /* use own callback */ - sbuf_pause(&bouncer->sbuf); + if (!sbuf_pause(&bouncer->sbuf)) + fatal("sbuf_pause failed"); res = sbuf_continue_with_callback(&bouncer->sbuf, takeover_recv_cb); if (!res) fatal("takeover_login: sbuf_continue_with_callback failed"); -- 2.40.0