]> granicus.if.org Git - pgbouncer/commitdiff
draft lazy iobuf handling
authorMarko Kreen <markokr@gmail.com>
Wed, 23 Jan 2008 09:43:34 +0000 (09:43 +0000)
committerMarko Kreen <markokr@gmail.com>
Wed, 23 Jan 2008 09:43:34 +0000 (09:43 +0000)
include/bouncer.h
include/iobuf.h
include/objects.h
include/sbuf.h
src/admin.c
src/main.c
src/objects.c
src/sbuf.c

index cac06555c6c4ea04874d3ae6eebeadd5b6864475..42e7344269508a2628f66c7e582f587cbfc9a863 100644 (file)
@@ -275,8 +275,8 @@ struct PgSocket {
        SBuf sbuf;              /* stream buffer, must be last */
 };
 
-#define RAW_SOCKET_SIZE offsetof(struct PgSocket, sbuf.io.buf)
-#define PG_SOCKET_SIZE (RAW_SOCKET_SIZE + cf_sbuf_len + SBUF_MAX_REWRITE)
+#define RAW_IOBUF_SIZE offsetof(IOBuf, buf)
+#define IOBUF_SIZE     (RAW_IOBUF_SIZE + cf_sbuf_len)
 
 /* where to store old fd info during SHOW FDS result processing */
 #define tmp_sk_oldfd   request_time
index 3197c730028d36d13c83fc3af1a3538f6e7a3583..42574b3f8ff9f4d0cd5f4410b56a1045c3d56ffe 100644 (file)
@@ -56,15 +56,15 @@ typedef struct iobuf IOBuf;
 
 static inline bool iobuf_sane(const IOBuf *io)
 {
-       return io->done_pos >= 0
+       return (io == NULL) || (io->done_pos >= 0
                && io->parse_pos >= io->done_pos
                && io->recv_pos >= io->parse_pos
-               && cf_sbuf_len >= io->recv_pos;
+               && cf_sbuf_len >= io->recv_pos);
 }
 
-static inline bool iobuf_empty(const IOBuf *buf)
+static inline bool iobuf_empty(const IOBuf *io)
 {
-       return buf->done_pos == buf->recv_pos;
+       return io == NULL || io->done_pos == io->recv_pos;
 }
 
 /* unsent amount */
index 92f44a78dc3a819fe0ebf5f262ab14b8478df1fa..374e22bcdb34b58d3222b0909f766e769cddf4cb 100644 (file)
@@ -26,6 +26,7 @@ extern ObjectCache *server_cache;
 extern ObjectCache *db_cache;
 extern ObjectCache *pool_cache;
 extern ObjectCache *user_cache;
+extern ObjectCache *iobuf_cache;
 
 PgDatabase *find_database(const char *name);
 PgUser *find_user(const char *name);
index d2b695691161b7124b62eab7143aeedb19e0bc3b..eaa7e2c6ab394f391284d7eee7d865db786ace93 100644 (file)
@@ -39,11 +39,6 @@ typedef enum {
  */
 #define SBUF_SMALL_PKT 64
 
-/*
- * How much proto handler may want to enlarge the packet.
- */
-#define SBUF_MAX_REWRITE 16
-
 /* fwd def */
 typedef struct SBuf SBuf;
 
@@ -80,11 +75,9 @@ struct SBuf {
 
        SBuf *dst;              /* target SBuf for current packet */
 
-       IOBuf io;
+       IOBuf *io;
 };
 
-#define RAW_SBUF_SIZE offsetof(struct SBuf, io.buf)
-
 #define sbuf_socket(sbuf) ((sbuf)->sock)
 
 void sbuf_init(SBuf *sbuf, sbuf_cb_t proto_fn, void *arg);
@@ -110,7 +103,7 @@ bool sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb cb)  _MUSTCHECK;
  */
 static inline bool sbuf_is_empty(SBuf *sbuf)
 {
-       return iobuf_empty(&sbuf->io) && sbuf->pkt_remain == 0;
+       return iobuf_empty(sbuf->io) && sbuf->pkt_remain == 0;
 }
 
 static inline bool sbuf_is_closed(SBuf *sbuf)
@@ -118,6 +111,4 @@ static inline bool sbuf_is_closed(SBuf *sbuf)
        return sbuf->sock == 0;
 }
 
-bool sbuf_rewrite_header(SBuf *sbuf, int old_len,
-                        const uint8_t *new_hdr, int new_len)  _MUSTCHECK;
 
index 65e26bc10d1cae84a55daa441e870b54f7416583..d9431ad9c9a4d47090c639c5d50979391702c853 100644 (file)
@@ -501,8 +501,8 @@ static void adr2txt(const PgAddr *adr, char *dst, int dstlen)
 
 static void socket_row(PktBuf *buf, PgSocket *sk, const char *state, bool debug)
 {
-       int pkt_avail = iobuf_amount_parse(&sk->sbuf.io);
-       int send_avail = iobuf_amount_pending(&sk->sbuf.io);
+       int pkt_avail = iobuf_amount_parse(sk->sbuf.io);
+       int send_avail = iobuf_amount_pending(sk->sbuf.io);
        char ptrbuf[128], linkbuf[128];
        char l_addr[32], r_addr[32];
 
@@ -524,10 +524,10 @@ static void socket_row(PktBuf *buf, PgSocket *sk, const char *state, bool debug)
                             sk->connect_time,
                             sk->request_time,
                             ptrbuf, linkbuf,
-                            sk->sbuf.io.recv_pos,
-                            sk->sbuf.io.parse_pos,
+                            sk->sbuf.io->recv_pos,
+                            sk->sbuf.io->parse_pos,
                             sk->sbuf.pkt_remain,
-                            sk->sbuf.io.done_pos,
+                            sk->sbuf.io->done_pos,
                             0,
                             pkt_avail, send_avail);
 }
index 0dce3d9c633668766224279565625d0f249e383f..aa9afe39a9e4ec6b972dcc867e8bc766a01a6961 100644 (file)
@@ -471,9 +471,9 @@ static void check_limits(void)
        List *item;
        PgDatabase *db;
 
-       log_noise("event: %d, SBuf: %d, PgSocket: %d, Full PgSocket: %d",
-                 (int)sizeof(struct event), (int)RAW_SBUF_SIZE,
-                 (int)RAW_SOCKET_SIZE, (int)PG_SOCKET_SIZE);
+       log_noise("event: %d, SBuf: %d, PgSocket: %d, IOBuf: %d",
+                 (int)sizeof(struct event), (int)sizeof(SBuf),
+                 (int)sizeof(PgSocket), (int)IOBUF_SIZE);
 
        /* load limits */
        err = getrlimit(RLIMIT_NOFILE, &lim);
index a170f8fbfb9d9207a2dee9242d985a565db71f13..c99e5ef4d04a95ae2dce4ffc0725f1c23cfeb0b7 100644 (file)
@@ -41,6 +41,7 @@ ObjectCache *client_cache;
 ObjectCache *db_cache;
 ObjectCache *pool_cache;
 ObjectCache *user_cache;
+ObjectCache *iobuf_cache;
 
 /*
  * libevent may still report events when event_del()
@@ -125,13 +126,21 @@ void init_objects(void)
                fatal("cannot create initial caches");
 }
 
+static void do_iobuf_reset(void *arg)
+{
+       IOBuf *io = arg;
+       iobuf_reset(io);
+}
+
 /* initialization after config loading */
 void init_caches(void)
 {
-       server_cache = objcache_create("server_cache", PG_SOCKET_SIZE, 8,
+       server_cache = objcache_create("server_cache", sizeof(PgSocket), 0,
                                       construct_server, clean_socket);
-       client_cache = objcache_create("client_cache", PG_SOCKET_SIZE, 8,
+       client_cache = objcache_create("client_cache", sizeof(PgSocket), 0,
                                       construct_client, clean_socket);
+       iobuf_cache = objcache_create("iobuf_cache", IOBUF_SIZE, 0,
+                                     do_iobuf_reset, do_iobuf_reset);
 }
 
 /* state change means moving between lists */
index 8e6c62255cf51fe296c68547a4c8e9cf7abdf042..f97cffa6908e99cc3490e5db622edfd22f4a78db 100644 (file)
@@ -37,7 +37,7 @@
 
 
 #define AssertSanity(sbuf) do { \
-       Assert(iobuf_sane(&(sbuf)->io)); \
+       Assert(iobuf_sane((sbuf)->io)); \
        Assert((sbuf)->pkt_remain >= 0); \
 } while (0)
 
@@ -53,14 +53,14 @@ static bool sbuf_process_pending(SBuf *sbuf) _MUSTCHECK;
 static void sbuf_connect_cb(int sock, short flags, void *arg);
 static void sbuf_recv_cb(int sock, short flags, void *arg);
 static void sbuf_send_cb(int sock, short flags, void *arg);
-static void sbuf_try_resync(SBuf *sbuf);
+static void sbuf_try_resync(SBuf *sbuf, bool release);
 static bool sbuf_wait_for_data(SBuf *sbuf) _MUSTCHECK;
 static void sbuf_main_loop(SBuf *sbuf, bool skip_recv);
 static bool sbuf_call_proto(SBuf *sbuf, int event) /* _MUSTCHECK */;
 static bool sbuf_actual_recv(SBuf *sbuf, int len)  _MUSTCHECK;
 static bool sbuf_after_connect_check(SBuf *sbuf)  _MUSTCHECK;
 
-static inline IOBuf *get_iobuf(SBuf *sbuf) { return &sbuf->io; }
+static inline IOBuf *get_iobuf(SBuf *sbuf) { return sbuf->io; }
 
 /*********************************
  * Public functions
@@ -69,7 +69,7 @@ static inline IOBuf *get_iobuf(SBuf *sbuf) { return &sbuf->io; }
 /* initialize SBuf with proto handler */
 void sbuf_init(SBuf *sbuf, sbuf_cb_t proto_fn, void *arg)
 {
-       memset(sbuf, 0, RAW_SBUF_SIZE);
+       memset(sbuf, 0, sizeof(SBuf));
        sbuf->proto_cb_arg = arg;
        sbuf->proto_cb = proto_fn;
 }
@@ -79,7 +79,7 @@ bool sbuf_accept(SBuf *sbuf, int sock, bool is_unix)
 {
        bool res;
 
-       Assert(iobuf_empty(&sbuf->io) && sbuf->sock == 0);
+       Assert(iobuf_empty(sbuf->io) && sbuf->sock == 0);
        AssertSanity(sbuf);
 
        tune_socket(sock, is_unix);
@@ -112,7 +112,7 @@ bool sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int time
        socklen_t len;
        struct timeval timeout;
 
-       Assert(iobuf_empty(&sbuf->io) && sbuf->sock == 0);
+       Assert(iobuf_empty(sbuf->io) && sbuf->sock == 0);
        AssertSanity(sbuf);
 
        /* prepare sockaddr */
@@ -253,7 +253,10 @@ bool sbuf_close(SBuf *sbuf)
        sbuf->sock = 0;
        sbuf->pkt_remain = 0;
        sbuf->pkt_action = sbuf->wait_send = 0;
-       iobuf_reset(get_iobuf(sbuf));
+       if (sbuf->io) {
+               obj_free(iobuf_cache, sbuf->io);
+               sbuf->io = NULL;
+       }
        return true;
 }
 
@@ -312,7 +315,7 @@ void sbuf_prepare_fetch(SBuf *sbuf, int amount)
 static bool sbuf_call_proto(SBuf *sbuf, int event)
 {
        MBuf mbuf;
-       IOBuf *io = get_iobuf(sbuf);
+       IOBuf *io = sbuf->io;
        bool res;
 
        AssertSanity(sbuf);
@@ -321,8 +324,10 @@ static bool sbuf_call_proto(SBuf *sbuf, int event)
        /* if pkt callback, limit only with current packet */
        if (event == SBUF_EV_PKT_CALLBACK)
                iobuf_parse_limit(io, &mbuf, sbuf->pkt_remain);
-       else
+       else if (event == SBUF_EV_READ)
                iobuf_parse_all(io, &mbuf);
+       else
+               memset(&mbuf, 0, sizeof(mbuf));
 
        res = sbuf->proto_cb(sbuf, event, &mbuf, sbuf->proto_cb_arg);
 
@@ -405,10 +410,10 @@ static bool sbuf_queue_send(SBuf *sbuf)
 static bool sbuf_send_pending(SBuf *sbuf)
 {
        int res, avail;
-       IOBuf *io = get_iobuf(sbuf);
+       IOBuf *io = sbuf->io;
 
        AssertActive(sbuf);
-       Assert(sbuf->dst || iobuf_amount_pending(&sbuf->io) == 0);
+       Assert(sbuf->dst || iobuf_amount_pending(io) == 0);
 
 try_more:
        /* how much data is available for sending */
@@ -447,7 +452,7 @@ try_more:
 static bool sbuf_process_pending(SBuf *sbuf)
 {
        int avail;
-       IOBuf *io = get_iobuf(sbuf);
+       IOBuf *io = sbuf->io;
        bool full = iobuf_amount_recv(io) <= 0;
        bool res;
 
@@ -508,20 +513,30 @@ static bool sbuf_process_pending(SBuf *sbuf)
 }
 
 /* reposition at buffer start again */
-static void sbuf_try_resync(SBuf *sbuf)
+static void sbuf_try_resync(SBuf *sbuf, bool release)
 {
-       IOBuf *io = get_iobuf(sbuf);
+       IOBuf *io = sbuf->io;
 
+       if (io)
+               log_debug("reync: done=%d, parse=%d, recv=%d",
+                         io->done_pos, io->parse_pos, io->recv_pos);
        AssertActive(sbuf);
 
-       iobuf_try_resync(io, SBUF_SMALL_PKT);
+       if (!io)
+               return;
+
+       if (release && iobuf_empty(io)) {
+               obj_free(iobuf_cache, io);
+               sbuf->io = NULL;
+       } else
+               iobuf_try_resync(io, SBUF_SMALL_PKT);
 }
 
 /* actually ask kernel for more data */
 static bool sbuf_actual_recv(SBuf *sbuf, int len)
 {
        int got;
-       IOBuf *io = get_iobuf(sbuf);
+       IOBuf *io = sbuf->io;
 
        AssertActive(sbuf);
        Assert(len > 0);
@@ -547,6 +562,19 @@ static void sbuf_recv_cb(int sock, short flags, void *arg)
        sbuf_main_loop(sbuf, DO_RECV);
 }
 
+static bool allocate_iobuf(SBuf *sbuf)
+{
+       if (sbuf->io == NULL) {
+               sbuf->io = obj_alloc(iobuf_cache);
+               if (sbuf->io == NULL) {
+                       sbuf_call_proto(sbuf, SBUF_EV_RECV_FAILED);
+                       return false;
+               }
+               iobuf_reset(sbuf->io);
+       }
+       return true;
+}
+
 /*
  * Main recv-parse-send-repeat loop.
  *
@@ -559,7 +587,6 @@ static void sbuf_recv_cb(int sock, short flags, void *arg)
 static void sbuf_main_loop(SBuf *sbuf, bool skip_recv)
 {
        int free, ok;
-       IOBuf *io = get_iobuf(sbuf);
 
        /* sbuf was closed before in this event loop */
        if (!sbuf->sock)
@@ -569,19 +596,22 @@ static void sbuf_main_loop(SBuf *sbuf, bool skip_recv)
        Assert(sbuf->wait_send == 0);
        AssertSanity(sbuf);
 
+       if (!allocate_iobuf(sbuf))
+               return;
+
        /* avoid recv() if asked */
        if (skip_recv)
                goto skip_recv;
 
 try_more:
        /* make room in buffer */
-       sbuf_try_resync(sbuf);
+       sbuf_try_resync(sbuf, false);
 
        /*
         * here used to be if (free > SBUF_SMALL_PKT) check
         * but with skip_recv switch its should not be needed anymore.
         */
-       free = iobuf_amount_recv(io);
+       free = iobuf_amount_recv(sbuf->io);
        if (free > 0) {
                /*
                 * When suspending, try to hit packet boundary ASAP.
@@ -606,11 +636,11 @@ skip_recv:
                return;
 
        /* if the buffer is full, there can be more data available */
-       if (iobuf_amount_recv(io) <= 0)
+       if (iobuf_amount_recv(sbuf->io) <= 0)
                goto try_more;
 
        /* clean buffer */
-       sbuf_try_resync(sbuf);
+       sbuf_try_resync(sbuf, true);
 
        /* notify proto that all is sent */
        if (sbuf_is_empty(sbuf))
@@ -669,28 +699,3 @@ bool sbuf_answer(SBuf *sbuf, const void *buf, int len)
        return res == len;
 }
 
-bool sbuf_rewrite_header(SBuf *sbuf, int old_len,
-                        const uint8_t *new_hdr, int new_len)
-{
-#if 0
-       int avail = sbuf->recv_pos - sbuf->pkt_pos;
-       int diff = new_len - old_len;
-       uint8_t *pkt_pos = sbuf->buf + sbuf->pkt_pos;
-       uint8_t *old_pos = pkt_pos + old_len;
-       uint8_t *new_pos = pkt_pos + new_len;
-
-       AssertActive(sbuf);
-       Assert(old_len >= 0 && new_len >= 0);
-       Assert(diff <= SBUF_MAX_REWRITE);
-
-       /* overflow can be triggered by user by sending multiple Parse pkts */
-       if (sbuf->recv_pos + diff > cf_sbuf_len + SBUF_MAX_REWRITE)
-               return false;
-
-       memmove(new_pos, old_pos, avail - old_len);
-       memcpy(pkt_pos, new_hdr, new_len);
-       sbuf->recv_pos += diff;
-#endif
-       return false;
-}
-