]> granicus.if.org Git - pgbouncer/commitdiff
move shutdown after fork(), detect old bouncer
authorMarko Kreen <markokr@gmail.com>
Wed, 9 Jan 2008 09:05:44 +0000 (09:05 +0000)
committerMarko Kreen <markokr@gmail.com>
Wed, 9 Jan 2008 09:05:44 +0000 (09:05 +0000)
include/takeover.h
src/main.c
src/server.c
src/takeover.c

index f0bb1462f6050d14b4d0a87617c2d70f1f8195b1..bcd442c6f38a1a2f89743e394d56929d2aaa5d1c 100644 (file)
@@ -18,4 +18,6 @@
 
 void takeover_init(void);
 bool takeover_login(PgSocket *bouncer) _MUSTCHECK;
+void takeover_login_failed(void);
+void takeover_finish(void);
 
index 26e680d5ba792989b6f725b8661f3aa96a63c975..a896bd0c791a0dee448be907dbde880ad1684a15 100644 (file)
@@ -459,13 +459,29 @@ static void check_limits(void)
                 (int)lim.rlim_cur, (int)lim.rlim_max, cf_max_client_conn, fd_count);
 }
 
-static void daemon_setup(void)
+static bool check_old_process_unix(void)
 {
-       if (!cf_reboot)
-               check_pidfile();
-       if (cf_daemon)
-               go_daemon();
-       write_pidfile();
+       struct sockaddr_un sa_un;
+       socklen_t len = sizeof(sa_un);
+       int domain = AF_UNIX;
+       int res, fd;
+
+       if (!*cf_unix_socket_dir)
+               return false;
+
+       memset(&sa_un, 0, len);
+       sa_un.sun_family = domain;
+       snprintf(sa_un.sun_path, sizeof(sa_un.sun_path),
+                "%s/.s.PGSQL.%d", cf_unix_socket_dir, cf_listen_port);
+
+       fd = socket(domain, SOCK_STREAM, 0);
+       if (fd < 0)
+               fatal_perror("cannot create socket");
+       res = safe_connect(fd, (struct sockaddr *)&sa_un, len);
+       safe_close(fd);
+       if (res < 0)
+               return false;
+       return true;
 }
 
 static void main_loop_once(void)
@@ -484,11 +500,25 @@ static void main_loop_once(void)
        rescue_timers();
 }
 
+static void takeover_part1(void)
+{
+       /* use temporary libevent base */
+       void *evtmp = event_init();
+
+       if (!*cf_unix_socket_dir)
+               fatal("cannot reboot if unix dir not configured");
+
+       takeover_init();
+       while (cf_reboot)
+               main_loop_once();
+       event_base_free(evtmp);
+}
+
 /* boot everything */
 int main(int argc, char *argv[])
 {
        int c;
-       int did_takeover = 0;
+       bool did_takeover = false;
 
        /* parse cmdline */
        while ((c = getopt(argc, argv, "avhdVR")) != EOF) {
@@ -529,26 +559,34 @@ int main(int argc, char *argv[])
        srandom(time(NULL) ^ getpid());
 
        if (cf_reboot) {
-               /* use temporary libevent base */
-               void *evtmp = event_init();
-               takeover_init();
-               while (cf_reboot)
-                       main_loop_once();
-               did_takeover = 1;
-               event_base_free(evtmp);
+               if (check_old_process_unix()) {
+                       takeover_part1();
+                       did_takeover = true;
+               } else {
+                       log_info("old process not found, try to continue normally");
+                       cf_reboot = 0;
+                       check_pidfile();
+               }
+       } else {
+               check_pidfile();
+               if (check_old_process_unix())
+                       fatal("somebody is listening on unix socket");
        }
 
        /* initialize subsystems, order important */
-       daemon_setup();
+       if (cf_daemon)
+               go_daemon();
        event_init();
        signal_setup();
        janitor_setup();
        stats_setup();
 
-       if (!did_takeover)
-               pooler_setup();
+       if (did_takeover)
+               takeover_finish();
        else
-               resume_all();
+               pooler_setup();
+
+       write_pidfile();
 
        /* main loop */
        while (1)
index ef1b05056af39e6c728323ea3a4028005306306f..a7f6df23a96664d4bbf4f31bc50f0b75f33219b7 100644 (file)
@@ -274,6 +274,7 @@ bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data, void *arg)
 {
        bool res = false;
        PgSocket *server = arg;
+       PgPool *pool = server->pool;
        PktHdr pkt;
 
        Assert(is_server_socket(server));
@@ -295,13 +296,13 @@ bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data, void *arg)
        case SBUF_EV_READ:
                if (mbuf_avail(data) < NEW_HEADER_LEN) {
                        slog_noise(server, "S: got partial header, trying to wait a bit");
-                       return false;
+                       break;
                }
 
                /* parse pkt header */
                if (!get_header(data, &pkt)) {
                        disconnect_server(server, true, "bad pkt header");
-                       return false;
+                       break;
                }
                slog_noise(server, "S: pkt '%c', len=%d", pkt_desc(&pkt), pkt.len);
 
@@ -331,7 +332,7 @@ bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data, void *arg)
                res = handle_connect(server);
                break;
        case SBUF_EV_FLUSH:
-               res = true; /* unused actually */
+               res = true;
                if (!server->ready)
                        break;
 
@@ -362,6 +363,8 @@ bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data, void *arg)
                slog_warning(server, "SBUF_EV_PKT_CALLBACK with state=%d", server->state);
                break;
        }
+       if (!res && pool->admin)
+               takeover_login_failed();
        return res;
 }
 
index e2b34581263d0daecd7299b2cf7ed0609a2ab0cf..f11b49b53b8891257ef0e6e4ff2755b8b37953d9 100644 (file)
  * Takeover done, old process shut down,
  * kick this one running.
  */
-static void takeover_finish(PgSocket *bouncer)
+
+static PgSocket *old_bouncer = NULL;
+
+void takeover_finish(void)
+{
+       uint8_t buf[512];
+       int fd = sbuf_socket(&old_bouncer->sbuf);
+       bool res;
+       int got;
+
+       log_info("sending SHUTDOWN;");
+       socket_set_nonblocking(fd, 0);
+       SEND_generic(res, old_bouncer, 'Q', "s", "SHUTDOWN;");
+       if (!res)
+               fatal("failed to send SHUTDOWN;");
+
+       while (1) {
+               got = safe_recv(fd, buf, sizeof(buf), 0);
+               if (got == 0)
+                       break;
+               if (got < 0)
+                       fatal_perror("sky is falling - error while waiting result from SHUTDOWN");
+       }
+
+       disconnect_server(old_bouncer, false, "disko over");
+       old_bouncer = NULL;
+
+       log_info("old process killed, resuming work");
+       resume_all();
+}
+
+static void takeover_finish_part1(PgSocket *bouncer)
 {
-       disconnect_server(bouncer, false, "disko over");
+       Assert(old_bouncer == NULL);
+
+       /* unregister bouncer from libevent */
+       sbuf_pause(&bouncer->sbuf);
+       old_bouncer = bouncer;
        cf_reboot = 0;
-       log_info("disko over, resuming work");
+       log_info("disko over, going background");
 }
 
 /* parse msg for fd and info */
@@ -164,14 +199,11 @@ static void next_command(PgSocket *bouncer, MBuf *pkt)
                log_info("SUSPEND finished, sending SHOW FDS");
                SEND_generic(res, bouncer, 'Q', "s", "SHOW FDS;");
        } else if (strncmp(cmd, "SHOW", 4) == 0) {
-
-               log_info("SHOW FDS finished, sending SHUTDOWN");
-
                /* all fds loaded, review them */
                takeover_postprocess_fds();
+               log_info("SHOW FDS finished");
 
-               /* all OK, kill old one */
-               SEND_generic(res, bouncer, 'Q', "s", "SHUTDOWN;");
+               takeover_finish_part1(bouncer);
        } else
                fatal("got bad CMD from old bouncer: %s", cmd);
 
@@ -228,7 +260,7 @@ static void takeover_parse_data(PgSocket *bouncer,
 /*
  * listen for data from old bouncer.
  *
- * use always sendmsg, to keep code simpler
+ * use always recvmsg, to keep code simpler
  */
 static void takeover_recv_cb(int sock, short flags, void *arg)
 {
@@ -253,7 +285,7 @@ static void takeover_recv_cb(int sock, short flags, void *arg)
                mbuf_init(&data, data_buf, res);
                takeover_parse_data(bouncer, &msg, &data);
        } else if (res == 0) {
-               takeover_finish(bouncer);
+               fatal("unexpected EOF");
        } else {
                if (errno == EAGAIN)
                        return;
@@ -296,3 +328,8 @@ void takeover_init(void)
        launch_new_connection(pool);
 }
 
+void takeover_login_failed(void)
+{
+       fatal("login failed");
+}
+