#include "bouncer.h"
-static int fd_net = 0;
-static int fd_unix = 0;
-#ifdef HAVE_IPV6
-static int fd_net_v6 = 0;
-#endif
+#include <usual/netdb.h>
+struct ListenSocket {
+ struct List node;
+ int fd;
+ bool active;
+ struct event ev;
+ PgAddr addr;
+};
-static struct event ev_net;
-static struct event ev_unix;
+static STATLIST(sock_list);
-/* if sockets are registered in libevent */
-static bool reg_net = false;
-static bool reg_unix = false;
+/* hints for getaddrinfo(listen_addr) */
+static const struct addrinfo hints = {
+ .ai_family = AF_INET6,
+ .ai_socktype = SOCK_STREAM,
+ .ai_protocol = IPPROTO_TCP,
+ .ai_flags = AI_PASSIVE,
+};
/* should listening sockets be active or suspended? */
+static bool need_active = false;
+/* is it actually active or suspended? */
static bool pooler_active = false;
/* on accept() failure sleep 5 seconds */
static struct event ev_err;
static struct timeval err_timeout = {5, 0};
+static void tune_accept(int sock, bool on);
+
/* atexit() cleanup func */
-static void cleanup_unix_socket(void)
+static void cleanup_sockets(void)
{
- char fn[256];
+ struct ListenSocket *ls;
+ struct List *el;
/* avoid cleanup if exit() while suspended */
- if (!reg_unix)
+ if (cf_pause_mode == P_SUSPEND)
return;
- snprintf(fn, sizeof(fn), "%s/.s.PGSQL.%d",
- cf_unix_socket_dir, cf_listen_port);
- unlink(fn);
+ while ((el = statlist_pop(&sock_list)) != NULL) {
+ ls = container_of(el, struct ListenSocket, node);
+ if (ls->fd > 0) {
+ safe_close(ls->fd);
+ ls->fd = 0;
+ }
+ if (pga_is_unix(&ls->addr)) {
+ char buf[sizeof(struct sockaddr_un) + 20];
+ snprintf(buf, sizeof(buf), "%s/.s.PGSQL.%d", cf_unix_socket_dir, cf_listen_port);
+ unlink(buf);
+ }
+ statlist_remove(&sock_list, &ls->node);
+ free(ls);
+ }
}
-void get_pooler_fds(int *p_net, int *p_unix)
+/*
+ * initialize another listening socket.
+ *
+ * currently all initialization errors are fatal.
+ * should we tolerate some?
+ */
+static void add_listen(int af, const struct sockaddr *sa, int salen)
{
- *p_net = fd_net;
- *p_unix = fd_unix;
+ struct ListenSocket *ls;
+ int sock, res, val;
+ char buf[128];
+
+ log_debug("add_listen: %s", sa2str(sa, buf, sizeof(buf)));
+
+ /* create socket */
+ sock = socket(af, SOCK_STREAM, 0);
+ if (sock < 0)
+ fatal_perror("socket");
+
+ /* relaxed binding */
+ if (af != AF_UNIX) {
+ val = 1;
+ res = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
+ if (res < 0)
+ fatal_perror("setsockopt");
+ }
+
+ /* bind it */
+ res = bind(sock, sa, salen);
+ if (res < 0)
+ fatal_perror("bind");
+
+ /* set common options */
+ if (!tune_socket(sock, (af == AF_UNIX)))
+ fatal_perror("tune_socket");
+
+ /* finally, accept connections */
+ res = listen(sock, cf_listen_backlog);
+ if (res < 0)
+ fatal_perror("listen");
+
+ ls = calloc(1, sizeof(*ls));
+ if (!ls)
+ fatal_perror("calloc");
+
+ list_init(&ls->node);
+ ls->fd = sock;
+ if (sa->sa_family == AF_UNIX) {
+ pga_set(&ls->addr, AF_UNIX, cf_listen_port);
+ } else {
+ pga_copy(&ls->addr, sa);
+ }
+
+ if (af == AF_UNIX) {
+ const struct sockaddr_un *un;
+ un = (struct sockaddr_un *)sa;
+ res = chmod(un->sun_path, 0777);
+ if (res < 0)
+ fatal_perror("chmod");
+ } else {
+ tune_accept(sock, cf_tcp_defer_accept);
+ }
+
+ log_info("listening on %s", sa2str(sa, buf, sizeof(buf)));
+ statlist_append(&sock_list, &ls->node);
}
-static int create_unix_socket(const char *socket_dir, int listen_port)
+static void create_unix_socket(const char *socket_dir, int listen_port)
{
struct sockaddr_un un;
- int res, sock;
- char lockfile[256];
+ int res;
+ char lockfile[sizeof(struct sockaddr_un) + 10];
struct stat st;
/* fill sockaddr struct */
/* expect old bouncer gone */
unlink(un.sun_path);
- /* create socket */
- sock = socket(PF_UNIX, SOCK_STREAM, 0);
- if (sock < 0)
- fatal_perror("socket");
-
- /* bind it */
- res = bind(sock, (const struct sockaddr *)&un, sizeof(un));
- if (res < 0)
- fatal_perror("bind");
-
- /* remove socket on shutdown */
- atexit(cleanup_unix_socket);
-
- /* set common options */
- if (!tune_socket(sock, true))
- fatal_perror("tune_socket");
-
- /* finally, accept connections */
- res = listen(sock, cf_listen_backlog);
- if (res < 0)
- fatal_perror("listen");
-
- res = chmod(un.sun_path, 0777);
- if (res < 0)
- fatal_perror("chmod");
-
- log_info("listening on unix:%s", un.sun_path);
-
- return sock;
+ add_listen(AF_UNIX, (const struct sockaddr *)&un, sizeof(un));
}
/*
void pooler_tune_accept(bool on)
{
- if (fd_net > 0)
- tune_accept(fd_net, on);
-}
-
-static int create_net_socket(const char *listen_addr, int listen_port)
-{
- int sock;
- struct sockaddr_in sa4;
- struct sockaddr_in6 sa6;
- struct sockaddr * sa;
- int sa_size = 0;
- int res;
- int val;
-
-
- /* parse address as IPv4 */
- memset(&sa4, 0, sizeof(sa));
- sa = (struct sockaddr *)&sa4;
- sa_size = sizeof(sa4);
- sa4.sin_family = AF_INET;
- sa4.sin_port = htons(cf_listen_port);
- if (strcmp(listen_addr, "*") == 0) {
- sa4.sin_addr.s_addr = htonl(INADDR_ANY);
- } else {
- sa4.sin_addr.s_addr = inet_addr(listen_addr);
- }
- if (sa4.sin_addr.s_addr == INADDR_NONE) {
- /* IPv4 addr not foundm re-parse address as IPv6 */
- log_info("trying to parse %s as IPv6 address", listen_addr);
- sa = (struct sockaddr *)&sa6;
- sa_size = sizeof(sa6);
- memset(&sa6, 0, sizeof(sa6));
- sa6.sin6_family = AF_INET6;
- sa6.sin6_port = htons(cf_listen_port);
-
- if (inet_pton(AF_INET6, listen_addr, (void *) sa6.sin6_addr.s6_addr) <= 0)
- fatal("cannot parse addr: '%s'", listen_addr);
- }
-
- /* create socket of right type */ //NB! turn if() into x?y:z
- if (sa6.sin6_family == AF_INET6){
- sock = socket(AF_INET6, SOCK_STREAM, 0);
- log_info("created AF_INET6 socket");
- } else {
- sock = socket(AF_INET, SOCK_STREAM, 0);
- log_info("created AF_INET socket");
+ struct List *el;
+ struct ListenSocket *ls;
+ statlist_for_each(el, &sock_list) {
+ ls = container_of(el, struct ListenSocket, node);
+ if (!pga_is_unix(&ls->addr))
+ tune_accept(ls->fd, on);
}
- if (sock < 0)
- fatal_perror("socket");
-
- /* relaxed binding */
- val = 1;
- res = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
- if (res < 0)
- fatal_perror("setsockopt");
-
- /* bind to address */
- res = bind(sock, sa, sa_size);
- if (res < 0)
- fatal_perror("bind");
-
- /* set common options */
- if (!tune_socket(sock, false))
- fatal_perror("tune_socket");
-
- /* make it accept connections */
- res = listen(sock, cf_listen_backlog);
- if (res < 0)
- fatal_perror("listen");
-
- tune_accept(sock, cf_tcp_defer_accept);
-
- log_info("listening on %s:%d", cf_listen_addr, cf_listen_port);
-
- return sock;
}
static void err_wait_func(int sock, short flags, void *arg)
}
/* got new connection, associate it with client struct */
-static void pool_accept(int sock, short flags, void *is_unix)
+static void pool_accept(int sock, short flags, void *arg)
{
+ struct ListenSocket *ls = arg;
int fd;
PgSocket *client;
union {
struct sockaddr_in in;
+ struct sockaddr_in6 in6;
struct sockaddr_un un;
struct sockaddr sa;
- } addr;
- socklen_t len = sizeof(addr);
+ } raddr;
+ socklen_t len = sizeof(raddr);
+ bool is_unix = pga_is_unix(&ls->addr);
if(!(flags & EV_READ)) {
log_warning("No EV_READ in pool_accept");
}
loop:
/* get fd */
- fd = safe_accept(sock, &addr.sa, &len);
+ fd = safe_accept(sock, &raddr.sa, &len);
if (fd < 0) {
if (errno == EAGAIN)
return;
log_noise("new fd from accept=%d", fd);
if (is_unix) {
- {
- uid_t uid;
- gid_t gid;
- log_noise("getuid(): %d", (int)getuid());
- if (getpeereid(fd, &uid, &gid) >= 0)
- log_noise("unix peer uid: %d", (int)uid);
- else
- log_warning("unix peer uid failed: %s", strerror(errno));
- }
- client = accept_client(fd, NULL, true);
+ uid_t uid;
+ gid_t gid;
+ log_noise("getuid(): %d", (int)getuid());
+ if (getpeereid(fd, &uid, &gid) >= 0)
+ log_noise("unix peer uid: %d", (int)uid);
+ else
+ log_warning("unix peer uid failed: %s", strerror(errno));
+
+ client = accept_client(fd, true);
} else {
- client = accept_client(fd, &addr.in, false);
+ client = accept_client(fd, false);
}
if (client)
bool use_pooler_socket(int sock, bool is_unix)
{
+ struct ListenSocket *ls;
+ int res;
+ char buf[PGADDR_BUF];
+
if (!tune_socket(sock, is_unix))
return false;
- if (is_unix)
- fd_unix = sock;
- else
- fd_net = sock;
+ ls = calloc(1, sizeof(*ls));
+ ls->fd = sock;
+ if (is_unix) {
+ pga_set(&ls->addr, AF_UNIX, cf_listen_port);
+ } else {
+ struct sockaddr_storage ss;
+ socklen_t len = sizeof(ss);
+ res = getsockname(sock, (struct sockaddr *)&ss, &len);
+ if (res < 0) {
+ log_error("getsockname failed");
+ free(ls);
+ return false;
+ }
+ pga_copy(&ls->addr, (struct sockaddr *)&ss);
+ }
+ log_info("got pooler socket: %s", pga_str(&ls->addr, buf, sizeof(buf)));
+ statlist_append(&sock_list, &ls->node);
return true;
}
void suspend_pooler(void)
{
- pooler_active = false;
-
- if (fd_net && reg_net) {
- if (event_del(&ev_net) < 0) {
- log_warning("suspend_pooler, event_del: %s", strerror(errno));
- return;
- }
- reg_net = false;
- }
- if (fd_unix && reg_unix) {
- if (event_del(&ev_unix) < 0) {
+ struct List *el;
+ struct ListenSocket *ls;
+
+ need_active = false;
+ statlist_for_each(el, &sock_list) {
+ ls = container_of(el, struct ListenSocket, node);
+ if (!ls->active)
+ continue;
+ if (event_del(&ls->ev) < 0) {
log_warning("suspend_pooler, event_del: %s", strerror(errno));
return;
}
- reg_unix = false;
+ ls->active = false;
}
+ pooler_active = false;
}
void resume_pooler(void)
{
- pooler_active = true;
-
- if (fd_unix && !reg_unix) {
- event_set(&ev_unix, fd_unix, EV_READ | EV_PERSIST, pool_accept, "1");
- if (event_add(&ev_unix, NULL) < 0) {
+ struct List *el;
+ struct ListenSocket *ls;
+
+ need_active = true;
+ statlist_for_each(el, &sock_list) {
+ ls = container_of(el, struct ListenSocket, node);
+ if (ls->active)
+ continue;
+ event_set(&ls->ev, ls->fd, EV_READ | EV_PERSIST, pool_accept, ls);
+ if (event_add(&ls->ev, NULL) < 0) {
log_warning("event_add failed: %s", strerror(errno));
return;
}
- reg_unix = true;
+ ls->active = true;
}
+ pooler_active = true;
+}
- if (fd_net && !reg_net) {
- event_set(&ev_net, fd_net, EV_READ | EV_PERSIST, pool_accept, NULL);
- if (event_add(&ev_net, NULL) < 0) {
- log_warning("event_add failed: %s", strerror(errno));
- }
- reg_net = true;
+/* retry previously failed suspend_pooler() / resume_pooler() */
+void per_loop_pooler_maint(void)
+{
+ if (need_active && !pooler_active)
+ resume_pooler();
+ else if (!need_active && pooler_active)
+ suspend_pooler();
+}
+
+static bool parse_addr(void *arg, const char *addr)
+{
+ int res;
+ char service[64];
+ struct addrinfo *ai, *gaires = NULL;
+
+ if (!*addr)
+ return true;
+ if (strcmp(addr, "*") == 0)
+ addr = NULL;
+ snprintf(service, sizeof(service), "%d", cf_listen_port);
+
+ res = getaddrinfo(addr, service, &hints, &gaires);
+ if (res != 0) {
+ fatal("getaddrinfo('%s', '%d') = %s [%d]", addr ? addr : "*",
+ cf_listen_port, gai_strerror(res), res);
}
+
+ for (ai = gaires; ai; ai = ai->ai_next)
+ add_listen(ai->ai_family, ai->ai_addr, ai->ai_addrlen);
+
+ freeaddrinfo(gaires);
+ return true;
}
/* listen on socket - should happen after all other initializations */
void pooler_setup(void)
{
- if (cf_listen_addr && !fd_net)
- fd_net = create_net_socket(cf_listen_addr, cf_listen_port);
+ bool ok;
+ static int init_done = 0;
- if (cf_unix_socket_dir && *cf_unix_socket_dir && !fd_unix)
- fd_unix = create_unix_socket(cf_unix_socket_dir, cf_listen_port);
+ if (!init_done) {
+ /* remove socket on shutdown */
+ atexit(cleanup_sockets);
+ init_done = 1;
+ }
+
+ ok = parse_word_list(cf_listen_addr, parse_addr, NULL);
+ if (!ok)
+ fatal("failed to parse listen_addr list: %s", cf_listen_addr);
- if (!fd_net && !fd_unix)
+ if (cf_unix_socket_dir && *cf_unix_socket_dir)
+ create_unix_socket(cf_unix_socket_dir, cf_listen_port);
+
+ if (!statlist_count(&sock_list))
fatal("nowhere to listen on");
resume_pooler();
}
-/* retry previously failed suspend_pooler() / resume_pooler() */
-void per_loop_pooler_maint(void)
+bool for_each_pooler_fd(pooler_cb cbfunc, void *arg)
{
- if (pooler_active) {
- if ((fd_unix && !reg_unix) || (fd_net && !reg_net))
- resume_pooler();
- } else {
- if ((fd_unix && reg_unix) || (fd_net && reg_net))
- suspend_pooler();
+ struct List *el;
+ struct ListenSocket *ls;
+ bool ok;
+
+ statlist_for_each(el, &sock_list) {
+ ls = container_of(el, struct ListenSocket, node);
+ ok = cbfunc(arg, ls->fd, &ls->addr);
+ if (!ok)
+ return false;
}
+ return true;
}
}
}
+
/*
* PgAddr operations
*/
+int pga_port(const PgAddr *a)
+{
+ if (a->sa.sa_family == AF_INET6) {
+ return ntohs(a->sin6.sin6_port);
+ } else {
+ return ntohs(a->sin.sin_port);
+ }
+}
+
/* set family and port */
void pga_set(PgAddr *a, int af, int port)
{
- a->af = af;
- a->port = port;
+ memset(a, 0, sizeof(*a));
+ if (af == AF_INET6) {
+ a->sin6.sin6_family = af;
+ a->sin6.sin6_port = htons(port);
+ } else {
+ a->sin.sin_family = af;
+ a->sin.sin_port = htons(port);
+ }
}
/* copy sockaddr_in/in6 to PgAddr */
void pga_copy(PgAddr *a, const struct sockaddr *sa)
{
- const struct sockaddr_in *sa4;
- const struct sockaddr_in6 *sa6;
-
- a->af = sa->sa_family;
- switch (a->af) {
+ switch (sa->sa_family) {
case AF_INET:
- sa4 = (struct sockaddr_in *)sa;
- a->port = ntohs(sa4->sin_port);
- a->addr4 = sa4->sin_addr;
+ memcpy(&a->sin, sa, sizeof(a->sin));
break;
case AF_INET6:
- sa6 = (struct sockaddr_in6 *)sa;
- a->port = ntohs(sa6->sin6_port);
- a->addr6 = sa6->sin6_addr;
+ memcpy(&a->sin6, sa, sizeof(a->sin6));
break;
+ case AF_UNIX:
+ log_error("pga_copy: AF_UNIX copy not supported");
}
}
+static inline unsigned pga_family(const PgAddr *a)
+{
+ return a->sa.sa_family;
+}
+
/* convert pgaddr to string */
const char *pga_ntop(const PgAddr *a, char *dst, int dstlen)
{
- switch (a->af) {
+ switch (pga_family(a)) {
case AF_UNIX:
strlcpy(dst, "unix", dstlen);
return dst;
case AF_INET:
- return inet_ntop(a->af, &a->addr4, dst, dstlen);
+ return inet_ntop(AF_INET, &a->sin.sin_addr, dst, dstlen);
case AF_INET6:
- return inet_ntop(a->af, &a->addr6, dst, dstlen);
+ return inet_ntop(AF_INET6, &a->sin6.sin6_addr, dst, dstlen);
default:
return NULL;
}
}
if (strchr(s, ':')) {
pga_set(a, AF_INET6, port);
- res = inet_pton(AF_INET6, s, &a->addr6);
+ res = inet_pton(AF_INET6, s, &a->sin6.sin6_addr);
} else {
pga_set(a, AF_INET, port);
- res = inet_pton(AF_INET, s, &a->addr4);
+ res = inet_pton(AF_INET, s, &a->sin.sin_addr);
}
if (res == 0)
errno = EINVAL;
return res > 0;
}
+const char *pga_str(const PgAddr *a, char *dst, int dstlen)
+{
+ char buf[PGADDR_BUF];
+ if (!pga_ntop(a, buf, sizeof(buf)))
+ return NULL;
+ snprintf(dst, dstlen, "%s@%d", buf, pga_port(a));
+ return dst;
+}