]> granicus.if.org Git - pgbouncer/blob - src/pooler.c
Avoid fatal() in tune_socket()
[pgbouncer] / src / pooler.c
1 /*
2  * PgBouncer - Lightweight connection pooler for PostgreSQL.
3  * 
4  * Copyright (c) 2007-2009  Marko Kreen, Skype Technologies OÜ
5  * 
6  * Permission to use, copy, modify, and/or distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  * 
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18
19 /*
20  * Handling of pooler listening sockets
21  */
22
23 #include "bouncer.h"
24
25 static int fd_net = 0;
26 static int fd_unix = 0;
27
28 static struct event ev_net;
29 static struct event ev_unix;
30
31 /* if sockets are registered in libevent */
32 static bool reg_net = false;
33 static bool reg_unix = false;
34
35 /* should listening sockets be active or suspended? */
36 static bool pooler_active = false;
37
38 /* on accept() failure sleep 5 seconds */
39 static struct event ev_err;
40 static struct timeval err_timeout = {5, 0};
41
42 /* atexit() cleanup func */
43 static void cleanup_unix_socket(void)
44 {
45         char fn[256];
46
47         /* avoid cleanup if exit() while suspended */
48         if (!reg_unix)
49                 return;
50
51         snprintf(fn, sizeof(fn), "%s/.s.PGSQL.%d",
52                         cf_unix_socket_dir, cf_listen_port);
53         unlink(fn);
54 }
55
56 void get_pooler_fds(int *p_net, int *p_unix)
57 {
58         *p_net = fd_net;
59         *p_unix = fd_unix;
60 }
61
62 static int create_unix_socket(const char *socket_dir, int listen_port)
63 {
64         struct sockaddr_un un;
65         int res, sock;
66         char lockfile[256];
67         struct stat st;
68
69         /* fill sockaddr struct */
70         memset(&un, 0, sizeof(un));
71         un.sun_family = AF_UNIX;
72         snprintf(un.sun_path, sizeof(un.sun_path),
73                 "%s/.s.PGSQL.%d", socket_dir, listen_port);
74
75         /* check for lockfile */
76         snprintf(lockfile, sizeof(lockfile), "%s.lock", un.sun_path);
77         res = lstat(lockfile, &st);
78         if (res == 0)
79                 fatal("unix port %d is in use", listen_port);
80
81         /* expect old bouncer gone */
82         unlink(un.sun_path);
83
84         /* create socket */
85         sock = socket(PF_UNIX, SOCK_STREAM, 0);
86         if (sock < 0)
87                 fatal_perror("socket");
88
89         /* bind it */
90         res = bind(sock, (const struct sockaddr *)&un, sizeof(un));
91         if (res < 0)
92                 fatal_perror("bind");
93
94         /* remove socket on shutdown */
95         atexit(cleanup_unix_socket);
96
97         /* set common options */
98         if (!tune_socket(sock, true))
99                 fatal_perror("tune_socket");
100
101         /* finally, accept connections */
102         res = listen(sock, cf_listen_backlog);
103         if (res < 0)
104                 fatal_perror("listen");
105
106         res = chmod(un.sun_path, 0777);
107         if (res < 0)
108                 fatal_perror("chmod");
109
110         log_info("listening on unix:%s", un.sun_path);
111
112         return sock;
113 }
114
115 /*
116  * Notify pooler only when also data is arrived.
117  *
118  * optval specifies how long after connection attempt to wait for data.
119  *
120  * Related to tcp_synack_retries sysctl, default 5 (corresponds 180 secs).
121  *
122  * SO_ACCEPTFILTER needs to be set after listern(), maybe TCP_DEFER_ACCEPT too.
123  */
124 static void tune_accept(int sock, bool on)
125 {
126         const char *act = on ? "install" : "uninstall";
127         int res = 0;
128 #ifdef TCP_DEFER_ACCEPT
129         int val = 45; /* fixme: proper value */
130         socklen_t vlen = sizeof(val);
131         res = getsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, &vlen);
132         log_noise("old TCP_DEFER_ACCEPT on %d = %d", sock, val);
133         val = on ? 1 : 0;
134         log_noise("%s TCP_DEFER_ACCEPT on %d", act, sock);
135         res = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, sizeof(val));
136 #else
137 #if 0
138 #ifdef SO_ACCEPTFILTER
139         struct accept_filter_arg af, *afp = on ? &af : NULL;
140         socklen_t af_len = on ? sizeof(af) : 0;
141         memset(&af, 0, sizeof(af));
142         strcpy(af.af_name, "dataready");
143         log_noise("%s SO_ACCEPTFILTER on %d", act, sock);
144         res = setsockopt(sock, SOL_SOCKET, SO_ACCEPTFILTER, afp, af_len);
145 #endif
146 #endif
147 #endif
148         if (res < 0)
149                 log_warning("tune_accept: %s TCP_DEFER_ACCEPT/SO_ACCEPTFILTER: %s",
150                             act, strerror(errno));
151 }
152
153 void pooler_tune_accept(bool on)
154 {
155         if (fd_net > 0)
156                 tune_accept(fd_net, on);
157 }
158
159 static int create_net_socket(const char *listen_addr, int listen_port)
160 {
161         int sock;
162         struct sockaddr_in sa;
163         int res;
164         int val;
165
166         /* create socket */
167         sock = socket(AF_INET, SOCK_STREAM, 0);
168         if (sock < 0)
169                 fatal_perror("socket");
170
171         /* parse address */
172         memset(&sa, 0, sizeof(sa));
173         sa.sin_family = AF_INET;
174         sa.sin_port = htons(cf_listen_port);
175         if (strcmp(listen_addr, "*") == 0) {
176                 sa.sin_addr.s_addr = htonl(INADDR_ANY);
177         } else {
178                 sa.sin_addr.s_addr = inet_addr(listen_addr);
179                 if (sa.sin_addr.s_addr == INADDR_NONE)
180                         fatal("cannot parse addr: '%s'", listen_addr);
181         }
182
183         /* relaxed binding */
184         val = 1;
185         res = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
186         if (res < 0)
187                 fatal_perror("setsockopt");
188
189         /* bind to address */
190         res = bind(sock, (struct sockaddr *)&sa, sizeof(sa));
191         if (res < 0)
192                 fatal_perror("bind");
193
194         /* set common options */
195         if (!tune_socket(sock, false))
196                 fatal_perror("tune_socket");
197
198         /* make it accept connections */
199         res = listen(sock, cf_listen_backlog);
200         if (res < 0)
201                 fatal_perror("listen");
202
203         tune_accept(sock, cf_tcp_defer_accept);
204
205         log_info("listening on %s:%d", cf_listen_addr, cf_listen_port);
206
207         return sock;
208 }
209
210 static void err_wait_func(int sock, short flags, void *arg)
211 {
212         if (cf_pause_mode != P_SUSPEND)
213                 resume_pooler();
214 }
215
216 static const char *addrpair(const PgAddr *src, const PgAddr *dst)
217 {
218         static char ip1buf[64], ip2buf[64], buf[256];
219         const char *ip1, *ip2;
220         if (src->is_unix)
221                 return "unix->unix";
222
223         ip1 = inet_ntop(AF_INET, &src->ip_addr, ip1buf, sizeof(ip1buf));
224         if (!ip1)
225                 ip1 = strerror(errno);
226         ip2 = inet_ntop(AF_INET, &dst->ip_addr, ip2buf, sizeof(ip2buf));
227         if (!ip2)
228                 ip2 = strerror(errno);
229         snprintf(buf, sizeof(buf), "%s:%d -> %s:%d",
230                  ip1, src->port, ip2, dst->port);
231         return buf;
232 }
233
234 static const char *conninfo(const PgSocket *sk)
235 {
236         if (is_server_socket(sk))
237                 return addrpair(&sk->local_addr, &sk->remote_addr);
238         else
239                 return addrpair(&sk->remote_addr, &sk->local_addr);
240 }
241
242 /* got new connection, associate it with client struct */
243 static void pool_accept(int sock, short flags, void *is_unix)
244 {
245         int fd;
246         PgSocket *client;
247         union {
248                 struct sockaddr_in in;
249                 struct sockaddr_un un;
250                 struct sockaddr sa;
251         } addr;
252         socklen_t len = sizeof(addr);
253
254         if(!(flags & EV_READ)) {
255                 log_warning("No EV_READ in pool_accept");
256                 return;
257         }
258 loop:
259         /* get fd */
260         fd = safe_accept(sock, &addr.sa, &len);
261         if (fd < 0) {
262                 if (errno == EAGAIN)
263                         return;
264                 else if (errno == ECONNABORTED)
265                         return;
266
267                 /*
268                  * probably fd limit, pointless to try often
269                  * wait a bit, hope that admin resolves somehow
270                  */
271                 log_error("accept() failed: %s", strerror(errno));
272                 evtimer_set(&ev_err, err_wait_func, NULL);
273                 safe_evtimer_add(&ev_err, &err_timeout);
274                 suspend_pooler();
275                 return;
276         }
277
278         log_noise("new fd from accept=%d", fd);
279         if (is_unix) {
280                 {
281                         uid_t uid;
282                         gid_t gid;
283                         log_noise("getuid(): %d", (int)getuid());
284                         if (getpeereid(fd, &uid, &gid) >= 0)
285                                 log_noise("unix peer uid: %d", (int)uid);
286                         else
287                                 log_warning("unix peer uid failed: %s", strerror(errno));
288                 }
289                 client = accept_client(fd, NULL, true);
290         } else {
291                 client = accept_client(fd, &addr.in, false);
292         }
293
294         if (client)
295                 slog_debug(client, "P: got connection: %s", conninfo(client));
296
297         /*
298          * there may be several clients waiting,
299          * avoid context switch by looping
300          */
301         goto loop;
302 }
303
304 bool use_pooler_socket(int sock, bool is_unix)
305 {
306         if (!tune_socket(sock, is_unix))
307                 return false;
308
309         if (is_unix)
310                 fd_unix = sock;
311         else
312                 fd_net = sock;
313         return true;
314 }
315
316 void suspend_pooler(void)
317 {
318         pooler_active = false;
319
320         if (fd_net && reg_net) {
321                 if (event_del(&ev_net) < 0) {
322                         log_warning("suspend_pooler, event_del: %s", strerror(errno));
323                         return;
324                 }
325                 reg_net = false;
326         }
327         if (fd_unix && reg_unix) {
328                 if (event_del(&ev_unix) < 0) {
329                         log_warning("suspend_pooler, event_del: %s", strerror(errno));
330                         return;
331                 }
332                 reg_unix = false;
333         }
334 }
335
336 void resume_pooler(void)
337 {
338         pooler_active = true;
339
340         if (fd_unix && !reg_unix) {
341                 event_set(&ev_unix, fd_unix, EV_READ | EV_PERSIST, pool_accept, "1");
342                 if (event_add(&ev_unix, NULL) < 0) {
343                         log_warning("event_add failed: %s", strerror(errno));
344                         return;
345                 }
346                 reg_unix = true;
347         }
348
349         if (fd_net && !reg_net) {
350                 event_set(&ev_net, fd_net, EV_READ | EV_PERSIST, pool_accept, NULL);
351                 if (event_add(&ev_net, NULL) < 0) {
352                         log_warning("event_add failed: %s", strerror(errno));
353                 }
354                 reg_net = true;
355         }
356 }
357
358 /* listen on socket - should happen after all other initializations */
359 void pooler_setup(void)
360 {
361         if (cf_listen_addr && !fd_net)
362                 fd_net = create_net_socket(cf_listen_addr, cf_listen_port);
363
364         if (*cf_unix_socket_dir && !fd_unix)
365                 fd_unix = create_unix_socket(cf_unix_socket_dir, cf_listen_port);
366
367         if (!fd_net && !fd_unix)
368                 fatal("nowhere to listen on");
369
370         resume_pooler();
371 }
372
373 /* retry previously failed suspend_pooler() / resume_pooler() */
374 void per_loop_pooler_maint(void)
375 {
376         if (pooler_active) {
377                 if ((fd_unix && !reg_unix) || (fd_net && !reg_net))
378                         resume_pooler();
379         } else {
380                 if ((fd_unix && reg_unix) || (fd_net && reg_net))
381                         suspend_pooler();
382         }
383 }
384