From: Marko Kreen Date: Wed, 9 Jan 2008 09:05:44 +0000 (+0000) Subject: move shutdown after fork(), detect old bouncer X-Git-Tag: pgbouncer_1_2_rc2~75 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=02533a7e36cff5aed939ac4089f0f133c0019e37;p=pgbouncer move shutdown after fork(), detect old bouncer --- diff --git a/include/takeover.h b/include/takeover.h index f0bb146..bcd442c 100644 --- a/include/takeover.h +++ b/include/takeover.h @@ -18,4 +18,6 @@ void takeover_init(void); bool takeover_login(PgSocket *bouncer) _MUSTCHECK; +void takeover_login_failed(void); +void takeover_finish(void); diff --git a/src/main.c b/src/main.c index 26e680d..a896bd0 100644 --- a/src/main.c +++ b/src/main.c @@ -459,13 +459,29 @@ static void check_limits(void) (int)lim.rlim_cur, (int)lim.rlim_max, cf_max_client_conn, fd_count); } -static void daemon_setup(void) +static bool check_old_process_unix(void) { - if (!cf_reboot) - check_pidfile(); - if (cf_daemon) - go_daemon(); - write_pidfile(); + struct sockaddr_un sa_un; + socklen_t len = sizeof(sa_un); + int domain = AF_UNIX; + int res, fd; + + if (!*cf_unix_socket_dir) + return false; + + memset(&sa_un, 0, len); + sa_un.sun_family = domain; + snprintf(sa_un.sun_path, sizeof(sa_un.sun_path), + "%s/.s.PGSQL.%d", cf_unix_socket_dir, cf_listen_port); + + fd = socket(domain, SOCK_STREAM, 0); + if (fd < 0) + fatal_perror("cannot create socket"); + res = safe_connect(fd, (struct sockaddr *)&sa_un, len); + safe_close(fd); + if (res < 0) + return false; + return true; } static void main_loop_once(void) @@ -484,11 +500,25 @@ static void main_loop_once(void) rescue_timers(); } +static void takeover_part1(void) +{ + /* use temporary libevent base */ + void *evtmp = event_init(); + + if (!*cf_unix_socket_dir) + fatal("cannot reboot if unix dir not configured"); + + takeover_init(); + while (cf_reboot) + main_loop_once(); + event_base_free(evtmp); +} + /* boot everything */ int main(int argc, char *argv[]) { int c; - int did_takeover = 0; + bool did_takeover = false; /* parse cmdline */ while ((c = getopt(argc, argv, "avhdVR")) != EOF) { @@ -529,26 +559,34 @@ int main(int argc, char *argv[]) srandom(time(NULL) ^ getpid()); if (cf_reboot) { - /* use temporary libevent base */ - void *evtmp = event_init(); - takeover_init(); - while (cf_reboot) - main_loop_once(); - did_takeover = 1; - event_base_free(evtmp); + if (check_old_process_unix()) { + takeover_part1(); + did_takeover = true; + } else { + log_info("old process not found, try to continue normally"); + cf_reboot = 0; + check_pidfile(); + } + } else { + check_pidfile(); + if (check_old_process_unix()) + fatal("somebody is listening on unix socket"); } /* initialize subsystems, order important */ - daemon_setup(); + if (cf_daemon) + go_daemon(); event_init(); signal_setup(); janitor_setup(); stats_setup(); - if (!did_takeover) - pooler_setup(); + if (did_takeover) + takeover_finish(); else - resume_all(); + pooler_setup(); + + write_pidfile(); /* main loop */ while (1) diff --git a/src/server.c b/src/server.c index ef1b050..a7f6df2 100644 --- a/src/server.c +++ b/src/server.c @@ -274,6 +274,7 @@ bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data, void *arg) { bool res = false; PgSocket *server = arg; + PgPool *pool = server->pool; PktHdr pkt; Assert(is_server_socket(server)); @@ -295,13 +296,13 @@ bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data, void *arg) case SBUF_EV_READ: if (mbuf_avail(data) < NEW_HEADER_LEN) { slog_noise(server, "S: got partial header, trying to wait a bit"); - return false; + break; } /* parse pkt header */ if (!get_header(data, &pkt)) { disconnect_server(server, true, "bad pkt header"); - return false; + break; } slog_noise(server, "S: pkt '%c', len=%d", pkt_desc(&pkt), pkt.len); @@ -331,7 +332,7 @@ bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data, void *arg) res = handle_connect(server); break; case SBUF_EV_FLUSH: - res = true; /* unused actually */ + res = true; if (!server->ready) break; @@ -362,6 +363,8 @@ bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data, void *arg) slog_warning(server, "SBUF_EV_PKT_CALLBACK with state=%d", server->state); break; } + if (!res && pool->admin) + takeover_login_failed(); return res; } diff --git a/src/takeover.c b/src/takeover.c index e2b3458..f11b49b 100644 --- a/src/takeover.c +++ b/src/takeover.c @@ -31,11 +31,46 @@ * Takeover done, old process shut down, * kick this one running. */ -static void takeover_finish(PgSocket *bouncer) + +static PgSocket *old_bouncer = NULL; + +void takeover_finish(void) +{ + uint8_t buf[512]; + int fd = sbuf_socket(&old_bouncer->sbuf); + bool res; + int got; + + log_info("sending SHUTDOWN;"); + socket_set_nonblocking(fd, 0); + SEND_generic(res, old_bouncer, 'Q', "s", "SHUTDOWN;"); + if (!res) + fatal("failed to send SHUTDOWN;"); + + while (1) { + got = safe_recv(fd, buf, sizeof(buf), 0); + if (got == 0) + break; + if (got < 0) + fatal_perror("sky is falling - error while waiting result from SHUTDOWN"); + } + + disconnect_server(old_bouncer, false, "disko over"); + old_bouncer = NULL; + + log_info("old process killed, resuming work"); + resume_all(); +} + +static void takeover_finish_part1(PgSocket *bouncer) { - disconnect_server(bouncer, false, "disko over"); + Assert(old_bouncer == NULL); + + /* unregister bouncer from libevent */ + sbuf_pause(&bouncer->sbuf); + old_bouncer = bouncer; cf_reboot = 0; - log_info("disko over, resuming work"); + log_info("disko over, going background"); } /* parse msg for fd and info */ @@ -164,14 +199,11 @@ static void next_command(PgSocket *bouncer, MBuf *pkt) log_info("SUSPEND finished, sending SHOW FDS"); SEND_generic(res, bouncer, 'Q', "s", "SHOW FDS;"); } else if (strncmp(cmd, "SHOW", 4) == 0) { - - log_info("SHOW FDS finished, sending SHUTDOWN"); - /* all fds loaded, review them */ takeover_postprocess_fds(); + log_info("SHOW FDS finished"); - /* all OK, kill old one */ - SEND_generic(res, bouncer, 'Q', "s", "SHUTDOWN;"); + takeover_finish_part1(bouncer); } else fatal("got bad CMD from old bouncer: %s", cmd); @@ -228,7 +260,7 @@ static void takeover_parse_data(PgSocket *bouncer, /* * listen for data from old bouncer. * - * use always sendmsg, to keep code simpler + * use always recvmsg, to keep code simpler */ static void takeover_recv_cb(int sock, short flags, void *arg) { @@ -253,7 +285,7 @@ static void takeover_recv_cb(int sock, short flags, void *arg) mbuf_init(&data, data_buf, res); takeover_parse_data(bouncer, &msg, &data); } else if (res == 0) { - takeover_finish(bouncer); + fatal("unexpected EOF"); } else { if (errno == EAGAIN) return; @@ -296,3 +328,8 @@ void takeover_init(void) launch_new_connection(pool); } +void takeover_login_failed(void) +{ + fatal("login failed"); +} +