]> granicus.if.org Git - pgbouncer/commitdiff
<usual/mbuf.h>
authorMarko Kreen <markokr@gmail.com>
Mon, 11 Jan 2010 16:50:39 +0000 (18:50 +0200)
committerMarko Kreen <markokr@gmail.com>
Tue, 4 May 2010 11:30:50 +0000 (14:30 +0300)
14 files changed:
Makefile
include/bouncer.h
include/client.h
include/iobuf.h
include/mbuf.h [deleted file]
include/proto.h
include/sbuf.h
include/server.h
src/admin.c
src/client.c
src/proto.c
src/sbuf.c
src/server.c
src/takeover.c

index 1ddb63d6292d9e1b3f96e950a033e845891ebf56..5eeddc9923fc09611cffa486d88b087ad47bbd1b 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -5,7 +5,7 @@ SRCS = client.c loader.c objects.c pooler.c proto.c sbuf.c server.c util.c \
        varcache.c
 HDRS = client.h loader.h objects.h pooler.h proto.h sbuf.h server.h util.h \
        admin.h stats.h takeover.h janitor.h pktbuf.h system.h bouncer.h \
-       mbuf.h varcache.h iobuf.h
+       varcache.h iobuf.h
 
 # data & dirs to include in tgz
 DOCS = doc/overview.txt doc/usage.txt doc/config.txt doc/todo.txt
index b451544873cd6896911c99292cc9d8ac1cb062e7..72c614e73e5efed3ae770b28cb9fb5ffc34d824e 100644 (file)
@@ -32,6 +32,7 @@
 #include <usual/slab.h>
 #include <usual/socket.h>
 #include <usual/safeio.h>
+#include <usual/mbuf.h>
 
 #include <event.h>
 
@@ -80,7 +81,6 @@ typedef struct PktHdr PktHdr;
 extern int cf_sbuf_len;
 
 #include "util.h"
-#include "mbuf.h"
 #include "iobuf.h"
 #include "sbuf.h"
 #include "pktbuf.h"
index 6a4a13eed74b99fd8386b8126185c3830b42fcdc..aaa143144b3e75c0212e83e1bef1f8ede9e02c13 100644 (file)
@@ -16,7 +16,7 @@
  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  */
 
-bool client_proto(SBuf *sbuf, SBufEvent evtype, MBuf *pkt)  _MUSTCHECK;
+bool client_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *pkt)  _MUSTCHECK;
 bool set_pool(PgSocket *client, const char *dbname, const char *username) _MUSTCHECK;
 
 
index f4ff18026d31da3d5d9ddb9469961059126d65bb..41e3d101662d0d92acb302956c55a5dd482a5604 100644 (file)
@@ -84,22 +84,22 @@ static inline unsigned iobuf_amount_recv(const IOBuf *buf)
 }
 
 /* put all unparsed to mbuf */
-static inline unsigned iobuf_parse_all(const IOBuf *buf, MBuf *mbuf)
+static inline unsigned iobuf_parse_all(const IOBuf *buf, struct MBuf *mbuf)
 {
        unsigned avail = iobuf_amount_parse(buf);
        const uint8_t *pos = buf->buf + buf->parse_pos;
-       mbuf_init(mbuf, pos, avail);
+       mbuf_init_fixed_reader(mbuf, pos, avail);
        return avail;
 }
 
 /* put all unparsed to mbuf, with size limit */
-static inline unsigned iobuf_parse_limit(const IOBuf *buf, MBuf *mbuf, unsigned limit)
+static inline unsigned iobuf_parse_limit(const IOBuf *buf, struct MBuf *mbuf, unsigned limit)
 {
        unsigned avail = iobuf_amount_parse(buf);
        const uint8_t *pos = buf->buf + buf->parse_pos;
        if (avail > limit)
                avail = limit;
-       mbuf_init(mbuf, pos, avail);
+       mbuf_init_fixed_reader(mbuf, pos, avail);
        return avail;
 }
 
diff --git a/include/mbuf.h b/include/mbuf.h
deleted file mode 100644 (file)
index 97d8480..0000000
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * PgBouncer - Lightweight connection pooler for PostgreSQL.
- * 
- * Copyright (c) 2007-2009  Marko Kreen, Skype Technologies OÜ
- * 
- * Permission to use, copy, modify, and/or distribute this software for any
- * purpose with or without fee is hereby granted, provided that the above
- * copyright notice and this permission notice appear in all copies.
- * 
- * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- */
-
-/*
- * Safe and easy access to fixed memory buffer.
- */
-
-/*
- * FIXME: the code should be converted so that
- * the fatal()-s can be replaced by Asserts().
- */
-
-typedef struct MBuf MBuf;
-struct MBuf {
-       const uint8_t *data;
-       const uint8_t *end;
-       const uint8_t *pos;
-};
-
-static inline void mbuf_init(MBuf *buf, const uint8_t *ptr, int len)
-{
-       if (len < 0)
-               fatal("fuckup");
-       buf->data = buf->pos = ptr;
-       buf->end = ptr + len;
-}
-
-static inline uint8_t mbuf_get_char(MBuf *buf)
-{
-       if (buf->pos + 1 > buf->end)
-               fatal("buffer overflow");
-       return *buf->pos++;
-}
-
-static inline unsigned mbuf_get_uint16(MBuf *buf)
-{
-       unsigned val;
-       if (buf->pos + 2 > buf->end)
-               fatal("buffer overflow");
-       val = *buf->pos++;
-       val = (val << 8) | *buf->pos++;
-       return val;
-}
-
-static inline uint32_t mbuf_get_uint32(MBuf *buf)
-{
-       uint32_t val;
-       if (buf->pos + 4 > buf->end)
-               fatal("buffer overflow");
-       val = *buf->pos++;
-       val = (val << 8) | *buf->pos++;
-       val = (val << 8) | *buf->pos++;
-       val = (val << 8) | *buf->pos++;
-       return val;
-}
-
-static inline uint64_t mbuf_get_uint64(MBuf *buf)
-{
-       uint64_t i1, i2;
-       i1 = mbuf_get_uint32(buf);
-       i2 = mbuf_get_uint32(buf);
-       return (i1 << 32) | i2;
-}
-
-static inline const uint8_t * mbuf_get_bytes(MBuf *buf, unsigned len)
-{
-       const uint8_t *res = buf->pos;
-       if (buf->pos + len > buf->end)
-               fatal("buffer overflow");
-       buf->pos += len;
-       return res;
-}
-
-static inline unsigned mbuf_avail(const MBuf *buf)
-{
-       return buf->end - buf->pos;
-}
-
-static inline unsigned mbuf_size(const MBuf *buf)
-{
-       return buf->end - buf->data;
-}
-
-static inline const char * mbuf_get_string(MBuf *buf)
-{
-       const char *res = (const char *)buf->pos;
-       const uint8_t *nul = memchr(res, 0, mbuf_avail(buf));
-       if (!nul)
-               return NULL;
-       buf->pos = nul + 1;
-       return res;
-}
-
-static inline void mbuf_copy(const MBuf *src, MBuf *dst)
-{
-       *dst = *src;
-}
-
-static inline void mbuf_slice(MBuf *src, unsigned len, MBuf *dst)
-{
-       if (len > mbuf_avail(src))
-               fatal("buffer overflow");
-       mbuf_init(dst, src->pos, len);
-       src->pos += len;
-}
-
index 82a2c61129d089be3984440585ae3f538183b58c..f42aefd9508268cc2a0b6c8eedc673c3a95a75ed 100644 (file)
 struct PktHdr {
        unsigned type;
        unsigned len;
-       MBuf data;
+       struct MBuf data;
 };
 
-bool get_header(MBuf *data, PktHdr *pkt) _MUSTCHECK;
+bool get_header(struct MBuf *data, PktHdr *pkt) _MUSTCHECK;
 
 bool send_pooler_error(PgSocket *client, bool send_ready, const char *msg)  /*_MUSTCHECK*/;
 void log_server_error(const char *note, PktHdr *pkt);
@@ -46,12 +46,12 @@ bool answer_authreq(PgSocket *server, PktHdr *pkt) _MUSTCHECK;
 
 bool send_startup_packet(PgSocket *server) _MUSTCHECK;
 
-int scan_text_result(MBuf *pkt, const char *tupdesc, ...) _MUSTCHECK;
+int scan_text_result(struct MBuf *pkt, const char *tupdesc, ...) _MUSTCHECK;
 
 /* is packet completely in our buffer */
 static inline bool incomplete_pkt(const PktHdr *pkt)
 {
-       return mbuf_size(&pkt->data) != pkt->len;
+       return mbuf_written(&pkt->data) != pkt->len;
 }
 
 /* one char desc */
index 33af68330a700705b90b80874bf85cf8fa6015bc..ca2c472806c4e9dc85ef2764d800bfbb26779935 100644 (file)
@@ -47,7 +47,7 @@ typedef struct SBuf SBuf;
    next event loop (eg. too few data available). */
 typedef bool (*sbuf_cb_t)(SBuf *sbuf,
                        SBufEvent evtype,
-                       MBuf *mbuf);
+                       struct MBuf *mbuf);
 
 /* for some reason, libevent has no typedef for callback */
 typedef void (*sbuf_libevent_cb)(int, short, void *);
index 6cd0a6b8dc3042ac8b3705a1168644a306630e34..870a590bc82f4b6e040cc085ad7e17b0c2570a1a 100644 (file)
@@ -16,5 +16,5 @@
  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  */
 
-bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *pkt)  _MUSTCHECK;
+bool server_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *pkt)  _MUSTCHECK;
 
index 731b972bd39940ab0eaa446e03de9e8eddc4276e..21a6813eeec2a3df05f42c00da8f5bc1339226f7 100644 (file)
@@ -283,10 +283,13 @@ static bool send_one_fd(PgSocket *admin,
 static bool show_one_fd(PgSocket *admin, PgSocket *sk)
 {
        PgAddr *addr = &sk->remote_addr;
-       MBuf tmp;
+       struct MBuf tmp;
        VarCache *v = &sk->vars;
+       uint64_t ckey;
 
-       mbuf_init(&tmp, sk->cancel_key, 8);
+       mbuf_init_fixed_reader(&tmp, sk->cancel_key, 8);
+       if (!mbuf_get_uint64be(&tmp, &ckey))
+               return false;
 
        return send_one_fd(admin, sbuf_socket(&sk->sbuf),
                           is_server_socket(sk) ? "server" : "client",
@@ -294,7 +297,7 @@ static bool show_one_fd(PgSocket *admin, PgSocket *sk)
                           sk->pool ? sk->pool->db->name : NULL,
                           addr->is_unix ? "unix" : inet_ntoa(addr->ip_addr),
                           addr->port,
-                          mbuf_get_uint64(&tmp),
+                          ckey,
                           sk->link ? sbuf_socket(&sk->link->sbuf) : 0,
                           v->client_encoding[0] ? v->client_encoding : NULL,
                           v->std_strings[0] ? v->std_strings : NULL,
@@ -1061,8 +1064,7 @@ bool admin_handle_client(PgSocket *admin, PktHdr *pkt)
 
        switch (pkt->type) {
        case 'Q':
-               q = mbuf_get_string(&pkt->data);
-               if (!q) {
+               if (!mbuf_get_string(&pkt->data, &q)) {
                        disconnect_client(admin, true, "incomplete query");
                        return false;
                }
index ccf6edf9f50fae143a61763024225360b7a3b584..a5acf22cef53dc0af0811d512d2f25ee2c7672a8 100644 (file)
@@ -104,13 +104,14 @@ static bool decide_startup_pool(PgSocket *client, PktHdr *pkt)
 {
        const char *username = NULL, *dbname = NULL;
        const char *key, *val;
+       bool ok;
 
        while (1) {
-               key = mbuf_get_string(&pkt->data);
-               if (!key || *key == 0)
+               ok = mbuf_get_string(&pkt->data, &key);
+               if (!ok || *key == 0)
                        break;
-               val = mbuf_get_string(&pkt->data);
-               if (!val)
+               ok = mbuf_get_string(&pkt->data, &val);
+               if (!ok)
                        break;
 
                if (strcmp(key, "database") == 0)
@@ -191,6 +192,8 @@ static bool send_client_authreq(PgSocket *client)
 static bool handle_client_startup(PgSocket *client, PktHdr *pkt)
 {
        const char *passwd;
+       const uint8_t *key;
+       bool ok;
 
        SBuf *sbuf = &client->sbuf;
 
@@ -254,8 +257,8 @@ static bool handle_client_startup(PgSocket *client, PktHdr *pkt)
                        return false;
                }
 
-               passwd = mbuf_get_string(&pkt->data);
-               if (passwd && check_client_passwd(client, passwd)) {
+               ok = mbuf_get_string(&pkt->data, &passwd);
+               if (ok && check_client_passwd(client, passwd)) {
                        if (!finish_client_login(client))
                                return false;
                } else {
@@ -264,8 +267,9 @@ static bool handle_client_startup(PgSocket *client, PktHdr *pkt)
                }
                break;
        case PKT_CANCEL:
-               if (mbuf_avail(&pkt->data) == BACKENDKEY_LEN) {
-                       const uint8_t *key = mbuf_get_bytes(&pkt->data, BACKENDKEY_LEN);
+               if (mbuf_avail_for_read(&pkt->data) == BACKENDKEY_LEN
+                   && mbuf_get_bytes(&pkt->data, BACKENDKEY_LEN, &key))
+               {
                        memcpy(client->cancel_key, key, BACKENDKEY_LEN);
                        accept_cancel_request(client);
                } else
@@ -345,7 +349,7 @@ static bool handle_client_work(PgSocket *client, PktHdr *pkt)
 }
 
 /* callback from SBuf */
-bool client_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data)
+bool client_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *data)
 {
        bool res = false;
        PgSocket *client = container_of(sbuf, PgSocket, sbuf);
@@ -371,7 +375,7 @@ bool client_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data)
                disconnect_server(client->link, false, "Server connection closed");
                break;
        case SBUF_EV_READ:
-               if (mbuf_avail(data) < NEW_HEADER_LEN && client->state != CL_LOGIN) {
+               if (mbuf_avail_for_read(data) < NEW_HEADER_LEN && client->state != CL_LOGIN) {
                        slog_noise(client, "C: got partial header, trying to wait a bit");
                        return false;
                }
index 8c6d379ced44565e0e1714b18fe42a2fa99e8c55..4ff15b39af62ecb9b2cbb9654847a8c208fa72e3 100644 (file)
 #include "bouncer.h"
 
 /*
- * parse protocol header from MBuf
+ * parse protocol header from struct MBuf
  */
 
 /* parses pkt header from buffer, returns false if failed */
-bool get_header(MBuf *data, PktHdr *pkt)
+bool get_header(struct MBuf *data, PktHdr *pkt)
 {
        unsigned type;
-       unsigned len;
-       unsigned code;
+       uint32_t len;
        unsigned got;
        unsigned avail;
-       MBuf hdr;
+       uint16_t len16;
+       uint8_t type8;
+       uint32_t code;
+       struct MBuf hdr;
+       const uint8_t *ptr;
 
        mbuf_copy(data, &hdr);
 
-       if (mbuf_avail(&hdr) < NEW_HEADER_LEN) {
+       if (mbuf_avail_for_read(&hdr) < NEW_HEADER_LEN) {
                log_noise("get_header: less then 5 bytes available");
                return false;
        }
-       type = mbuf_get_char(&hdr);
+       if (!mbuf_get_byte(&hdr, &type8))
+               return false;
+       type = type8;
        if (type != 0) {
                /* wire length does not include type byte */
-               len = mbuf_get_uint32(&hdr) + 1;
+               if (!mbuf_get_uint32be(&hdr, &len))
+                       return false;
+               len++;
                got = NEW_HEADER_LEN;
        } else {
-               if (mbuf_get_char(&hdr) != 0) {
+               if (!mbuf_get_byte(&hdr, &type8))
+                       return false;
+               if (type8 != 0) {
                        log_noise("get_header: unknown special pkt");
                        return false;
                }
                /* dont tolerate partial pkt */
-               if (mbuf_avail(&hdr) < OLD_HEADER_LEN - 2) {
+               if (mbuf_avail_for_read(&hdr) < OLD_HEADER_LEN - 2) {
                        log_noise("get_header: less than 8 bytes for special pkt");
                        return false;
                }
-               len = mbuf_get_uint16(&hdr);
-               code = mbuf_get_uint32(&hdr);
+               if (!mbuf_get_uint16be(&hdr, &len16))
+                       return false;
+               len = len16;
+               if (!mbuf_get_uint32be(&hdr, &code))
+                       return false;
                if (code == PKT_CANCEL)
                        type = PKT_CANCEL;
                else if (code == PKT_SSLREQ)
@@ -83,16 +95,15 @@ bool get_header(MBuf *data, PktHdr *pkt)
        pkt->len = len;
 
        /* fill pkt with only data for this packet */
-       if (len > mbuf_avail(data))
-               avail = mbuf_avail(data);
+       if (len > mbuf_avail_for_read(data))
+               avail = mbuf_avail_for_read(data);
        else
                avail = len;
-       mbuf_slice(data, avail, &pkt->data);
+       if (!mbuf_slice(data, avail, &pkt->data))
+               return false;
 
        /* tag header as read */
-       mbuf_get_bytes(&pkt->data, got);
-
-       return true;
+       return mbuf_get_bytes(&pkt->data, got, &ptr);
 }
 
 
@@ -122,13 +133,13 @@ bool send_pooler_error(PgSocket *client, bool send_ready, const char *msg)
 void parse_server_error(PktHdr *pkt, const char **level_p, const char **msg_p)
 {
        const char *level = NULL, *msg = NULL, *val;
-       int type;
-       while (mbuf_avail(&pkt->data)) {
-               type = mbuf_get_char(&pkt->data);
+       uint8_t type;
+       while (mbuf_avail_for_read(&pkt->data)) {
+               if (!mbuf_get_byte(&pkt->data, &type))
+                       break;
                if (type == 0)
                        break;
-               val = mbuf_get_string(&pkt->data);
-               if (!val)
+               if (!mbuf_get_string(&pkt->data, &val))
                        break;
                if (type == 'S')
                        level = val;
@@ -271,15 +282,16 @@ static bool login_md5_psw(PgSocket *server, const uint8_t *salt)
 /* answer server authentication request */
 bool answer_authreq(PgSocket *server, PktHdr *pkt)
 {
-       unsigned cmd;
+       uint32_t cmd;
        const uint8_t *salt;
        bool res = false;
 
        /* authreq body must contain 32bit cmd */
-       if (mbuf_avail(&pkt->data) < 4)
+       if (mbuf_avail_for_read(&pkt->data) < 4)
                return false;
 
-       cmd = mbuf_get_uint32(&pkt->data);
+       if (!mbuf_get_uint32be(&pkt->data, &cmd))
+               return false;
        switch (cmd) {
        case 0:
                slog_debug(server, "S: auth ok");
@@ -291,16 +303,14 @@ bool answer_authreq(PgSocket *server, PktHdr *pkt)
                break;
        case 4:
                slog_debug(server, "S: req crypt psw");
-               if (mbuf_avail(&pkt->data) < 2)
+               if (!mbuf_get_bytes(&pkt->data, 2, &salt))
                        return false;
-               salt = mbuf_get_bytes(&pkt->data, 2);
                res = login_crypt_psw(server, salt);
                break;
        case 5:
                slog_debug(server, "S: req md5-crypted psw");
-               if (mbuf_avail(&pkt->data) < 4)
+               if (!mbuf_get_bytes(&pkt->data, 4, &salt))
                        return false;
-               salt = mbuf_get_bytes(&pkt->data, 4);
                res = login_md5_psw(server, salt);
                break;
        case 2: /* kerberos */
@@ -330,33 +340,39 @@ bool send_startup_packet(PgSocket *server)
        return pktbuf_send_immidiate(&pkt, server);
 }
 
-int scan_text_result(MBuf *pkt, const char *tupdesc, ...)
+int scan_text_result(struct MBuf *pkt, const char *tupdesc, ...)
 {
-       char *val = NULL;
-       int len;
-       unsigned ncol, i, asked;
+       const char *val = NULL;
+       uint32_t len;
+       uint16_t ncol;
+       unsigned i, asked;
        va_list ap;
        int *int_p;
        uint64_t *long_p;
-       char **str_p;
+       const char **str_p;
 
        asked = strlen(tupdesc);
-       ncol = mbuf_get_uint16(pkt);
+       if (!mbuf_get_uint16be(pkt, &ncol))
+               return -1;
 
        va_start(ap, tupdesc);
        for (i = 0; i < asked; i++) {
                if (i < ncol) {
-                       len = mbuf_get_uint32(pkt);
-                       if (len < 0)
+                       if (!mbuf_get_uint32be(pkt, &len))
+                               return -1;
+                       if ((int32_t)len < 0) {
                                val = NULL;
-                       else
-                               val = (char *)mbuf_get_bytes(pkt, len);
+                       } else {
+                               if (!mbuf_get_chars(pkt, len, &val))
+                                       return -1;
+                       }
 
                        /* hack to zero-terminate the result */
                        if (val) {
-                               val--;
-                               memmove(val, val + 1, len);
-                               val[len] = 0;
+                               char *xval = (char *)val - 1;
+                               memmove(xval, val, len);
+                               xval[len] = 0;
+                               val = xval;
                        }
                } else
                        /* tuple was shorter than requested */
@@ -372,7 +388,7 @@ int scan_text_result(MBuf *pkt, const char *tupdesc, ...)
                        *long_p = atoll(val);
                        break;
                case 's':
-                       str_p = va_arg(ap, char **);
+                       str_p = va_arg(ap, const char **);
                        *str_p = val;
                        break;
                default:
index 3bef8f80d201c988e5ea5068d4b0cf6707a88d3b..370ceabfa4b891fe372b0022954fa93dbad98570 100644 (file)
@@ -319,7 +319,7 @@ void sbuf_prepare_fetch(SBuf *sbuf, unsigned amount)
  *************************/
 
 /*
- * Call proto callback with proper MBuf.
+ * Call proto callback with proper struct MBuf.
  *
  * If callback returns true it used one of sbuf_prepare_* on sbuf,
  * and processing can continue.
@@ -330,7 +330,7 @@ void sbuf_prepare_fetch(SBuf *sbuf, unsigned amount)
  */
 static bool sbuf_call_proto(SBuf *sbuf, int event)
 {
-       MBuf mbuf;
+       struct MBuf mbuf;
        IOBuf *io = sbuf->io;
        bool res;
 
index ed18b768819d878743c70dab17f1d07afb166f34..36b1e1f46515120dd15582be3d969f84abcf7356 100644 (file)
@@ -34,12 +34,10 @@ static bool load_parameter(PgSocket *server, PktHdr *pkt, bool startup)
        if (incomplete_pkt(pkt))
                return false;
 
-       key = mbuf_get_string(&pkt->data);
-       val = mbuf_get_string(&pkt->data);
-       if (!key || !val) {
-               disconnect_server(server, true, "broken ParameterStatus packet");
-               return false;
-       }
+       if (!mbuf_get_string(&pkt->data, &key))
+               goto failed;
+       if (!mbuf_get_string(&pkt->data, &val))
+               goto failed;
        slog_debug(server, "S: param: %s = %s", key, val);
 
        varcache_set(&server->vars, key, val);
@@ -53,6 +51,9 @@ static bool load_parameter(PgSocket *server, PktHdr *pkt, bool startup)
                add_welcome_parameter(server->pool, key, val);
 
        return true;
+failed:
+       disconnect_server(server, true, "broken ParameterStatus packet");
+       return false;
 }
 
 /* we cannot log in at all, notify clients */
@@ -80,6 +81,7 @@ static bool handle_server_startup(PgSocket *server, PktHdr *pkt)
 {
        SBuf *sbuf = &server->sbuf;
        bool res = false;
+       const uint8_t *ckey;
 
        if (incomplete_pkt(pkt)) {
                disconnect_server(server, true, "partial pkt in login phase");
@@ -158,10 +160,11 @@ static bool handle_server_startup(PgSocket *server, PktHdr *pkt)
 
        /* ignorable packets */
        case 'K':               /* BackendKeyData */
-               if (mbuf_avail(&pkt->data) >= BACKENDKEY_LEN)
-                       memcpy(server->cancel_key,
-                              mbuf_get_bytes(&pkt->data, BACKENDKEY_LEN),
-                              BACKENDKEY_LEN);
+               if (!mbuf_get_bytes(&pkt->data, BACKENDKEY_LEN, &ckey)) {
+                       disconnect_server(server, true, "bad cancel key");
+                       return false;
+               }
+               memcpy(server->cancel_key, ckey, BACKENDKEY_LEN);
                res = true;
                break;
 
@@ -197,9 +200,8 @@ static bool handle_server_work(PgSocket *server, PktHdr *pkt)
        case 'Z':               /* ReadyForQuery */
 
                /* if partial pkt, wait */
-               if (mbuf_avail(&pkt->data) == 0)
+               if (!mbuf_get_char(&pkt->data, &state))
                        return false;
-               state = mbuf_get_char(&pkt->data);
 
                /* set ready only if no tx */
                if (state == 'I')
@@ -320,7 +322,7 @@ static bool handle_connect(PgSocket *server)
 }
 
 /* callback from SBuf */
-bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data)
+bool server_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *data)
 {
        bool res = false;
        PgSocket *server = container_of(sbuf, PgSocket, sbuf);
@@ -342,7 +344,7 @@ bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data)
                disconnect_client(server->link, false, "unexpected eof");
                break;
        case SBUF_EV_READ:
-               if (mbuf_avail(data) < NEW_HEADER_LEN) {
+               if (mbuf_avail_for_read(data) < NEW_HEADER_LEN) {
                        slog_noise(server, "S: got partial header, trying to wait a bit");
                        break;
                }
index 743de93b693a80c32ef9ebb4fcacfc7ad5a301e7..8dbf06584d605b6b7e17ef91d47d23bf328d5e12 100644 (file)
@@ -75,7 +75,7 @@ static void takeover_finish_part1(PgSocket *bouncer)
 }
 
 /* parse msg for fd and info */
-static void takeover_load_fd(MBuf *pkt, const struct cmsghdr *cmsg)
+static void takeover_load_fd(struct MBuf *pkt, const struct cmsghdr *cmsg)
 {
        int fd;
        char *task, *saddr, *user, *db;
@@ -102,7 +102,7 @@ static void takeover_load_fd(MBuf *pkt, const struct cmsghdr *cmsg)
        got = scan_text_result(pkt, "issssiqissss", &oldfd, &task, &user, &db,
                               &saddr, &port, &ckey, &linkfd,
                               &client_enc, &std_string, &datestyle, &timezone);
-       if (task == NULL || saddr == NULL)
+       if (got < 0 || task == NULL || saddr == NULL)
                fatal("NULL data from old process");
 
        log_debug("FD row: fd=%d(%d) linkfd=%d task=%s user=%s db=%s enc=%s",
@@ -190,10 +190,13 @@ static void takeover_postprocess_fds(void)
        }
 }
 
-static void next_command(PgSocket *bouncer, MBuf *pkt)
+static void next_command(PgSocket *bouncer, struct MBuf *pkt)
 {
        bool res = true;
-       const char *cmd = mbuf_get_string(pkt);
+       const char *cmd;
+       
+       if (!mbuf_get_string(pkt, &cmd))
+               fatal("bad result pkt");
 
        log_debug("takeover_recv_fds: 'C' body: %s", cmd);
        if (strcmp(cmd, "SUSPEND") == 0) {
@@ -213,14 +216,14 @@ static void next_command(PgSocket *bouncer, MBuf *pkt)
 }
 
 static void takeover_parse_data(PgSocket *bouncer,
-                               struct msghdr *msg, MBuf *data)
+                               struct msghdr *msg, struct MBuf *data)
 {
        struct cmsghdr *cmsg;
        PktHdr pkt;
        
        cmsg = msg->msg_controllen ? CMSG_FIRSTHDR(msg) : NULL;
 
-       while (mbuf_avail(data) > 0) {
+       while (mbuf_avail_for_read(data) > 0) {
                if (!get_header(data, &pkt))
                        fatal("cannot parse packet");
 
@@ -271,7 +274,7 @@ static void takeover_recv_cb(int sock, short flags, void *arg)
        struct msghdr msg;
        struct iovec io;
        int res;
-       MBuf data;
+       struct MBuf data;
 
        memset(&msg, 0, sizeof(msg));
        io.iov_base = data_buf;
@@ -283,7 +286,7 @@ static void takeover_recv_cb(int sock, short flags, void *arg)
 
        res = safe_recvmsg(sock, &msg, 0);
        if (res > 0) {
-               mbuf_init(&data, data_buf, res);
+               mbuf_init_fixed_reader(&data, data_buf, res);
                takeover_parse_data(bouncer, &msg, &data);
        } else if (res == 0) {
                fatal("unexpected EOF");