]> granicus.if.org Git - pgbouncer/blob - src/sbuf.c
New tunable 'sbuf_loopcnt' to limit time spent on one socket.
[pgbouncer] / src / sbuf.c
1 /*
2  * PgBouncer - Lightweight connection pooler for PostgreSQL.
3  * 
4  * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
5  * 
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  * 
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18
19 /*
20  * Stream buffer
21  *
22  * The task is to copy data from one socket to another
23  * efficiently, while allowing callbacks to look
24  * at packet headers.
25  */
26
27 #include "bouncer.h"
28
29 /* sbuf_main_loop() skip_recv values */
30 #define DO_RECV         false
31 #define SKIP_RECV       true
32
33 #define ACT_UNSET 0
34 #define ACT_SEND 1
35 #define ACT_SKIP 2
36 #define ACT_CALL 3
37
38
39 #define AssertSanity(sbuf) do { \
40         Assert(iobuf_sane((sbuf)->io)); \
41 } while (0)
42
43 #define AssertActive(sbuf) do { \
44         Assert((sbuf)->sock > 0); \
45         AssertSanity(sbuf); \
46 } while (0)
47
48 /* declare static stuff */
49 static bool sbuf_queue_send(SBuf *sbuf) _MUSTCHECK;
50 static bool sbuf_send_pending(SBuf *sbuf) _MUSTCHECK;
51 static bool sbuf_process_pending(SBuf *sbuf) _MUSTCHECK;
52 static void sbuf_connect_cb(int sock, short flags, void *arg);
53 static void sbuf_recv_cb(int sock, short flags, void *arg);
54 static void sbuf_send_cb(int sock, short flags, void *arg);
55 static void sbuf_try_resync(SBuf *sbuf, bool release);
56 static bool sbuf_wait_for_data(SBuf *sbuf) _MUSTCHECK;
57 static void sbuf_main_loop(SBuf *sbuf, bool skip_recv);
58 static bool sbuf_call_proto(SBuf *sbuf, int event) /* _MUSTCHECK */;
59 static bool sbuf_actual_recv(SBuf *sbuf, unsigned len)  _MUSTCHECK;
60 static bool sbuf_after_connect_check(SBuf *sbuf)  _MUSTCHECK;
61
62 static inline IOBuf *get_iobuf(SBuf *sbuf) { return sbuf->io; }
63
64 /*********************************
65  * Public functions
66  *********************************/
67
68 /* initialize SBuf with proto handler */
69 void sbuf_init(SBuf *sbuf, sbuf_cb_t proto_fn)
70 {
71         memset(sbuf, 0, sizeof(SBuf));
72         sbuf->proto_cb = proto_fn;
73 }
74
75 /* got new socket from accept() */
76 bool sbuf_accept(SBuf *sbuf, int sock, bool is_unix)
77 {
78         bool res;
79
80         Assert(iobuf_empty(sbuf->io) && sbuf->sock == 0);
81         AssertSanity(sbuf);
82
83         tune_socket(sock, is_unix);
84         sbuf->sock = sock;
85         sbuf->is_unix = is_unix;
86
87         if (!cf_reboot) {
88                 res = sbuf_wait_for_data(sbuf);
89                 if (!res) {
90                         sbuf_call_proto(sbuf, SBUF_EV_RECV_FAILED);
91                         return false;
92                 }
93                 /* socket should already have some data (linux only) */
94                 if (cf_tcp_defer_accept && !is_unix) {
95                         sbuf_main_loop(sbuf, DO_RECV);
96                         if (!sbuf->sock)
97                                 return false;
98                 }
99         }
100         return true;
101 }
102
103 /* need to connect() to get a socket */
104 bool sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int timeout_sec)
105 {
106         int res, sock, domain;
107         struct sockaddr_in sa_in;
108         struct sockaddr_un sa_un;
109         struct sockaddr *sa;
110         socklen_t len;
111         struct timeval timeout;
112
113         Assert(iobuf_empty(sbuf->io) && sbuf->sock == 0);
114         AssertSanity(sbuf);
115
116         /* prepare sockaddr */
117         if (addr->is_unix) {
118                 sa = (void*)&sa_un;
119                 len = sizeof(sa_un);
120                 memset(sa, 0, len);
121                 sa_un.sun_family = AF_UNIX;
122                 snprintf(sa_un.sun_path, sizeof(sa_un.sun_path),
123                          "%s/.s.PGSQL.%d", unix_dir, addr->port);
124                 domain = AF_UNIX;
125         } else {
126                 sa = (void*)&sa_in;
127                 len = sizeof(sa_in);
128                 memset(sa, 0, len);
129                 sa_in.sin_family = AF_INET;
130                 sa_in.sin_addr = addr->ip_addr;
131                 sa_in.sin_port = htons(addr->port);
132                 domain = AF_INET;
133         }
134
135         /*
136          * common stuff
137          */
138         sock = socket(domain, SOCK_STREAM, 0);
139         if (sock < 0)
140                 /* probably fd limit */
141                 goto failed;
142
143         tune_socket(sock, addr->is_unix);
144
145         sbuf->is_unix = addr->is_unix;
146         sbuf->sock = sock;
147
148         timeout.tv_sec = timeout_sec;
149         timeout.tv_usec = 0;
150
151         /* launch connection */
152         res = safe_connect(sock, sa, len);
153         if (res == 0) {
154                 /* unix socket gives connection immidiately */
155                 sbuf_connect_cb(sock, EV_WRITE, sbuf);
156                 return true;
157         } else if (errno == EINPROGRESS) {
158                 /* tcp socket needs waiting */
159                 event_set(&sbuf->ev, sock, EV_WRITE, sbuf_connect_cb, sbuf);
160                 res = event_add(&sbuf->ev, &timeout);
161                 if (res >= 0)
162                         return true;
163         }
164
165 failed:
166         log_warning("sbuf_connect failed: %s", strerror(errno));
167
168         if (sock >= 0)
169                 safe_close(sock);
170         sbuf->sock = 0;
171         sbuf_call_proto(sbuf, SBUF_EV_CONNECT_FAILED);
172         return false;
173 }
174
175 /* don't wait for data on this socket */
176 bool sbuf_pause(SBuf *sbuf)
177 {
178         AssertActive(sbuf);
179         Assert(sbuf->wait_send == 0);
180
181         if (event_del(&sbuf->ev) < 0) {
182                 log_warning("event_del: %s", strerror(errno));
183                 return false;
184         }
185         return true;
186 }
187
188 /* resume from pause, start waiting for data */
189 void sbuf_continue(SBuf *sbuf)
190 {
191         bool do_recv = DO_RECV;
192         bool res;
193         AssertActive(sbuf);
194
195         res = sbuf_wait_for_data(sbuf);
196         if (!res) {
197                 /* drop if problems */
198                 sbuf_call_proto(sbuf, SBUF_EV_RECV_FAILED);
199                 return;
200         }
201
202         /*
203          * It's tempting to try to avoid the recv() but that would
204          * only work if no code wants to see full packet.
205          *
206          * This is not true in ServerParameter case.
207          */
208         /*
209          * if (sbuf->recv_pos - sbuf->pkt_pos >= SBUF_SMALL_PKT)
210          *      do_recv = false;
211          */
212
213         sbuf_main_loop(sbuf, do_recv);
214 }
215
216 /*
217  * Resume from pause and give socket over to external
218  * callback function.
219  *
220  * The callback will be called with arg given to sbuf_init.
221  */
222 bool sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb user_cb)
223 {
224         int err;
225
226         AssertActive(sbuf);
227
228         event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST,
229                   user_cb, sbuf);
230
231         err = event_add(&sbuf->ev, NULL);
232         if (err < 0) {
233                 log_warning("sbuf_continue_with_callback: %s", strerror(errno));
234                 return false;
235         }
236         return true;
237 }
238
239 /* socket cleanup & close */
240 bool sbuf_close(SBuf *sbuf)
241 {
242         /* keep handler & arg values */
243         if (sbuf->sock > 0) {
244                 if (event_del(&sbuf->ev) < 0) {
245                         log_warning("event_del: %s", strerror(errno));
246                         return false;
247                 }
248                 safe_close(sbuf->sock);
249         }
250         sbuf->dst = NULL;
251         sbuf->sock = 0;
252         sbuf->pkt_remain = 0;
253         sbuf->pkt_action = sbuf->wait_send = 0;
254         if (sbuf->io) {
255                 obj_free(iobuf_cache, sbuf->io);
256                 sbuf->io = NULL;
257         }
258         return true;
259 }
260
261 /* proto_fn tells to send some bytes to socket */
262 void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, unsigned amount)
263 {
264         AssertActive(sbuf);
265         Assert(sbuf->pkt_remain == 0);
266         //Assert(sbuf->pkt_action == ACT_UNSET || sbuf->pkt_action == ACT_SEND || iobuf_amount_pending(&sbuf->io));
267         Assert(amount > 0);
268
269         sbuf->pkt_action = ACT_SEND;
270         sbuf->pkt_remain = amount;
271         sbuf->dst = dst;
272 }
273
274 /* proto_fn tells to skip some amount of bytes */
275 void sbuf_prepare_skip(SBuf *sbuf, unsigned amount)
276 {
277         AssertActive(sbuf);
278         Assert(sbuf->pkt_remain == 0);
279         //Assert(sbuf->pkt_action == ACT_UNSET || iobuf_send_pending_avail(&sbuf->io));
280         Assert(amount > 0);
281
282         sbuf->pkt_action = ACT_SKIP;
283         sbuf->pkt_remain = amount;
284 }
285
286 /* proto_fn tells to skip some amount of bytes */
287 void sbuf_prepare_fetch(SBuf *sbuf, unsigned amount)
288 {
289         AssertActive(sbuf);
290         Assert(sbuf->pkt_remain == 0);
291         //Assert(sbuf->pkt_action == ACT_UNSET || iobuf_send_pending_avail(&sbuf->io));
292         Assert(amount > 0);
293
294         sbuf->pkt_action = ACT_CALL;
295         sbuf->pkt_remain = amount;
296         /* sbuf->dst = NULL; // fixme ?? */
297 }
298
299 /*************************
300  * Internal functions
301  *************************/
302
303 /*
304  * Call proto callback with proper MBuf.
305  *
306  * If callback returns true it used one of sbuf_prepare_* on sbuf,
307  * and processing can continue.
308  *
309  * If it returned false it used sbuf_pause(), sbuf_close() or simply
310  * wants to wait for next event loop (e.g. too few data available).
311  * Callee should not touch sbuf in that case and just return to libevent.
312  */
313 static bool sbuf_call_proto(SBuf *sbuf, int event)
314 {
315         MBuf mbuf;
316         IOBuf *io = sbuf->io;
317         bool res;
318
319         AssertSanity(sbuf);
320         Assert(event != SBUF_EV_READ || iobuf_amount_parse(io) > 0);
321
322         /* if pkt callback, limit only with current packet */
323         if (event == SBUF_EV_PKT_CALLBACK)
324                 iobuf_parse_limit(io, &mbuf, sbuf->pkt_remain);
325         else if (event == SBUF_EV_READ)
326                 iobuf_parse_all(io, &mbuf);
327         else
328                 memset(&mbuf, 0, sizeof(mbuf));
329
330         res = sbuf->proto_cb(sbuf, event, &mbuf);
331
332         AssertSanity(sbuf);
333         Assert(event != SBUF_EV_READ || !res || sbuf->sock > 0);
334
335         return res;
336 }
337
338 /* let's wait for new data */
339 static bool sbuf_wait_for_data(SBuf *sbuf)
340 {
341         int err;
342
343         event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST, sbuf_recv_cb, sbuf);
344         err = event_add(&sbuf->ev, NULL);
345         if (err < 0) {
346                 log_warning("sbuf_wait_for_data: event_add: %s", strerror(errno));
347                 return false;
348         }
349         return true;
350 }
351
352 /* libevent EV_WRITE: called when dest socket is writable again */
353 static void sbuf_send_cb(int sock, short flags, void *arg)
354 {
355         SBuf *sbuf = arg;
356         bool res;
357
358         /* sbuf was closed before in this loop */
359         if (!sbuf->sock)
360                 return;
361
362         AssertSanity(sbuf);
363         Assert(sbuf->wait_send);
364
365         /* prepare normal situation for sbuf_main_loop */
366         sbuf->wait_send = 0;
367         res = sbuf_wait_for_data(sbuf);
368         if (res) {
369                 /* here we should certainly skip recv() */
370                 sbuf_main_loop(sbuf, SKIP_RECV);
371         } else
372                 /* drop if problems */
373                 sbuf_call_proto(sbuf, SBUF_EV_SEND_FAILED);
374 }
375
376 /* socket is full, wait until it's writable again */
377 static bool sbuf_queue_send(SBuf *sbuf)
378 {
379         int err;
380         AssertActive(sbuf);
381
382         /* if false is returned, the socket will be closed later */
383
384         /* stop waiting for read events */
385         err = event_del(&sbuf->ev);
386         if (err < 0) {
387                 log_warning("sbuf_queue_send: event_del failed: %s", strerror(errno));
388                 return false;
389         }
390
391         /* instead wait for EV_WRITE on destination socket */
392         event_set(&sbuf->ev, sbuf->dst->sock, EV_WRITE, sbuf_send_cb, sbuf);
393         err = event_add(&sbuf->ev, NULL);
394         if (err < 0) {
395                 log_warning("sbuf_queue_send: event_add failed: %s", strerror(errno));
396                 return false;
397         }
398
399         sbuf->wait_send = 1;
400         return true;
401 }
402
403 /*
404  * There's data in buffer to be sent. Returns bool if processing can continue.
405  *
406  * Does not look at pkt_pos/remain fields, expects them to be merged to send_*
407  */
408 static bool sbuf_send_pending(SBuf *sbuf)
409 {
410         int res, avail;
411         IOBuf *io = sbuf->io;
412
413         AssertActive(sbuf);
414         Assert(sbuf->dst || iobuf_amount_pending(io) == 0);
415
416 try_more:
417         /* how much data is available for sending */
418         avail = iobuf_amount_pending(io);
419         if (avail == 0)
420                 return true;
421
422         if (sbuf->dst->sock == 0) {
423                 log_error("sbuf_send_pending: no dst sock?");
424                 return false;
425         }
426
427         /* actually send it */
428         res = iobuf_send_pending(io, sbuf->dst->sock);
429         if (res < 0) {
430                 if (errno == EAGAIN) {
431                         if (!sbuf_queue_send(sbuf))
432                                 /* drop if queue failed */
433                                 sbuf_call_proto(sbuf, SBUF_EV_SEND_FAILED);
434                 } else
435                         sbuf_call_proto(sbuf, SBUF_EV_SEND_FAILED);
436                 return false;
437         }
438
439         AssertActive(sbuf);
440
441         /*
442          * Should do sbuf_queue_send() immediately?
443          *
444          * To be sure, let's run into EAGAIN.
445          */
446         goto try_more;
447 }
448
449 /* process as much data as possible */
450 static bool sbuf_process_pending(SBuf *sbuf)
451 {
452         unsigned avail;
453         IOBuf *io = sbuf->io;
454         bool full = iobuf_amount_recv(io) <= 0;
455         bool res;
456
457         while (1) {
458                 AssertActive(sbuf);
459
460                 /*
461                  * Enough for now?
462                  *
463                  * The (avail <= SBUF_SMALL_PKT) check is to avoid partial pkts.
464                  * As SBuf should not assume knowledge about packets,
465                  * the check is not done in !full case.  Packet handler can
466                  * then still notify about partial packet by returning false.
467                  */
468                 avail = iobuf_amount_parse(io);
469                 if (avail == 0 || (full && avail <= SBUF_SMALL_PKT))
470                         break;
471
472                 /*
473                  * If start of packet, process packet header.
474                  */
475                 if (sbuf->pkt_remain == 0) {
476                         res = sbuf_call_proto(sbuf, SBUF_EV_READ);
477                         if (!res)
478                                 return false;
479                         Assert(sbuf->pkt_remain > 0);
480                 }
481
482                 if (sbuf->pkt_action == ACT_SKIP || sbuf->pkt_action == ACT_CALL) {
483                         /* send any pending data before skipping */
484                         if (iobuf_amount_pending(io) > 0) {
485                                 res = sbuf_send_pending(sbuf);
486                                 if (!res)
487                                         return res;
488                         }
489                 }
490
491                 if (avail > sbuf->pkt_remain)
492                         avail = sbuf->pkt_remain;
493
494                 switch (sbuf->pkt_action) {
495                 case ACT_SEND:
496                         iobuf_tag_send(io, avail);
497                         break;
498                 case ACT_CALL:
499                         res = sbuf_call_proto(sbuf, SBUF_EV_PKT_CALLBACK);
500                         if (!res)
501                                 return false;
502                         /* after callback, skip pkt */
503                 case ACT_SKIP:
504                         iobuf_tag_skip(io, avail);
505                         break;
506                 }
507                 sbuf->pkt_remain -= avail;
508         }
509
510         return sbuf_send_pending(sbuf);
511 }
512
513 /* reposition at buffer start again */
514 static void sbuf_try_resync(SBuf *sbuf, bool release)
515 {
516         IOBuf *io = sbuf->io;
517
518         if (io)
519                 log_noise("resync: done=%d, parse=%d, recv=%d",
520                           io->done_pos, io->parse_pos, io->recv_pos);
521         AssertActive(sbuf);
522
523         if (!io)
524                 return;
525
526         if (release && iobuf_empty(io)) {
527                 obj_free(iobuf_cache, io);
528                 sbuf->io = NULL;
529         } else
530                 iobuf_try_resync(io, SBUF_SMALL_PKT);
531 }
532
533 /* actually ask kernel for more data */
534 static bool sbuf_actual_recv(SBuf *sbuf, unsigned len)
535 {
536         int got;
537         IOBuf *io = sbuf->io;
538
539         AssertActive(sbuf);
540         Assert(len > 0);
541         Assert(iobuf_amount_recv(io) >= len);
542
543         got = iobuf_recv_limit(io, sbuf->sock, len);
544         if (got == 0) {
545                 /* eof from socket */
546                 sbuf_call_proto(sbuf, SBUF_EV_RECV_FAILED);
547                 return false;
548         } else if (got < 0 && errno != EAGAIN) {
549                 /* some error occured */
550                 sbuf_call_proto(sbuf, SBUF_EV_RECV_FAILED);
551                 return false;
552         }
553         return true;
554 }
555
556 /* callback for libevent EV_READ */
557 static void sbuf_recv_cb(int sock, short flags, void *arg)
558 {
559         SBuf *sbuf = arg;
560         sbuf_main_loop(sbuf, DO_RECV);
561 }
562
563 static bool allocate_iobuf(SBuf *sbuf)
564 {
565         if (sbuf->io == NULL) {
566                 sbuf->io = obj_alloc(iobuf_cache);
567                 if (sbuf->io == NULL) {
568                         sbuf_call_proto(sbuf, SBUF_EV_RECV_FAILED);
569                         return false;
570                 }
571                 iobuf_reset(sbuf->io);
572         }
573         return true;
574 }
575
576 /*
577  * Main recv-parse-send-repeat loop.
578  *
579  * Reason for skip_recv is to avoid extra recv().  The problem with it
580  * is EOF from socket.  Currently that means that the pending data is
581  * dropped.  Fortunately server sockets are not paused and dropping
582  * data from client is no problem.  So only place where skip_recv is
583  * important is sbuf_send_cb().
584  */
585 static void sbuf_main_loop(SBuf *sbuf, bool skip_recv)
586 {
587         unsigned free, ok;
588         int loopcnt = 0;
589
590         /* sbuf was closed before in this event loop */
591         if (!sbuf->sock)
592                 return;
593
594         /* reading should be disabled when waiting */
595         Assert(sbuf->wait_send == 0);
596         AssertSanity(sbuf);
597
598         if (!allocate_iobuf(sbuf))
599                 return;
600
601         /* avoid recv() if asked */
602         if (skip_recv)
603                 goto skip_recv;
604
605 try_more:
606         /* avoid spending too much time on single socket */
607         if (cf_sbuf_loopcnt > 0 && loopcnt >= cf_sbuf_loopcnt) {
608                 log_debug("loopcnt full");
609                 return;
610         }
611         loopcnt++;
612
613         /* make room in buffer */
614         sbuf_try_resync(sbuf, false);
615
616         /*
617          * here used to be if (free > SBUF_SMALL_PKT) check
618          * but with skip_recv switch its should not be needed anymore.
619          */
620         free = iobuf_amount_recv(sbuf->io);
621         if (free > 0) {
622                 /*
623                  * When suspending, try to hit packet boundary ASAP.
624                  */
625                 if (cf_pause_mode == P_SUSPEND
626                     && sbuf->pkt_remain > 0
627                     && sbuf->pkt_remain < free)
628                 {
629                         free = sbuf->pkt_remain;
630                 }
631
632                 /* now fetch the data */
633                 ok = sbuf_actual_recv(sbuf, free);
634                 if (!ok)
635                         return;
636         }
637
638 skip_recv:
639         /* now handle it */
640         ok = sbuf_process_pending(sbuf);
641         if (!ok)
642                 return;
643
644         /* if the buffer is full, there can be more data available */
645         if (iobuf_amount_recv(sbuf->io) <= 0)
646                 goto try_more;
647
648         /* clean buffer */
649         sbuf_try_resync(sbuf, true);
650
651         /* notify proto that all is sent */
652         if (sbuf_is_empty(sbuf))
653                 sbuf_call_proto(sbuf, SBUF_EV_FLUSH);
654 }
655
656 /* check if there is any error pending on socket */
657 static bool sbuf_after_connect_check(SBuf *sbuf)
658 {
659         int optval = 0, err;
660         socklen_t optlen = sizeof(optval);
661
662         err = getsockopt(sbuf->sock, SOL_SOCKET, SO_ERROR, (void*)&optval, &optlen);
663         if (err < 0) {
664                 log_debug("sbuf_after_connect_check: getsockopt: %s",
665                           strerror(errno));
666                 return false;
667         }
668         if (optval != 0) {
669                 log_debug("sbuf_after_connect_check: pending error: %s",
670                           strerror(optval));
671                 return false;
672         }
673         return true;
674 }
675
676 /* callback for libevent EV_WRITE when connecting */
677 static void sbuf_connect_cb(int sock, short flags, void *arg)
678 {
679         SBuf *sbuf = arg;
680
681         if (flags & EV_WRITE) {
682                 if (!sbuf_after_connect_check(sbuf))
683                         goto failed;
684                 if (!sbuf_call_proto(sbuf, SBUF_EV_CONNECT_OK))
685                         return;
686                 if (!sbuf_wait_for_data(sbuf))
687                         goto failed;
688                 return;
689         }
690 failed:
691         sbuf_call_proto(sbuf, SBUF_EV_CONNECT_FAILED);
692 }
693
694 /* send some data to listening socket */
695 bool sbuf_answer(SBuf *sbuf, const void *buf, unsigned len)
696 {
697         int res;
698         if (sbuf->sock <= 0)
699                 return false;
700         res = safe_send(sbuf->sock, buf, len, 0);
701         if (res < 0) {
702                 log_debug("sbuf_answer: error sending: %s", strerror(errno));
703         } else if ((unsigned)res != len)
704                 log_debug("sbuf_answer: partial send: len=%d sent=%d", len, res);
705         return (unsigned)res == len;
706 }
707