From 5f8250312f592f0e577c90f7bd282279b584e6c4 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Mon, 11 Jan 2010 18:50:39 +0200 Subject: [PATCH] --- Makefile | 2 +- include/bouncer.h | 2 +- include/client.h | 2 +- include/iobuf.h | 8 +-- include/mbuf.h | 121 ---------------------------------------------- include/proto.h | 8 +-- include/sbuf.h | 2 +- include/server.h | 2 +- src/admin.c | 12 +++-- src/client.c | 24 +++++---- src/proto.c | 104 ++++++++++++++++++++++----------------- src/sbuf.c | 4 +- src/server.c | 30 ++++++------ src/takeover.c | 19 +++++--- 14 files changed, 123 insertions(+), 217 deletions(-) delete mode 100644 include/mbuf.h diff --git a/Makefile b/Makefile index 1ddb63d..5eeddc9 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ SRCS = client.c loader.c objects.c pooler.c proto.c sbuf.c server.c util.c \ varcache.c HDRS = client.h loader.h objects.h pooler.h proto.h sbuf.h server.h util.h \ admin.h stats.h takeover.h janitor.h pktbuf.h system.h bouncer.h \ - mbuf.h varcache.h iobuf.h + varcache.h iobuf.h # data & dirs to include in tgz DOCS = doc/overview.txt doc/usage.txt doc/config.txt doc/todo.txt diff --git a/include/bouncer.h b/include/bouncer.h index b451544..72c614e 100644 --- a/include/bouncer.h +++ b/include/bouncer.h @@ -32,6 +32,7 @@ #include #include #include +#include #include @@ -80,7 +81,6 @@ typedef struct PktHdr PktHdr; extern int cf_sbuf_len; #include "util.h" -#include "mbuf.h" #include "iobuf.h" #include "sbuf.h" #include "pktbuf.h" diff --git a/include/client.h b/include/client.h index 6a4a13e..aaa1431 100644 --- a/include/client.h +++ b/include/client.h @@ -16,7 +16,7 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ -bool client_proto(SBuf *sbuf, SBufEvent evtype, MBuf *pkt) _MUSTCHECK; +bool client_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *pkt) _MUSTCHECK; bool set_pool(PgSocket *client, const char *dbname, const char *username) _MUSTCHECK; diff --git a/include/iobuf.h b/include/iobuf.h index f4ff180..41e3d10 100644 --- a/include/iobuf.h +++ b/include/iobuf.h @@ -84,22 +84,22 @@ static inline unsigned iobuf_amount_recv(const IOBuf *buf) } /* put all unparsed to mbuf */ -static inline unsigned iobuf_parse_all(const IOBuf *buf, MBuf *mbuf) +static inline unsigned iobuf_parse_all(const IOBuf *buf, struct MBuf *mbuf) { unsigned avail = iobuf_amount_parse(buf); const uint8_t *pos = buf->buf + buf->parse_pos; - mbuf_init(mbuf, pos, avail); + mbuf_init_fixed_reader(mbuf, pos, avail); return avail; } /* put all unparsed to mbuf, with size limit */ -static inline unsigned iobuf_parse_limit(const IOBuf *buf, MBuf *mbuf, unsigned limit) +static inline unsigned iobuf_parse_limit(const IOBuf *buf, struct MBuf *mbuf, unsigned limit) { unsigned avail = iobuf_amount_parse(buf); const uint8_t *pos = buf->buf + buf->parse_pos; if (avail > limit) avail = limit; - mbuf_init(mbuf, pos, avail); + mbuf_init_fixed_reader(mbuf, pos, avail); return avail; } diff --git a/include/mbuf.h b/include/mbuf.h deleted file mode 100644 index 97d8480..0000000 --- a/include/mbuf.h +++ /dev/null @@ -1,121 +0,0 @@ -/* - * PgBouncer - Lightweight connection pooler for PostgreSQL. - * - * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ - * - * Permission to use, copy, modify, and/or distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - */ - -/* - * Safe and easy access to fixed memory buffer. - */ - -/* - * FIXME: the code should be converted so that - * the fatal()-s can be replaced by Asserts(). - */ - -typedef struct MBuf MBuf; -struct MBuf { - const uint8_t *data; - const uint8_t *end; - const uint8_t *pos; -}; - -static inline void mbuf_init(MBuf *buf, const uint8_t *ptr, int len) -{ - if (len < 0) - fatal("fuckup"); - buf->data = buf->pos = ptr; - buf->end = ptr + len; -} - -static inline uint8_t mbuf_get_char(MBuf *buf) -{ - if (buf->pos + 1 > buf->end) - fatal("buffer overflow"); - return *buf->pos++; -} - -static inline unsigned mbuf_get_uint16(MBuf *buf) -{ - unsigned val; - if (buf->pos + 2 > buf->end) - fatal("buffer overflow"); - val = *buf->pos++; - val = (val << 8) | *buf->pos++; - return val; -} - -static inline uint32_t mbuf_get_uint32(MBuf *buf) -{ - uint32_t val; - if (buf->pos + 4 > buf->end) - fatal("buffer overflow"); - val = *buf->pos++; - val = (val << 8) | *buf->pos++; - val = (val << 8) | *buf->pos++; - val = (val << 8) | *buf->pos++; - return val; -} - -static inline uint64_t mbuf_get_uint64(MBuf *buf) -{ - uint64_t i1, i2; - i1 = mbuf_get_uint32(buf); - i2 = mbuf_get_uint32(buf); - return (i1 << 32) | i2; -} - -static inline const uint8_t * mbuf_get_bytes(MBuf *buf, unsigned len) -{ - const uint8_t *res = buf->pos; - if (buf->pos + len > buf->end) - fatal("buffer overflow"); - buf->pos += len; - return res; -} - -static inline unsigned mbuf_avail(const MBuf *buf) -{ - return buf->end - buf->pos; -} - -static inline unsigned mbuf_size(const MBuf *buf) -{ - return buf->end - buf->data; -} - -static inline const char * mbuf_get_string(MBuf *buf) -{ - const char *res = (const char *)buf->pos; - const uint8_t *nul = memchr(res, 0, mbuf_avail(buf)); - if (!nul) - return NULL; - buf->pos = nul + 1; - return res; -} - -static inline void mbuf_copy(const MBuf *src, MBuf *dst) -{ - *dst = *src; -} - -static inline void mbuf_slice(MBuf *src, unsigned len, MBuf *dst) -{ - if (len > mbuf_avail(src)) - fatal("buffer overflow"); - mbuf_init(dst, src->pos, len); - src->pos += len; -} - diff --git a/include/proto.h b/include/proto.h index 82a2c61..f42aefd 100644 --- a/include/proto.h +++ b/include/proto.h @@ -29,10 +29,10 @@ struct PktHdr { unsigned type; unsigned len; - MBuf data; + struct MBuf data; }; -bool get_header(MBuf *data, PktHdr *pkt) _MUSTCHECK; +bool get_header(struct MBuf *data, PktHdr *pkt) _MUSTCHECK; bool send_pooler_error(PgSocket *client, bool send_ready, const char *msg) /*_MUSTCHECK*/; void log_server_error(const char *note, PktHdr *pkt); @@ -46,12 +46,12 @@ bool answer_authreq(PgSocket *server, PktHdr *pkt) _MUSTCHECK; bool send_startup_packet(PgSocket *server) _MUSTCHECK; -int scan_text_result(MBuf *pkt, const char *tupdesc, ...) _MUSTCHECK; +int scan_text_result(struct MBuf *pkt, const char *tupdesc, ...) _MUSTCHECK; /* is packet completely in our buffer */ static inline bool incomplete_pkt(const PktHdr *pkt) { - return mbuf_size(&pkt->data) != pkt->len; + return mbuf_written(&pkt->data) != pkt->len; } /* one char desc */ diff --git a/include/sbuf.h b/include/sbuf.h index 33af683..ca2c472 100644 --- a/include/sbuf.h +++ b/include/sbuf.h @@ -47,7 +47,7 @@ typedef struct SBuf SBuf; next event loop (eg. too few data available). */ typedef bool (*sbuf_cb_t)(SBuf *sbuf, SBufEvent evtype, - MBuf *mbuf); + struct MBuf *mbuf); /* for some reason, libevent has no typedef for callback */ typedef void (*sbuf_libevent_cb)(int, short, void *); diff --git a/include/server.h b/include/server.h index 6cd0a6b..870a590 100644 --- a/include/server.h +++ b/include/server.h @@ -16,5 +16,5 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ -bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *pkt) _MUSTCHECK; +bool server_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *pkt) _MUSTCHECK; diff --git a/src/admin.c b/src/admin.c index 731b972..21a6813 100644 --- a/src/admin.c +++ b/src/admin.c @@ -283,10 +283,13 @@ static bool send_one_fd(PgSocket *admin, static bool show_one_fd(PgSocket *admin, PgSocket *sk) { PgAddr *addr = &sk->remote_addr; - MBuf tmp; + struct MBuf tmp; VarCache *v = &sk->vars; + uint64_t ckey; - mbuf_init(&tmp, sk->cancel_key, 8); + mbuf_init_fixed_reader(&tmp, sk->cancel_key, 8); + if (!mbuf_get_uint64be(&tmp, &ckey)) + return false; return send_one_fd(admin, sbuf_socket(&sk->sbuf), is_server_socket(sk) ? "server" : "client", @@ -294,7 +297,7 @@ static bool show_one_fd(PgSocket *admin, PgSocket *sk) sk->pool ? sk->pool->db->name : NULL, addr->is_unix ? "unix" : inet_ntoa(addr->ip_addr), addr->port, - mbuf_get_uint64(&tmp), + ckey, sk->link ? sbuf_socket(&sk->link->sbuf) : 0, v->client_encoding[0] ? v->client_encoding : NULL, v->std_strings[0] ? v->std_strings : NULL, @@ -1061,8 +1064,7 @@ bool admin_handle_client(PgSocket *admin, PktHdr *pkt) switch (pkt->type) { case 'Q': - q = mbuf_get_string(&pkt->data); - if (!q) { + if (!mbuf_get_string(&pkt->data, &q)) { disconnect_client(admin, true, "incomplete query"); return false; } diff --git a/src/client.c b/src/client.c index ccf6edf..a5acf22 100644 --- a/src/client.c +++ b/src/client.c @@ -104,13 +104,14 @@ static bool decide_startup_pool(PgSocket *client, PktHdr *pkt) { const char *username = NULL, *dbname = NULL; const char *key, *val; + bool ok; while (1) { - key = mbuf_get_string(&pkt->data); - if (!key || *key == 0) + ok = mbuf_get_string(&pkt->data, &key); + if (!ok || *key == 0) break; - val = mbuf_get_string(&pkt->data); - if (!val) + ok = mbuf_get_string(&pkt->data, &val); + if (!ok) break; if (strcmp(key, "database") == 0) @@ -191,6 +192,8 @@ static bool send_client_authreq(PgSocket *client) static bool handle_client_startup(PgSocket *client, PktHdr *pkt) { const char *passwd; + const uint8_t *key; + bool ok; SBuf *sbuf = &client->sbuf; @@ -254,8 +257,8 @@ static bool handle_client_startup(PgSocket *client, PktHdr *pkt) return false; } - passwd = mbuf_get_string(&pkt->data); - if (passwd && check_client_passwd(client, passwd)) { + ok = mbuf_get_string(&pkt->data, &passwd); + if (ok && check_client_passwd(client, passwd)) { if (!finish_client_login(client)) return false; } else { @@ -264,8 +267,9 @@ static bool handle_client_startup(PgSocket *client, PktHdr *pkt) } break; case PKT_CANCEL: - if (mbuf_avail(&pkt->data) == BACKENDKEY_LEN) { - const uint8_t *key = mbuf_get_bytes(&pkt->data, BACKENDKEY_LEN); + if (mbuf_avail_for_read(&pkt->data) == BACKENDKEY_LEN + && mbuf_get_bytes(&pkt->data, BACKENDKEY_LEN, &key)) + { memcpy(client->cancel_key, key, BACKENDKEY_LEN); accept_cancel_request(client); } else @@ -345,7 +349,7 @@ static bool handle_client_work(PgSocket *client, PktHdr *pkt) } /* callback from SBuf */ -bool client_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data) +bool client_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *data) { bool res = false; PgSocket *client = container_of(sbuf, PgSocket, sbuf); @@ -371,7 +375,7 @@ bool client_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data) disconnect_server(client->link, false, "Server connection closed"); break; case SBUF_EV_READ: - if (mbuf_avail(data) < NEW_HEADER_LEN && client->state != CL_LOGIN) { + if (mbuf_avail_for_read(data) < NEW_HEADER_LEN && client->state != CL_LOGIN) { slog_noise(client, "C: got partial header, trying to wait a bit"); return false; } diff --git a/src/proto.c b/src/proto.c index 8c6d379..4ff15b3 100644 --- a/src/proto.c +++ b/src/proto.c @@ -23,42 +23,54 @@ #include "bouncer.h" /* - * parse protocol header from MBuf + * parse protocol header from struct MBuf */ /* parses pkt header from buffer, returns false if failed */ -bool get_header(MBuf *data, PktHdr *pkt) +bool get_header(struct MBuf *data, PktHdr *pkt) { unsigned type; - unsigned len; - unsigned code; + uint32_t len; unsigned got; unsigned avail; - MBuf hdr; + uint16_t len16; + uint8_t type8; + uint32_t code; + struct MBuf hdr; + const uint8_t *ptr; mbuf_copy(data, &hdr); - if (mbuf_avail(&hdr) < NEW_HEADER_LEN) { + if (mbuf_avail_for_read(&hdr) < NEW_HEADER_LEN) { log_noise("get_header: less then 5 bytes available"); return false; } - type = mbuf_get_char(&hdr); + if (!mbuf_get_byte(&hdr, &type8)) + return false; + type = type8; if (type != 0) { /* wire length does not include type byte */ - len = mbuf_get_uint32(&hdr) + 1; + if (!mbuf_get_uint32be(&hdr, &len)) + return false; + len++; got = NEW_HEADER_LEN; } else { - if (mbuf_get_char(&hdr) != 0) { + if (!mbuf_get_byte(&hdr, &type8)) + return false; + if (type8 != 0) { log_noise("get_header: unknown special pkt"); return false; } /* dont tolerate partial pkt */ - if (mbuf_avail(&hdr) < OLD_HEADER_LEN - 2) { + if (mbuf_avail_for_read(&hdr) < OLD_HEADER_LEN - 2) { log_noise("get_header: less than 8 bytes for special pkt"); return false; } - len = mbuf_get_uint16(&hdr); - code = mbuf_get_uint32(&hdr); + if (!mbuf_get_uint16be(&hdr, &len16)) + return false; + len = len16; + if (!mbuf_get_uint32be(&hdr, &code)) + return false; if (code == PKT_CANCEL) type = PKT_CANCEL; else if (code == PKT_SSLREQ) @@ -83,16 +95,15 @@ bool get_header(MBuf *data, PktHdr *pkt) pkt->len = len; /* fill pkt with only data for this packet */ - if (len > mbuf_avail(data)) - avail = mbuf_avail(data); + if (len > mbuf_avail_for_read(data)) + avail = mbuf_avail_for_read(data); else avail = len; - mbuf_slice(data, avail, &pkt->data); + if (!mbuf_slice(data, avail, &pkt->data)) + return false; /* tag header as read */ - mbuf_get_bytes(&pkt->data, got); - - return true; + return mbuf_get_bytes(&pkt->data, got, &ptr); } @@ -122,13 +133,13 @@ bool send_pooler_error(PgSocket *client, bool send_ready, const char *msg) void parse_server_error(PktHdr *pkt, const char **level_p, const char **msg_p) { const char *level = NULL, *msg = NULL, *val; - int type; - while (mbuf_avail(&pkt->data)) { - type = mbuf_get_char(&pkt->data); + uint8_t type; + while (mbuf_avail_for_read(&pkt->data)) { + if (!mbuf_get_byte(&pkt->data, &type)) + break; if (type == 0) break; - val = mbuf_get_string(&pkt->data); - if (!val) + if (!mbuf_get_string(&pkt->data, &val)) break; if (type == 'S') level = val; @@ -271,15 +282,16 @@ static bool login_md5_psw(PgSocket *server, const uint8_t *salt) /* answer server authentication request */ bool answer_authreq(PgSocket *server, PktHdr *pkt) { - unsigned cmd; + uint32_t cmd; const uint8_t *salt; bool res = false; /* authreq body must contain 32bit cmd */ - if (mbuf_avail(&pkt->data) < 4) + if (mbuf_avail_for_read(&pkt->data) < 4) return false; - cmd = mbuf_get_uint32(&pkt->data); + if (!mbuf_get_uint32be(&pkt->data, &cmd)) + return false; switch (cmd) { case 0: slog_debug(server, "S: auth ok"); @@ -291,16 +303,14 @@ bool answer_authreq(PgSocket *server, PktHdr *pkt) break; case 4: slog_debug(server, "S: req crypt psw"); - if (mbuf_avail(&pkt->data) < 2) + if (!mbuf_get_bytes(&pkt->data, 2, &salt)) return false; - salt = mbuf_get_bytes(&pkt->data, 2); res = login_crypt_psw(server, salt); break; case 5: slog_debug(server, "S: req md5-crypted psw"); - if (mbuf_avail(&pkt->data) < 4) + if (!mbuf_get_bytes(&pkt->data, 4, &salt)) return false; - salt = mbuf_get_bytes(&pkt->data, 4); res = login_md5_psw(server, salt); break; case 2: /* kerberos */ @@ -330,33 +340,39 @@ bool send_startup_packet(PgSocket *server) return pktbuf_send_immidiate(&pkt, server); } -int scan_text_result(MBuf *pkt, const char *tupdesc, ...) +int scan_text_result(struct MBuf *pkt, const char *tupdesc, ...) { - char *val = NULL; - int len; - unsigned ncol, i, asked; + const char *val = NULL; + uint32_t len; + uint16_t ncol; + unsigned i, asked; va_list ap; int *int_p; uint64_t *long_p; - char **str_p; + const char **str_p; asked = strlen(tupdesc); - ncol = mbuf_get_uint16(pkt); + if (!mbuf_get_uint16be(pkt, &ncol)) + return -1; va_start(ap, tupdesc); for (i = 0; i < asked; i++) { if (i < ncol) { - len = mbuf_get_uint32(pkt); - if (len < 0) + if (!mbuf_get_uint32be(pkt, &len)) + return -1; + if ((int32_t)len < 0) { val = NULL; - else - val = (char *)mbuf_get_bytes(pkt, len); + } else { + if (!mbuf_get_chars(pkt, len, &val)) + return -1; + } /* hack to zero-terminate the result */ if (val) { - val--; - memmove(val, val + 1, len); - val[len] = 0; + char *xval = (char *)val - 1; + memmove(xval, val, len); + xval[len] = 0; + val = xval; } } else /* tuple was shorter than requested */ @@ -372,7 +388,7 @@ int scan_text_result(MBuf *pkt, const char *tupdesc, ...) *long_p = atoll(val); break; case 's': - str_p = va_arg(ap, char **); + str_p = va_arg(ap, const char **); *str_p = val; break; default: diff --git a/src/sbuf.c b/src/sbuf.c index 3bef8f8..370ceab 100644 --- a/src/sbuf.c +++ b/src/sbuf.c @@ -319,7 +319,7 @@ void sbuf_prepare_fetch(SBuf *sbuf, unsigned amount) *************************/ /* - * Call proto callback with proper MBuf. + * Call proto callback with proper struct MBuf. * * If callback returns true it used one of sbuf_prepare_* on sbuf, * and processing can continue. @@ -330,7 +330,7 @@ void sbuf_prepare_fetch(SBuf *sbuf, unsigned amount) */ static bool sbuf_call_proto(SBuf *sbuf, int event) { - MBuf mbuf; + struct MBuf mbuf; IOBuf *io = sbuf->io; bool res; diff --git a/src/server.c b/src/server.c index ed18b76..36b1e1f 100644 --- a/src/server.c +++ b/src/server.c @@ -34,12 +34,10 @@ static bool load_parameter(PgSocket *server, PktHdr *pkt, bool startup) if (incomplete_pkt(pkt)) return false; - key = mbuf_get_string(&pkt->data); - val = mbuf_get_string(&pkt->data); - if (!key || !val) { - disconnect_server(server, true, "broken ParameterStatus packet"); - return false; - } + if (!mbuf_get_string(&pkt->data, &key)) + goto failed; + if (!mbuf_get_string(&pkt->data, &val)) + goto failed; slog_debug(server, "S: param: %s = %s", key, val); varcache_set(&server->vars, key, val); @@ -53,6 +51,9 @@ static bool load_parameter(PgSocket *server, PktHdr *pkt, bool startup) add_welcome_parameter(server->pool, key, val); return true; +failed: + disconnect_server(server, true, "broken ParameterStatus packet"); + return false; } /* we cannot log in at all, notify clients */ @@ -80,6 +81,7 @@ static bool handle_server_startup(PgSocket *server, PktHdr *pkt) { SBuf *sbuf = &server->sbuf; bool res = false; + const uint8_t *ckey; if (incomplete_pkt(pkt)) { disconnect_server(server, true, "partial pkt in login phase"); @@ -158,10 +160,11 @@ static bool handle_server_startup(PgSocket *server, PktHdr *pkt) /* ignorable packets */ case 'K': /* BackendKeyData */ - if (mbuf_avail(&pkt->data) >= BACKENDKEY_LEN) - memcpy(server->cancel_key, - mbuf_get_bytes(&pkt->data, BACKENDKEY_LEN), - BACKENDKEY_LEN); + if (!mbuf_get_bytes(&pkt->data, BACKENDKEY_LEN, &ckey)) { + disconnect_server(server, true, "bad cancel key"); + return false; + } + memcpy(server->cancel_key, ckey, BACKENDKEY_LEN); res = true; break; @@ -197,9 +200,8 @@ static bool handle_server_work(PgSocket *server, PktHdr *pkt) case 'Z': /* ReadyForQuery */ /* if partial pkt, wait */ - if (mbuf_avail(&pkt->data) == 0) + if (!mbuf_get_char(&pkt->data, &state)) return false; - state = mbuf_get_char(&pkt->data); /* set ready only if no tx */ if (state == 'I') @@ -320,7 +322,7 @@ static bool handle_connect(PgSocket *server) } /* callback from SBuf */ -bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data) +bool server_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *data) { bool res = false; PgSocket *server = container_of(sbuf, PgSocket, sbuf); @@ -342,7 +344,7 @@ bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data) disconnect_client(server->link, false, "unexpected eof"); break; case SBUF_EV_READ: - if (mbuf_avail(data) < NEW_HEADER_LEN) { + if (mbuf_avail_for_read(data) < NEW_HEADER_LEN) { slog_noise(server, "S: got partial header, trying to wait a bit"); break; } diff --git a/src/takeover.c b/src/takeover.c index 743de93..8dbf065 100644 --- a/src/takeover.c +++ b/src/takeover.c @@ -75,7 +75,7 @@ static void takeover_finish_part1(PgSocket *bouncer) } /* parse msg for fd and info */ -static void takeover_load_fd(MBuf *pkt, const struct cmsghdr *cmsg) +static void takeover_load_fd(struct MBuf *pkt, const struct cmsghdr *cmsg) { int fd; char *task, *saddr, *user, *db; @@ -102,7 +102,7 @@ static void takeover_load_fd(MBuf *pkt, const struct cmsghdr *cmsg) got = scan_text_result(pkt, "issssiqissss", &oldfd, &task, &user, &db, &saddr, &port, &ckey, &linkfd, &client_enc, &std_string, &datestyle, &timezone); - if (task == NULL || saddr == NULL) + if (got < 0 || task == NULL || saddr == NULL) fatal("NULL data from old process"); log_debug("FD row: fd=%d(%d) linkfd=%d task=%s user=%s db=%s enc=%s", @@ -190,10 +190,13 @@ static void takeover_postprocess_fds(void) } } -static void next_command(PgSocket *bouncer, MBuf *pkt) +static void next_command(PgSocket *bouncer, struct MBuf *pkt) { bool res = true; - const char *cmd = mbuf_get_string(pkt); + const char *cmd; + + if (!mbuf_get_string(pkt, &cmd)) + fatal("bad result pkt"); log_debug("takeover_recv_fds: 'C' body: %s", cmd); if (strcmp(cmd, "SUSPEND") == 0) { @@ -213,14 +216,14 @@ static void next_command(PgSocket *bouncer, MBuf *pkt) } static void takeover_parse_data(PgSocket *bouncer, - struct msghdr *msg, MBuf *data) + struct msghdr *msg, struct MBuf *data) { struct cmsghdr *cmsg; PktHdr pkt; cmsg = msg->msg_controllen ? CMSG_FIRSTHDR(msg) : NULL; - while (mbuf_avail(data) > 0) { + while (mbuf_avail_for_read(data) > 0) { if (!get_header(data, &pkt)) fatal("cannot parse packet"); @@ -271,7 +274,7 @@ static void takeover_recv_cb(int sock, short flags, void *arg) struct msghdr msg; struct iovec io; int res; - MBuf data; + struct MBuf data; memset(&msg, 0, sizeof(msg)); io.iov_base = data_buf; @@ -283,7 +286,7 @@ static void takeover_recv_cb(int sock, short flags, void *arg) res = safe_recvmsg(sock, &msg, 0); if (res > 0) { - mbuf_init(&data, data_buf, res); + mbuf_init_fixed_reader(&data, data_buf, res); takeover_parse_data(bouncer, &msg, &data); } else if (res == 0) { fatal("unexpected EOF"); -- 2.40.0