/*
* PgBouncer - Lightweight connection pooler for PostgreSQL.
- *
- * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
- *
- * Permission to use, copy, modify, and distribute this software for any
+ *
+ * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
- *
+ *
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
#include "bouncer.h"
-static int fd_net = 0;
-static int fd_unix = 0;
-static struct event ev_net;
-static struct event ev_unix;
-static int suspended = 0;
+#include <usual/netdb.h>
+struct ListenSocket {
+ struct List node;
+ int fd;
+ bool active;
+ struct event ev;
+ PgAddr addr;
+};
+
+static STATLIST(sock_list);
+
+/* hints for getaddrinfo(listen_addr) */
+static const struct addrinfo hints = {
+ .ai_family = AF_UNSPEC,
+ .ai_socktype = SOCK_STREAM,
+ .ai_protocol = IPPROTO_TCP,
+ .ai_flags = AI_PASSIVE,
+};
+
+/* should listening sockets be active or suspended? */
+static bool need_active = false;
+/* is it actually active or suspended? */
+static bool pooler_active = false;
+
+/* on accept() failure sleep 5 seconds */
static struct event ev_err;
static struct timeval err_timeout = {5, 0};
-static void cleanup_unix_socket(void)
+static void tune_accept(int sock, bool on);
+
+/* atexit() cleanup func */
+static void cleanup_sockets(void)
{
- char fn[256];
- if (!cf_unix_socket_dir || suspended)
+ struct ListenSocket *ls;
+ struct List *el;
+
+ /* avoid cleanup if exit() while suspended */
+ if (cf_pause_mode == P_SUSPEND)
return;
- snprintf(fn, sizeof(fn), "%s/.s.PGSQL.%d",
- cf_unix_socket_dir, cf_listen_port);
- unlink(fn);
-}
-void get_pooler_fds(int *p_net, int *p_unix)
-{
- *p_net = fd_net;
- *p_unix = fd_unix;
+ while ((el = statlist_pop(&sock_list)) != NULL) {
+ ls = container_of(el, struct ListenSocket, node);
+ if (ls->fd > 0) {
+ safe_close(ls->fd);
+ ls->fd = 0;
+ }
+ if (pga_is_unix(&ls->addr)) {
+ char buf[sizeof(struct sockaddr_un) + 20];
+ snprintf(buf, sizeof(buf), "%s/.s.PGSQL.%d", cf_unix_socket_dir, cf_listen_port);
+ unlink(buf);
+ }
+ statlist_remove(&sock_list, &ls->node);
+ free(ls);
+ }
}
-static int create_unix_socket(const char *socket_dir, int listen_port)
+/*
+ * initialize another listening socket.
+ */
+static bool add_listen(int af, const struct sockaddr *sa, int salen)
{
- struct sockaddr_un un;
- int res, sock;
- char lockfile[256];
- struct stat st;
+ struct ListenSocket *ls;
+ int sock, res;
+ char buf[128];
+ const char *errpos;
- /* fill sockaddr struct */
- memset(&un, 0, sizeof(un));
- un.sun_family = AF_UNIX;
- snprintf(un.sun_path, sizeof(un.sun_path),
- "%s/.s.PGSQL.%d", socket_dir, listen_port);
-
- /* check for lockfile */
- snprintf(lockfile, sizeof(lockfile), "%s.lock", un.sun_path);
- res = lstat(lockfile, &st);
- if (res == 0)
- fatal("unix port %d is in use", listen_port);
-
- /* expect old bouncer gone */
- unlink(un.sun_path);
+ log_debug("add_listen: %s", sa2str(sa, buf, sizeof(buf)));
/* create socket */
- sock = socket(PF_UNIX, SOCK_STREAM, 0);
+ errpos = "socket";
+ sock = socket(af, SOCK_STREAM, 0);
if (sock < 0)
- fatal_perror("socket");
+ goto failed;
+
+ /* SO_REUSEADDR behaviour it default in WIN32. */
+#ifndef WIN32
+ /* relaxed binding */
+ if (af != AF_UNIX) {
+ int val = 1;
+ errpos = "setsockopt";
+ res = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
+ if (res < 0)
+ goto failed;
+ }
+#endif
+
+#ifdef IPV6_V6ONLY
+ /* avoid ipv6 socket's attempt to takeover ipv4 port */
+ if (af == AF_INET6) {
+ int val = 1;
+ errpos = "setsockopt/IPV6_V6ONLY";
+ res = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
+ if (res < 0)
+ goto failed;
+ }
+#endif
+
+ /*
+ * If configured, set SO_REUSEPORT or equivalent. If it's not
+ * enabled, just leave the socket alone. (We could also unset
+ * the socket option in that case, but this area is fairly
+ * unportable, so perhaps better to avoid it.)
+ */
+ if (af != AF_UNIX && cf_so_reuseport) {
+#if defined(SO_REUSEPORT)
+ int val = 1;
+ errpos = "setsockopt/SO_REUSEPORT";
+ res = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val));
+ if (res < 0)
+ goto failed;
+#elif defined(SO_REUSEPORT_LB)
+ int val = 1;
+ errpos = "setsockopt/SO_REUSEPORT_LB";
+ res = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT_LB, &val, sizeof(val));
+ if (res < 0)
+ goto failed;
+#else
+ fatal("so_reuseport not supported on this platform");
+#endif
+ }
/* bind it */
- res = bind(sock, (const struct sockaddr *)&un, sizeof(un));
+ errpos = "bind";
+ res = bind(sock, sa, salen);
if (res < 0)
- fatal_perror("bind");
-
- /* remove socket on shutdown */
- atexit(cleanup_unix_socket);
+ goto failed;
/* set common options */
- tune_socket(sock, true);
+ errpos = "tune_socket";
+ if (!tune_socket(sock, (af == AF_UNIX)))
+ goto failed;
/* finally, accept connections */
- res = listen(sock, 100);
+ errpos = "listen";
+ res = listen(sock, cf_listen_backlog);
if (res < 0)
- fatal_perror("listen");
+ goto failed;
- res = chmod(un.sun_path, 0777);
- if (res < 0)
- fatal_perror("chmod");
+ errpos = "calloc";
+ ls = calloc(1, sizeof(*ls));
+ if (!ls)
+ goto failed;
+
+ list_init(&ls->node);
+ ls->fd = sock;
+ if (sa->sa_family == AF_UNIX) {
+ pga_set(&ls->addr, AF_UNIX, cf_listen_port);
+ } else {
+ pga_copy(&ls->addr, sa);
+ }
- log_info("listening on unix:%s", un.sun_path);
+ if (af == AF_UNIX) {
+ struct sockaddr_un *un = (struct sockaddr_un *)sa;
+ change_file_mode(un->sun_path, cf_unix_socket_mode, NULL, cf_unix_socket_group);
+ } else {
+ tune_accept(sock, cf_tcp_defer_accept);
+ }
- return sock;
+ log_info("listening on %s", sa2str(sa, buf, sizeof(buf)));
+ statlist_append(&sock_list, &ls->node);
+ return true;
+
+failed:
+ log_warning("cannot listen on %s: %s(): %s",
+ sa2str(sa, buf, sizeof(buf)),
+ errpos, strerror(errno));
+ if (sock >= 0)
+ safe_close(sock);
+ return false;
}
-static int create_net_socket(const char *listen_addr, int listen_port)
+static void create_unix_socket(const char *socket_dir, int listen_port)
{
- int sock;
- struct sockaddr_in sa;
+ struct sockaddr_un un;
int res;
- int val;
+ char lockfile[sizeof(struct sockaddr_un) + 10];
+ struct stat st;
- /* create socket */
- sock = socket(AF_INET, SOCK_STREAM, 0);
- if (sock < 0)
- fatal_perror("socket");
-
- /* parse address */
- memset(&sa, 0, sizeof(sa));
- sa.sin_family = AF_INET;
- sa.sin_port = htons(cf_listen_port);
- if (strcmp(listen_addr, "*") == 0) {
- sa.sin_addr.s_addr = htonl(INADDR_ANY);
- } else {
- sa.sin_addr.s_addr = inet_addr(listen_addr);
- if (sa.sin_addr.s_addr == INADDR_NONE)
- fatal("cannot parse addr: '%s'", listen_addr);
- }
+ /* fill sockaddr struct */
+ memset(&un, 0, sizeof(un));
+ un.sun_family = AF_UNIX;
+ snprintf(un.sun_path, sizeof(un.sun_path),
+ "%s/.s.PGSQL.%d", socket_dir, listen_port);
- /* relaxed binding */
- val = 1;
- res = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
- if (res < 0)
- fatal_perror("setsockopt");
+ /* check for lockfile */
+ snprintf(lockfile, sizeof(lockfile), "%s.lock", un.sun_path);
+ res = lstat(lockfile, &st);
+ if (res == 0)
+ fatal("unix port %d is in use", listen_port);
- /* bind to address */
- res = bind(sock, (struct sockaddr *)&sa, sizeof(sa));
- if (res < 0)
- fatal_perror("bind");
+ /* expect old bouncer gone */
+ unlink(un.sun_path);
- /* set common options */
- tune_socket(sock, false);
+ add_listen(AF_UNIX, (const struct sockaddr *)&un, sizeof(un));
+}
+/*
+ * Notify pooler only when also data is arrived.
+ *
+ * optval specifies how long after connection attempt to wait for data.
+ *
+ * Related to tcp_synack_retries sysctl, default 5 (corresponds 180 secs).
+ *
+ * SO_ACCEPTFILTER needs to be set after listen(), maybe TCP_DEFER_ACCEPT too.
+ */
+static void tune_accept(int sock, bool on)
+{
+ const char *act = on ? "install" : "uninstall";
+ int res = 0;
#ifdef TCP_DEFER_ACCEPT
- /*
- * Notify pooler only when also data is arrived.
- *
- * optval specifies how long after connection attempt to wait for data.
- *
- * Related to tcp_synack_retries sysctl, default 5 (corresponds 180 secs).
- */
- if (cf_tcp_defer_accept > 0) {
- val = cf_tcp_defer_accept;
- res = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, sizeof(val));
- if (res < 0)
- fatal_perror("setsockopt TCP_DEFER_ACCEPT");
- }
+ int val = 45; /* FIXME: proper value */
+ socklen_t vlen = sizeof(val);
+ res = getsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, &vlen);
+ log_noise("old TCP_DEFER_ACCEPT on %d = %d", sock, val);
+ val = on ? 1 : 0;
+ log_noise("%s TCP_DEFER_ACCEPT on %d", act, sock);
+ res = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, sizeof(val));
+#else
+#if 0
+#ifdef SO_ACCEPTFILTER
+ struct accept_filter_arg af, *afp = on ? &af : NULL;
+ socklen_t af_len = on ? sizeof(af) : 0;
+ memset(&af, 0, sizeof(af));
+ strcpy(af.af_name, "dataready");
+ log_noise("%s SO_ACCEPTFILTER on %d", act, sock);
+ res = setsockopt(sock, SOL_SOCKET, SO_ACCEPTFILTER, afp, af_len);
+#endif
+#endif
#endif
-
- /* finally, accept connections */
- res = listen(sock, 100);
if (res < 0)
- fatal_perror("listen");
+ log_warning("tune_accept: %s TCP_DEFER_ACCEPT/SO_ACCEPTFILTER: %s",
+ act, strerror(errno));
+}
- log_info("listening on %s:%d", cf_listen_addr, cf_listen_port);
+void pooler_tune_accept(bool on)
+{
+ struct List *el;
+ struct ListenSocket *ls;
+ statlist_for_each(el, &sock_list) {
+ ls = container_of(el, struct ListenSocket, node);
+ if (!pga_is_unix(&ls->addr))
+ tune_accept(ls->fd, on);
+ }
+}
- return sock;
+static void err_wait_func(evutil_socket_t sock, short flags, void *arg)
+{
+ if (cf_pause_mode != P_SUSPEND)
+ resume_pooler();
}
-static void err_wait_func(int sock, short flags, void *arg)
+static const char *addrpair(const PgAddr *src, const PgAddr *dst)
{
- resume_pooler();
+ static char ip1buf[PGADDR_BUF], ip2buf[PGADDR_BUF],
+ buf[2*PGADDR_BUF + 16];
+ const char *ip1, *ip2;
+ if (pga_is_unix(src))
+ return "unix->unix";
+
+ ip1 = pga_ntop(src, ip1buf, sizeof(ip1buf));
+ ip2 = pga_ntop(src, ip2buf, sizeof(ip2buf));
+ snprintf(buf, sizeof(buf), "%s:%d -> %s:%d",
+ ip1, pga_port(src), ip2, pga_port(dst));
+ return buf;
+}
+
+static const char *conninfo(const PgSocket *sk)
+{
+ if (is_server_socket(sk)) {
+ return addrpair(&sk->local_addr, &sk->remote_addr);
+ } else {
+ return addrpair(&sk->remote_addr, &sk->local_addr);
+ }
}
/* got new connection, associate it with client struct */
-static void
-pool_accept(int sock, short flags, void *is_unix)
+static void pool_accept(evutil_socket_t sock, short flags, void *arg)
{
+ struct ListenSocket *ls = arg;
int fd;
+ PgSocket *client;
union {
struct sockaddr_in in;
+ struct sockaddr_in6 in6;
struct sockaddr_un un;
struct sockaddr sa;
- } addr;
- socklen_t len = sizeof(addr);
+ } raddr;
+ socklen_t len = sizeof(raddr);
+ bool is_unix = pga_is_unix(&ls->addr);
+ if(!(flags & EV_READ)) {
+ log_warning("no EV_READ in pool_accept");
+ return;
+ }
+loop:
/* get fd */
- fd = accept(sock, &addr.sa, &len);
+ fd = safe_accept(sock, &raddr.sa, &len);
if (fd < 0) {
+ if (errno == EAGAIN)
+ return;
+ else if (errno == ECONNABORTED)
+ return;
+
/*
- * probably fd limit, pointess to try often
+ * probably fd limit, pointless to try often
* wait a bit, hope that admin resolves somehow
*/
log_error("accept() failed: %s", strerror(errno));
- suspend_pooler();
evtimer_set(&ev_err, err_wait_func, NULL);
- evtimer_add(&ev_err, &err_timeout);
+ safe_evtimer_add(&ev_err, &err_timeout);
+ suspend_pooler();
return;
}
log_noise("new fd from accept=%d", fd);
if (is_unix) {
- log_debug("P: new unix client");
- {
- uid_t uid;
- log_noise("getuid(): %d", (int)getuid());
- if (get_unix_peer_uid(fd, &uid))
- log_noise("unix peer uid: %d", (int)uid);
- else
- log_noise("unix peer uid failed");
- }
- accept_client(fd, NULL, true);
+ client = accept_client(fd, true);
} else {
- log_debug("P: new tcp client");
- accept_client(fd, &addr.in, false);
+ client = accept_client(fd, false);
}
+
+ if (client)
+ slog_debug(client, "P: got connection: %s", conninfo(client));
+
+ /*
+ * there may be several clients waiting,
+ * avoid context switch by looping
+ */
+ goto loop;
}
-bool
-use_pooler_socket(int sock, bool is_unix)
+bool use_pooler_socket(int sock, bool is_unix)
{
- tune_socket(sock, is_unix);
+ struct ListenSocket *ls;
+ int res;
+ char buf[PGADDR_BUF];
- if (is_unix)
- fd_unix = sock;
- else
- fd_net = sock;
+ if (!tune_socket(sock, is_unix))
+ return false;
+
+ ls = calloc(1, sizeof(*ls));
+ if (!ls)
+ return false;
+ ls->fd = sock;
+ if (is_unix) {
+ pga_set(&ls->addr, AF_UNIX, cf_listen_port);
+ } else {
+ struct sockaddr_storage ss;
+ socklen_t len = sizeof(ss);
+ res = getsockname(sock, (struct sockaddr *)&ss, &len);
+ if (res < 0) {
+ log_error("getsockname failed");
+ free(ls);
+ return false;
+ }
+ pga_copy(&ls->addr, (struct sockaddr *)&ss);
+ }
+ log_info("got pooler socket: %s", pga_str(&ls->addr, buf, sizeof(buf)));
+ statlist_append(&sock_list, &ls->node);
return true;
}
-void
-suspend_pooler(void)
+void suspend_pooler(void)
{
- suspended = 1;
+ struct List *el;
+ struct ListenSocket *ls;
+
+ need_active = false;
+ statlist_for_each(el, &sock_list) {
+ ls = container_of(el, struct ListenSocket, node);
+ if (!ls->active)
+ continue;
+ if (event_del(&ls->ev) < 0) {
+ log_warning("suspend_pooler, event_del: %s", strerror(errno));
+ return;
+ }
+ ls->active = false;
+ }
+ pooler_active = false;
+}
- if (fd_net)
- event_del(&ev_net);
- if (fd_unix)
- event_del(&ev_unix);
+void resume_pooler(void)
+{
+ struct List *el;
+ struct ListenSocket *ls;
+
+ need_active = true;
+ statlist_for_each(el, &sock_list) {
+ ls = container_of(el, struct ListenSocket, node);
+ if (ls->active)
+ continue;
+ event_set(&ls->ev, ls->fd, EV_READ | EV_PERSIST, pool_accept, ls);
+ if (event_add(&ls->ev, NULL) < 0) {
+ log_warning("event_add failed: %s", strerror(errno));
+ return;
+ }
+ ls->active = true;
+ }
+ pooler_active = true;
}
-void
-resume_pooler(void)
+/* retry previously failed suspend_pooler() / resume_pooler() */
+void per_loop_pooler_maint(void)
{
- suspended = 0;
+ if (need_active && !pooler_active)
+ resume_pooler();
+ else if (!need_active && pooler_active)
+ suspend_pooler();
+}
- if (fd_unix) {
- event_set(&ev_unix, fd_unix, EV_READ | EV_PERSIST, pool_accept, "1");
- event_add(&ev_unix, NULL);
+static bool parse_addr(void *arg, const char *addr)
+{
+ int res;
+ char service[64];
+ struct addrinfo *ai, *gaires = NULL;
+ bool ok;
+
+ if (!*addr)
+ return true;
+ if (strcmp(addr, "*") == 0)
+ addr = NULL;
+ snprintf(service, sizeof(service), "%d", cf_listen_port);
+
+ res = getaddrinfo(addr, service, &hints, &gaires);
+ if (res != 0) {
+ fatal("getaddrinfo('%s', '%d') = %s [%d]", addr ? addr : "*",
+ cf_listen_port, gai_strerror(res), res);
}
- if (fd_net) {
- event_set(&ev_net, fd_net, EV_READ | EV_PERSIST, pool_accept, NULL);
- event_add(&ev_net, NULL);
+ for (ai = gaires; ai; ai = ai->ai_next) {
+ ok = add_listen(ai->ai_family, ai->ai_addr, ai->ai_addrlen);
+ /* it's unclear whether all or only first result should be used */
+ if (0 && ok)
+ break;
}
+
+ freeaddrinfo(gaires);
+ return true;
}
/* listen on socket - should happen after all other initializations */
-void
-pooler_setup(void)
+void pooler_setup(void)
{
- if (cf_listen_addr && !fd_net)
- fd_net = create_net_socket(cf_listen_addr, cf_listen_port);
+ bool ok;
+ static int init_done = 0;
+
+ if (!init_done) {
+ /* remove socket on shutdown */
+ atexit(cleanup_sockets);
+ init_done = 1;
+ }
+
+ ok = parse_word_list(cf_listen_addr, parse_addr, NULL);
+ if (!ok)
+ fatal("failed to parse listen_addr list: %s", cf_listen_addr);
- if (cf_unix_socket_dir && !fd_unix)
- fd_unix = create_unix_socket(cf_unix_socket_dir, cf_listen_port);
+ if (cf_unix_socket_dir && *cf_unix_socket_dir)
+ create_unix_socket(cf_unix_socket_dir, cf_listen_port);
- if (!fd_net && !fd_unix)
+ if (!statlist_count(&sock_list))
fatal("nowhere to listen on");
resume_pooler();
}
+bool for_each_pooler_fd(pooler_cb cbfunc, void *arg)
+{
+ struct List *el;
+ struct ListenSocket *ls;
+ bool ok;
+
+ statlist_for_each(el, &sock_list) {
+ ls = container_of(el, struct ListenSocket, node);
+ ok = cbfunc(arg, ls->fd, &ls->addr);
+ if (!ok)
+ return false;
+ }
+ return true;
+}