]> granicus.if.org Git - pgbouncer/commitdiff
new features to SBuf
authorMarko Kreen <markokr@gmail.com>
Thu, 6 Dec 2007 09:11:07 +0000 (09:11 +0000)
committerMarko Kreen <markokr@gmail.com>
Thu, 6 Dec 2007 09:11:07 +0000 (09:11 +0000)
- struct field reorder to get aligned buffer
- support rewriting pkt header
- support listening for full pkt data
- clean some comments

include/bouncer.h
include/sbuf.h
src/client.c
src/objects.c
src/sbuf.c
src/server.c

index 022098c0c4b6fedb72b163ce5a81b5a823c05e3a..4753b0e5c916d26f0aa51cd7364f110f3cbc7b06 100644 (file)
@@ -257,6 +257,8 @@ struct PgSocket {
        SBuf            sbuf;           /* stream buffer, must be last */
 };
 
+#define PG_SOCKET_SIZE (sizeof(PgSocket) + cf_sbuf_len + SBUF_MAX_REWRITE)
+
 /* where to store old fd info during SHOW FDS result processing */
 #define tmp_sk_oldfd   request_time
 #define tmp_sk_linkfd  query_start
index 86d49fba43517fd77d26c148d389aa558ed93284..ded4c9d5b9732e37920c20949e9670b1a806b3e5 100644 (file)
  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  */
 
+/*
+ * event types for protocol handler
+ */
 typedef enum {
-       SBUF_EV_READ,
-       SBUF_EV_RECV_FAILED,
-       SBUF_EV_SEND_FAILED,
-       SBUF_EV_CONNECT_FAILED,
-       SBUF_EV_CONNECT_OK,
-       SBUF_EV_FLUSH
+       SBUF_EV_READ,           /* got new packet */
+       SBUF_EV_RECV_FAILED,    /* error */
+       SBUF_EV_SEND_FAILED,    /* error */
+       SBUF_EV_CONNECT_FAILED, /* error */
+       SBUF_EV_CONNECT_OK,     /* got connection */
+       SBUF_EV_FLUSH,          /* data is sent, buffer empty */
+       SBUF_EV_PKT_CALLBACK,   /* next part of pkt data */
 } SBufEvent;
 
+/*
+ * If less that this amount of data is pending, then
+ * prefer to merge it with next recv().
+ *
+ * It needs to be larger than data handler wants
+ * to see completely.  Generally just header,
+ * but currently also ServerParam pkt.
+ */
+#define SBUF_SMALL_PKT 64
+
+/*
+ * How much proto handler may want to enlarge the packet.
+ */
+#define SBUF_MAX_REWRITE 16
+
+/* fwd def */
 typedef struct SBuf SBuf;
 
 /* callback should return true if it used one of sbuf_prepare_* on sbuf,
    false if it used sbuf_pause(), sbuf_close() or simply wants to wait for
    next event loop (eg. too few data available). */
-typedef bool (*sbuf_proto_cb_t)(SBuf *sbuf,
-                               SBufEvent evtype,
-                               MBuf *mbuf,
-                               void *arg);
+typedef bool (*sbuf_cb_t)(SBuf *sbuf,
+                       SBufEvent evtype,
+                       MBuf *mbuf,
+                       void *arg);
 
 /* for some reason, libevent has no typedef for callback */
 typedef void (*sbuf_libevent_cb)(int, short, void *);
 
+/*
+ * Stream Buffer.
+ *
+ * Stream is divided to packets.  On each packet start
+ * protocol handler is called that decides what to do.
+ */
 struct SBuf {
-       /* libevent handle */
-       struct event ev;
-
-       /* protocol callback function */
-       sbuf_proto_cb_t proto_handler;
-       void *arg;
+       struct event ev;        /* libevent handle */
 
-       /* fd for this socket */
-       int sock;
+       bool is_unix;           /* is it unix socket */
+       bool wait_send;         /* debug var, otherwise useless */
+       uint8_t pkt_action;     /* method for handling current pkt */
 
-       /* dest SBuf for current packet */
-       SBuf *dst;
+       int sock;               /* fd for this socket */
 
-       int recv_pos;
-       int pkt_pos;
-       int send_pos;
+       int recv_pos;           /* end of received data */
+       int pkt_pos;            /* packet processing pos */
+       int send_pos;           /* how far is data sent */
 
        int pkt_remain;         /* total packet length remaining */
        int send_remain;        /* total data to be sent remaining */
 
-       unsigned pkt_skip:1;    /* if current packet should be skipped */
-       unsigned is_unix:1;     /* is it unix socket */
-       unsigned wait_send:1;   /* debug var, otherwise useless */
+       sbuf_cb_t proto_cb;     /* protocol callback */
+       void *proto_cb_arg;     /* extra arg to callback */
+
+       SBuf *dst;              /* target SBuf for current packet */
 
-       uint8_t buf[0];
+       uint8_t buf[0];         /* data buffer follows (cf_sbuf_len + SBUF_MAX_REWRITE) */
 };
 
 #define sbuf_socket(sbuf) ((sbuf)->sock)
 
-void sbuf_init(SBuf *sbuf, sbuf_proto_cb_t proto_fn, void *arg);
+void sbuf_init(SBuf *sbuf, sbuf_cb_t proto_fn, void *arg);
 void sbuf_accept(SBuf *sbuf, int read_sock, bool is_unix);
 void sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int timeout_sec);
 
@@ -79,6 +101,7 @@ void sbuf_close(SBuf *sbuf);
 /* proto_fn can use those functions to order behaviour */
 void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, int amount);
 void sbuf_prepare_skip(SBuf *sbuf, int amount);
+void sbuf_prepare_fetch(SBuf *sbuf, int amount);
 
 bool sbuf_answer(SBuf *sbuf, const void *buf, int len);
 
@@ -94,3 +117,5 @@ static inline bool sbuf_is_empty(SBuf *sbuf)
                && sbuf->pkt_remain == 0;
 }
 
+bool sbuf_rewrite_header(SBuf *sbuf, int old_len,
+                        const uint8_t *new_hdr, int new_len);
index 44adc1c4c6bc96b8f6b16d4781c79dfcf82ce6a9..499b4125b43747fc25d43e2200d21c2f00e9e8de 100644 (file)
@@ -392,6 +392,9 @@ bool client_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data, void *arg)
        case SBUF_EV_FLUSH:
                /* client is not interested in it */
                break;
+       case SBUF_EV_PKT_CALLBACK:
+               /* unused ATM */
+               break;
        }
        return res;
 }
index de2bb517b11d5c2be5c63996b729c468bc08121c..5b83b64e0fee5f79d58bc04778bd05f3235c6899 100644 (file)
@@ -117,9 +117,9 @@ void init_objects(void)
 
 void init_caches(void)
 {
-       server_cache = objcache_create("server_cache", sizeof(PgSocket) + cf_sbuf_len, 8,
+       server_cache = objcache_create("server_cache", PG_SOCKET_SIZE, 8,
                                       construct_server, clean_socket);
-       client_cache = objcache_create("client_cache", sizeof(PgSocket) + cf_sbuf_len, 8,
+       client_cache = objcache_create("client_cache", PG_SOCKET_SIZE, 8,
                                       construct_client, clean_socket);
 }
 
index 967060983ec107a8be24d0f25d91d971c5fa0fe4..7d3f5c4238eb41c22555487f9cdc08d1921a1360 100644 (file)
 #define DO_RECV                false
 #define SKIP_RECV      true
 
-/*
- * If less that this amount of data is pending, then
- * prefer to merge it with next recv().
- *
- * It needs to be larger than data handler wants
- * to see completely.  Generally just header,
- * but currently also ServerParam pkt.
- */
-#define SMALL_PKT      64
+#define ACT_UNSET 0
+#define ACT_SEND 1
+#define ACT_SKIP 2
+#define ACT_CALL 3
+
 
 #define AssertSanity(sbuf) do { \
        Assert((sbuf)->send_pos >= 0); \
        Assert((sbuf)->send_pos <= (sbuf)->pkt_pos); \
        Assert((sbuf)->pkt_pos <= (sbuf)->recv_pos); \
-       Assert((sbuf)->recv_pos <= cf_sbuf_len); \
+       Assert((sbuf)->recv_pos <= cf_sbuf_len + SBUF_MAX_REWRITE); \
        Assert((sbuf)->pkt_remain >= 0); \
        Assert((sbuf)->send_remain >= 0); \
 } while (0)
@@ -71,11 +67,11 @@ static bool sbuf_call_proto(SBuf *sbuf, int event);
  *********************************/
 
 /* initialize SBuf with proto handler */
-void sbuf_init(SBuf *sbuf, sbuf_proto_cb_t proto_fn, void *arg)
+void sbuf_init(SBuf *sbuf, sbuf_cb_t proto_fn, void *arg)
 {
        memset(sbuf, 0, sizeof(*sbuf));
-       sbuf->arg = arg;
-       sbuf->proto_handler = proto_fn;
+       sbuf->proto_cb_arg = arg;
+       sbuf->proto_cb = proto_fn;
 }
 
 /* got new socket from accept() */
@@ -191,7 +187,7 @@ void sbuf_continue(SBuf *sbuf)
         * This is not true in ServerParameter case.
         */
        /*
-        * if (sbuf->recv_pos - sbuf->pkt_pos >= SMALL_PKT)
+        * if (sbuf->recv_pos - sbuf->pkt_pos >= SBUF_SMALL_PKT)
         *      do_recv = false;
         */
 
@@ -209,7 +205,7 @@ void sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb user_cb)
        AssertActive(sbuf);
 
        event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST,
-                 user_cb, sbuf->arg);
+                 user_cb, sbuf->proto_cb_arg);
        event_add(&sbuf->ev, NULL);
 }
 
@@ -224,7 +220,7 @@ void sbuf_close(SBuf *sbuf)
        sbuf->dst = NULL;
        sbuf->sock = 0;
        sbuf->pkt_pos = sbuf->pkt_remain = sbuf->recv_pos = 0;
-       sbuf->pkt_skip = sbuf->wait_send = 0;
+       sbuf->pkt_action = sbuf->wait_send = 0;
        sbuf->send_pos = sbuf->send_remain = 0;
 }
 
@@ -233,10 +229,10 @@ void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, int amount)
 {
        AssertActive(sbuf);
        Assert(sbuf->pkt_remain == 0);
-       Assert(sbuf->pkt_skip == 0 || sbuf->send_remain == 0);
+       Assert(sbuf->pkt_action == ACT_UNSET || sbuf->send_remain == 0);
        Assert(amount > 0);
 
-       sbuf->pkt_skip = 0;
+       sbuf->pkt_action = ACT_SEND;
        sbuf->pkt_remain = amount;
        sbuf->dst = dst;
 }
@@ -246,12 +242,25 @@ void sbuf_prepare_skip(SBuf *sbuf, int amount)
 {
        AssertActive(sbuf);
        Assert(sbuf->pkt_remain == 0);
-       Assert(sbuf->pkt_skip == 0 || sbuf->send_remain == 0);
+       Assert(sbuf->pkt_action == ACT_UNSET || sbuf->send_remain == 0);
        Assert(amount > 0);
 
-       sbuf->pkt_skip = 1;
+       sbuf->pkt_action = ACT_SKIP;
        sbuf->pkt_remain = amount;
-       sbuf->dst = NULL;
+       /* sbuf->dst = NULL; // fixme ?? */
+}
+
+/* proto_fn tells to skip some amount of bytes */
+void sbuf_prepare_fetch(SBuf *sbuf, int amount)
+{
+       AssertActive(sbuf);
+       Assert(sbuf->pkt_remain == 0);
+       Assert(sbuf->pkt_action == ACT_UNSET || sbuf->send_remain == 0);
+       Assert(amount > 0);
+
+       sbuf->pkt_action = ACT_CALL;
+       sbuf->pkt_remain = amount;
+       /* sbuf->dst = NULL; // fixme ?? */
 }
 
 /*************************
@@ -278,8 +287,14 @@ static bool sbuf_call_proto(SBuf *sbuf, int event)
        AssertSanity(sbuf);
        Assert(event != SBUF_EV_READ || avail > 0);
 
+       /* if pkt callback, limit only with current packet */
+       if (event == SBUF_EV_PKT_CALLBACK) {
+               if (avail > sbuf->pkt_remain)
+                       avail = sbuf->pkt_remain;
+       }
+
        mbuf_init(&mbuf, pos, avail);
-       res = sbuf->proto_handler(sbuf, event, &mbuf, sbuf->arg);
+       res = sbuf->proto_cb(sbuf, event, &mbuf, sbuf->proto_cb_arg);
 
        AssertSanity(sbuf);
        Assert(event != SBUF_EV_READ || !res || sbuf->sock > 0);
@@ -388,7 +403,7 @@ all_sent:
 static bool sbuf_process_pending(SBuf *sbuf)
 {
        int avail;
-       bool full = sbuf->recv_pos == cf_sbuf_len;
+       bool full = sbuf->recv_pos >= cf_sbuf_len;
        bool res;
 
        while (1) {
@@ -397,13 +412,13 @@ static bool sbuf_process_pending(SBuf *sbuf)
                /*
                 * Enough for now?
                 *
-                * The (avail <= SMALL_PKT) check is to avoid partial pkts.
+                * The (avail <= SBUF_SMALL_PKT) check is to avoid partial pkts.
                 * As SBuf should not assume knowledge about packets,
                 * the check is not done in !full case.  Packet handler can
                 * then still notify about partial packet by returning false.
                 */
                avail = sbuf->recv_pos - sbuf->pkt_pos;
-               if (avail == 0 || (full && avail <= SMALL_PKT))
+               if (avail == 0 || (full && avail <= SBUF_SMALL_PKT))
                        break;
 
                /*
@@ -419,20 +434,26 @@ static bool sbuf_process_pending(SBuf *sbuf)
                /* walk pkt, merge sends */
                if (avail > sbuf->pkt_remain)
                        avail = sbuf->pkt_remain;
-               if (!sbuf->pkt_skip) {
+
+               switch (sbuf->pkt_action) {
+               case ACT_SEND:
                        if (sbuf->send_remain == 0)
                                sbuf->send_pos = sbuf->pkt_pos;
                        sbuf->send_remain += avail;
-               }
-               sbuf->pkt_remain -= avail;
-               sbuf->pkt_pos += avail;
-
-               /* send data */
-               if (sbuf->pkt_skip) {
-                       res = sbuf_send_pending(sbuf);
+                       break;
+               case ACT_CALL:
+                       res = sbuf_call_proto(sbuf, SBUF_EV_PKT_CALLBACK);
                        if (!res)
                                return false;
+                       /* after callback, skip pkt */
+               case ACT_SKIP:
+                       res = sbuf_send_pending(sbuf);
+                       if (!res)
+                               return res;
+                       break;
                }
+               sbuf->pkt_remain -= avail;
+               sbuf->pkt_pos += avail;
        }
 
        return sbuf_send_pending(sbuf);
@@ -452,7 +473,7 @@ static void sbuf_try_resync(SBuf *sbuf)
 
        if (avail == 0) {
                sbuf->recv_pos = sbuf->pkt_pos = sbuf->send_pos = 0;
-       } else if (avail <= SMALL_PKT) {
+       } else if (avail <= SBUF_SMALL_PKT) {
                memmove(sbuf->buf, sbuf->buf + sbuf->send_pos, avail);
                sbuf->pkt_pos -= sbuf->send_pos;
                sbuf->send_pos = 0;
@@ -529,7 +550,7 @@ try_more:
        sbuf_try_resync(sbuf);
 
        /*
-        * here used to be if (free > SMALL_PKT) check
+        * here used to be if (free > SBUF_SMALL_PKT) check
         * but with skip_recv switch its should not be needed anymore.
         */
        free = cf_sbuf_len - sbuf->recv_pos;
@@ -557,7 +578,7 @@ skip_recv:
                return;
 
        /* if the buffer is full, there can be more data available */
-       if (sbuf->recv_pos == cf_sbuf_len)
+       if (sbuf->recv_pos >= cf_sbuf_len)
                goto try_more;
 
        /* clean buffer */
@@ -619,3 +640,26 @@ bool sbuf_answer(SBuf *sbuf, const void *buf, int len)
        return res == len;
 }
 
+bool sbuf_rewrite_header(SBuf *sbuf, int old_len,
+                        const uint8_t *new_hdr, int new_len)
+{
+       int avail = sbuf->recv_pos - sbuf->pkt_pos;
+       int diff = new_len - old_len;
+       uint8_t *pkt_pos = sbuf->buf + sbuf->pkt_pos;
+       uint8_t *old_pos = pkt_pos + old_len;
+       uint8_t *new_pos = pkt_pos + new_len;
+
+       AssertActive(sbuf);
+       Assert(old_len >= 0 && new_len >= 0);
+       Assert(diff <= SBUF_MAX_REWRITE);
+
+       /* overflow can be triggered by user by sending multiple Parse pkts */
+       if (sbuf->recv_pos + diff > cf_sbuf_len + SBUF_MAX_REWRITE)
+               return false;
+
+       memmove(new_pos, old_pos, avail - old_len);
+       memcpy(pkt_pos, new_hdr, new_len);
+       sbuf->recv_pos += diff;
+       return true;
+}
+
index b06d3e0666e2f6ab63d633f0d042b80357fc3818..9c9e92c2eaeee57c695cd6dde7099c5ba1d429e0 100644 (file)
@@ -357,6 +357,9 @@ bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data, void *arg)
                        }
                }
                break;
+       case SBUF_EV_PKT_CALLBACK:
+               slog_warning(server, "SBUF_EV_PKT_CALLBACK with state=%d", server->state);
+               break;
        }
        return res;
 }