varcache.c
HDRS = client.h loader.h objects.h pooler.h proto.h sbuf.h server.h util.h \
admin.h stats.h takeover.h janitor.h pktbuf.h system.h bouncer.h \
- mbuf.h varcache.h iobuf.h
+ varcache.h iobuf.h
# data & dirs to include in tgz
DOCS = doc/overview.txt doc/usage.txt doc/config.txt doc/todo.txt
#include <usual/slab.h>
#include <usual/socket.h>
#include <usual/safeio.h>
+#include <usual/mbuf.h>
#include <event.h>
extern int cf_sbuf_len;
#include "util.h"
-#include "mbuf.h"
#include "iobuf.h"
#include "sbuf.h"
#include "pktbuf.h"
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
-bool client_proto(SBuf *sbuf, SBufEvent evtype, MBuf *pkt) _MUSTCHECK;
+bool client_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *pkt) _MUSTCHECK;
bool set_pool(PgSocket *client, const char *dbname, const char *username) _MUSTCHECK;
}
/* put all unparsed to mbuf */
-static inline unsigned iobuf_parse_all(const IOBuf *buf, MBuf *mbuf)
+static inline unsigned iobuf_parse_all(const IOBuf *buf, struct MBuf *mbuf)
{
unsigned avail = iobuf_amount_parse(buf);
const uint8_t *pos = buf->buf + buf->parse_pos;
- mbuf_init(mbuf, pos, avail);
+ mbuf_init_fixed_reader(mbuf, pos, avail);
return avail;
}
/* put all unparsed to mbuf, with size limit */
-static inline unsigned iobuf_parse_limit(const IOBuf *buf, MBuf *mbuf, unsigned limit)
+static inline unsigned iobuf_parse_limit(const IOBuf *buf, struct MBuf *mbuf, unsigned limit)
{
unsigned avail = iobuf_amount_parse(buf);
const uint8_t *pos = buf->buf + buf->parse_pos;
if (avail > limit)
avail = limit;
- mbuf_init(mbuf, pos, avail);
+ mbuf_init_fixed_reader(mbuf, pos, avail);
return avail;
}
+++ /dev/null
-/*
- * PgBouncer - Lightweight connection pooler for PostgreSQL.
- *
- * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ
- *
- * Permission to use, copy, modify, and/or distribute this software for any
- * purpose with or without fee is hereby granted, provided that the above
- * copyright notice and this permission notice appear in all copies.
- *
- * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- */
-
-/*
- * Safe and easy access to fixed memory buffer.
- */
-
-/*
- * FIXME: the code should be converted so that
- * the fatal()-s can be replaced by Asserts().
- */
-
-typedef struct MBuf MBuf;
-struct MBuf {
- const uint8_t *data;
- const uint8_t *end;
- const uint8_t *pos;
-};
-
-static inline void mbuf_init(MBuf *buf, const uint8_t *ptr, int len)
-{
- if (len < 0)
- fatal("fuckup");
- buf->data = buf->pos = ptr;
- buf->end = ptr + len;
-}
-
-static inline uint8_t mbuf_get_char(MBuf *buf)
-{
- if (buf->pos + 1 > buf->end)
- fatal("buffer overflow");
- return *buf->pos++;
-}
-
-static inline unsigned mbuf_get_uint16(MBuf *buf)
-{
- unsigned val;
- if (buf->pos + 2 > buf->end)
- fatal("buffer overflow");
- val = *buf->pos++;
- val = (val << 8) | *buf->pos++;
- return val;
-}
-
-static inline uint32_t mbuf_get_uint32(MBuf *buf)
-{
- uint32_t val;
- if (buf->pos + 4 > buf->end)
- fatal("buffer overflow");
- val = *buf->pos++;
- val = (val << 8) | *buf->pos++;
- val = (val << 8) | *buf->pos++;
- val = (val << 8) | *buf->pos++;
- return val;
-}
-
-static inline uint64_t mbuf_get_uint64(MBuf *buf)
-{
- uint64_t i1, i2;
- i1 = mbuf_get_uint32(buf);
- i2 = mbuf_get_uint32(buf);
- return (i1 << 32) | i2;
-}
-
-static inline const uint8_t * mbuf_get_bytes(MBuf *buf, unsigned len)
-{
- const uint8_t *res = buf->pos;
- if (buf->pos + len > buf->end)
- fatal("buffer overflow");
- buf->pos += len;
- return res;
-}
-
-static inline unsigned mbuf_avail(const MBuf *buf)
-{
- return buf->end - buf->pos;
-}
-
-static inline unsigned mbuf_size(const MBuf *buf)
-{
- return buf->end - buf->data;
-}
-
-static inline const char * mbuf_get_string(MBuf *buf)
-{
- const char *res = (const char *)buf->pos;
- const uint8_t *nul = memchr(res, 0, mbuf_avail(buf));
- if (!nul)
- return NULL;
- buf->pos = nul + 1;
- return res;
-}
-
-static inline void mbuf_copy(const MBuf *src, MBuf *dst)
-{
- *dst = *src;
-}
-
-static inline void mbuf_slice(MBuf *src, unsigned len, MBuf *dst)
-{
- if (len > mbuf_avail(src))
- fatal("buffer overflow");
- mbuf_init(dst, src->pos, len);
- src->pos += len;
-}
-
struct PktHdr {
unsigned type;
unsigned len;
- MBuf data;
+ struct MBuf data;
};
-bool get_header(MBuf *data, PktHdr *pkt) _MUSTCHECK;
+bool get_header(struct MBuf *data, PktHdr *pkt) _MUSTCHECK;
bool send_pooler_error(PgSocket *client, bool send_ready, const char *msg) /*_MUSTCHECK*/;
void log_server_error(const char *note, PktHdr *pkt);
bool send_startup_packet(PgSocket *server) _MUSTCHECK;
-int scan_text_result(MBuf *pkt, const char *tupdesc, ...) _MUSTCHECK;
+int scan_text_result(struct MBuf *pkt, const char *tupdesc, ...) _MUSTCHECK;
/* is packet completely in our buffer */
static inline bool incomplete_pkt(const PktHdr *pkt)
{
- return mbuf_size(&pkt->data) != pkt->len;
+ return mbuf_written(&pkt->data) != pkt->len;
}
/* one char desc */
next event loop (eg. too few data available). */
typedef bool (*sbuf_cb_t)(SBuf *sbuf,
SBufEvent evtype,
- MBuf *mbuf);
+ struct MBuf *mbuf);
/* for some reason, libevent has no typedef for callback */
typedef void (*sbuf_libevent_cb)(int, short, void *);
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
-bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *pkt) _MUSTCHECK;
+bool server_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *pkt) _MUSTCHECK;
static bool show_one_fd(PgSocket *admin, PgSocket *sk)
{
PgAddr *addr = &sk->remote_addr;
- MBuf tmp;
+ struct MBuf tmp;
VarCache *v = &sk->vars;
+ uint64_t ckey;
- mbuf_init(&tmp, sk->cancel_key, 8);
+ mbuf_init_fixed_reader(&tmp, sk->cancel_key, 8);
+ if (!mbuf_get_uint64be(&tmp, &ckey))
+ return false;
return send_one_fd(admin, sbuf_socket(&sk->sbuf),
is_server_socket(sk) ? "server" : "client",
sk->pool ? sk->pool->db->name : NULL,
addr->is_unix ? "unix" : inet_ntoa(addr->ip_addr),
addr->port,
- mbuf_get_uint64(&tmp),
+ ckey,
sk->link ? sbuf_socket(&sk->link->sbuf) : 0,
v->client_encoding[0] ? v->client_encoding : NULL,
v->std_strings[0] ? v->std_strings : NULL,
switch (pkt->type) {
case 'Q':
- q = mbuf_get_string(&pkt->data);
- if (!q) {
+ if (!mbuf_get_string(&pkt->data, &q)) {
disconnect_client(admin, true, "incomplete query");
return false;
}
{
const char *username = NULL, *dbname = NULL;
const char *key, *val;
+ bool ok;
while (1) {
- key = mbuf_get_string(&pkt->data);
- if (!key || *key == 0)
+ ok = mbuf_get_string(&pkt->data, &key);
+ if (!ok || *key == 0)
break;
- val = mbuf_get_string(&pkt->data);
- if (!val)
+ ok = mbuf_get_string(&pkt->data, &val);
+ if (!ok)
break;
if (strcmp(key, "database") == 0)
static bool handle_client_startup(PgSocket *client, PktHdr *pkt)
{
const char *passwd;
+ const uint8_t *key;
+ bool ok;
SBuf *sbuf = &client->sbuf;
return false;
}
- passwd = mbuf_get_string(&pkt->data);
- if (passwd && check_client_passwd(client, passwd)) {
+ ok = mbuf_get_string(&pkt->data, &passwd);
+ if (ok && check_client_passwd(client, passwd)) {
if (!finish_client_login(client))
return false;
} else {
}
break;
case PKT_CANCEL:
- if (mbuf_avail(&pkt->data) == BACKENDKEY_LEN) {
- const uint8_t *key = mbuf_get_bytes(&pkt->data, BACKENDKEY_LEN);
+ if (mbuf_avail_for_read(&pkt->data) == BACKENDKEY_LEN
+ && mbuf_get_bytes(&pkt->data, BACKENDKEY_LEN, &key))
+ {
memcpy(client->cancel_key, key, BACKENDKEY_LEN);
accept_cancel_request(client);
} else
}
/* callback from SBuf */
-bool client_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data)
+bool client_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *data)
{
bool res = false;
PgSocket *client = container_of(sbuf, PgSocket, sbuf);
disconnect_server(client->link, false, "Server connection closed");
break;
case SBUF_EV_READ:
- if (mbuf_avail(data) < NEW_HEADER_LEN && client->state != CL_LOGIN) {
+ if (mbuf_avail_for_read(data) < NEW_HEADER_LEN && client->state != CL_LOGIN) {
slog_noise(client, "C: got partial header, trying to wait a bit");
return false;
}
#include "bouncer.h"
/*
- * parse protocol header from MBuf
+ * parse protocol header from struct MBuf
*/
/* parses pkt header from buffer, returns false if failed */
-bool get_header(MBuf *data, PktHdr *pkt)
+bool get_header(struct MBuf *data, PktHdr *pkt)
{
unsigned type;
- unsigned len;
- unsigned code;
+ uint32_t len;
unsigned got;
unsigned avail;
- MBuf hdr;
+ uint16_t len16;
+ uint8_t type8;
+ uint32_t code;
+ struct MBuf hdr;
+ const uint8_t *ptr;
mbuf_copy(data, &hdr);
- if (mbuf_avail(&hdr) < NEW_HEADER_LEN) {
+ if (mbuf_avail_for_read(&hdr) < NEW_HEADER_LEN) {
log_noise("get_header: less then 5 bytes available");
return false;
}
- type = mbuf_get_char(&hdr);
+ if (!mbuf_get_byte(&hdr, &type8))
+ return false;
+ type = type8;
if (type != 0) {
/* wire length does not include type byte */
- len = mbuf_get_uint32(&hdr) + 1;
+ if (!mbuf_get_uint32be(&hdr, &len))
+ return false;
+ len++;
got = NEW_HEADER_LEN;
} else {
- if (mbuf_get_char(&hdr) != 0) {
+ if (!mbuf_get_byte(&hdr, &type8))
+ return false;
+ if (type8 != 0) {
log_noise("get_header: unknown special pkt");
return false;
}
/* dont tolerate partial pkt */
- if (mbuf_avail(&hdr) < OLD_HEADER_LEN - 2) {
+ if (mbuf_avail_for_read(&hdr) < OLD_HEADER_LEN - 2) {
log_noise("get_header: less than 8 bytes for special pkt");
return false;
}
- len = mbuf_get_uint16(&hdr);
- code = mbuf_get_uint32(&hdr);
+ if (!mbuf_get_uint16be(&hdr, &len16))
+ return false;
+ len = len16;
+ if (!mbuf_get_uint32be(&hdr, &code))
+ return false;
if (code == PKT_CANCEL)
type = PKT_CANCEL;
else if (code == PKT_SSLREQ)
pkt->len = len;
/* fill pkt with only data for this packet */
- if (len > mbuf_avail(data))
- avail = mbuf_avail(data);
+ if (len > mbuf_avail_for_read(data))
+ avail = mbuf_avail_for_read(data);
else
avail = len;
- mbuf_slice(data, avail, &pkt->data);
+ if (!mbuf_slice(data, avail, &pkt->data))
+ return false;
/* tag header as read */
- mbuf_get_bytes(&pkt->data, got);
-
- return true;
+ return mbuf_get_bytes(&pkt->data, got, &ptr);
}
void parse_server_error(PktHdr *pkt, const char **level_p, const char **msg_p)
{
const char *level = NULL, *msg = NULL, *val;
- int type;
- while (mbuf_avail(&pkt->data)) {
- type = mbuf_get_char(&pkt->data);
+ uint8_t type;
+ while (mbuf_avail_for_read(&pkt->data)) {
+ if (!mbuf_get_byte(&pkt->data, &type))
+ break;
if (type == 0)
break;
- val = mbuf_get_string(&pkt->data);
- if (!val)
+ if (!mbuf_get_string(&pkt->data, &val))
break;
if (type == 'S')
level = val;
/* answer server authentication request */
bool answer_authreq(PgSocket *server, PktHdr *pkt)
{
- unsigned cmd;
+ uint32_t cmd;
const uint8_t *salt;
bool res = false;
/* authreq body must contain 32bit cmd */
- if (mbuf_avail(&pkt->data) < 4)
+ if (mbuf_avail_for_read(&pkt->data) < 4)
return false;
- cmd = mbuf_get_uint32(&pkt->data);
+ if (!mbuf_get_uint32be(&pkt->data, &cmd))
+ return false;
switch (cmd) {
case 0:
slog_debug(server, "S: auth ok");
break;
case 4:
slog_debug(server, "S: req crypt psw");
- if (mbuf_avail(&pkt->data) < 2)
+ if (!mbuf_get_bytes(&pkt->data, 2, &salt))
return false;
- salt = mbuf_get_bytes(&pkt->data, 2);
res = login_crypt_psw(server, salt);
break;
case 5:
slog_debug(server, "S: req md5-crypted psw");
- if (mbuf_avail(&pkt->data) < 4)
+ if (!mbuf_get_bytes(&pkt->data, 4, &salt))
return false;
- salt = mbuf_get_bytes(&pkt->data, 4);
res = login_md5_psw(server, salt);
break;
case 2: /* kerberos */
return pktbuf_send_immidiate(&pkt, server);
}
-int scan_text_result(MBuf *pkt, const char *tupdesc, ...)
+int scan_text_result(struct MBuf *pkt, const char *tupdesc, ...)
{
- char *val = NULL;
- int len;
- unsigned ncol, i, asked;
+ const char *val = NULL;
+ uint32_t len;
+ uint16_t ncol;
+ unsigned i, asked;
va_list ap;
int *int_p;
uint64_t *long_p;
- char **str_p;
+ const char **str_p;
asked = strlen(tupdesc);
- ncol = mbuf_get_uint16(pkt);
+ if (!mbuf_get_uint16be(pkt, &ncol))
+ return -1;
va_start(ap, tupdesc);
for (i = 0; i < asked; i++) {
if (i < ncol) {
- len = mbuf_get_uint32(pkt);
- if (len < 0)
+ if (!mbuf_get_uint32be(pkt, &len))
+ return -1;
+ if ((int32_t)len < 0) {
val = NULL;
- else
- val = (char *)mbuf_get_bytes(pkt, len);
+ } else {
+ if (!mbuf_get_chars(pkt, len, &val))
+ return -1;
+ }
/* hack to zero-terminate the result */
if (val) {
- val--;
- memmove(val, val + 1, len);
- val[len] = 0;
+ char *xval = (char *)val - 1;
+ memmove(xval, val, len);
+ xval[len] = 0;
+ val = xval;
}
} else
/* tuple was shorter than requested */
*long_p = atoll(val);
break;
case 's':
- str_p = va_arg(ap, char **);
+ str_p = va_arg(ap, const char **);
*str_p = val;
break;
default:
*************************/
/*
- * Call proto callback with proper MBuf.
+ * Call proto callback with proper struct MBuf.
*
* If callback returns true it used one of sbuf_prepare_* on sbuf,
* and processing can continue.
*/
static bool sbuf_call_proto(SBuf *sbuf, int event)
{
- MBuf mbuf;
+ struct MBuf mbuf;
IOBuf *io = sbuf->io;
bool res;
if (incomplete_pkt(pkt))
return false;
- key = mbuf_get_string(&pkt->data);
- val = mbuf_get_string(&pkt->data);
- if (!key || !val) {
- disconnect_server(server, true, "broken ParameterStatus packet");
- return false;
- }
+ if (!mbuf_get_string(&pkt->data, &key))
+ goto failed;
+ if (!mbuf_get_string(&pkt->data, &val))
+ goto failed;
slog_debug(server, "S: param: %s = %s", key, val);
varcache_set(&server->vars, key, val);
add_welcome_parameter(server->pool, key, val);
return true;
+failed:
+ disconnect_server(server, true, "broken ParameterStatus packet");
+ return false;
}
/* we cannot log in at all, notify clients */
{
SBuf *sbuf = &server->sbuf;
bool res = false;
+ const uint8_t *ckey;
if (incomplete_pkt(pkt)) {
disconnect_server(server, true, "partial pkt in login phase");
/* ignorable packets */
case 'K': /* BackendKeyData */
- if (mbuf_avail(&pkt->data) >= BACKENDKEY_LEN)
- memcpy(server->cancel_key,
- mbuf_get_bytes(&pkt->data, BACKENDKEY_LEN),
- BACKENDKEY_LEN);
+ if (!mbuf_get_bytes(&pkt->data, BACKENDKEY_LEN, &ckey)) {
+ disconnect_server(server, true, "bad cancel key");
+ return false;
+ }
+ memcpy(server->cancel_key, ckey, BACKENDKEY_LEN);
res = true;
break;
case 'Z': /* ReadyForQuery */
/* if partial pkt, wait */
- if (mbuf_avail(&pkt->data) == 0)
+ if (!mbuf_get_char(&pkt->data, &state))
return false;
- state = mbuf_get_char(&pkt->data);
/* set ready only if no tx */
if (state == 'I')
}
/* callback from SBuf */
-bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data)
+bool server_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *data)
{
bool res = false;
PgSocket *server = container_of(sbuf, PgSocket, sbuf);
disconnect_client(server->link, false, "unexpected eof");
break;
case SBUF_EV_READ:
- if (mbuf_avail(data) < NEW_HEADER_LEN) {
+ if (mbuf_avail_for_read(data) < NEW_HEADER_LEN) {
slog_noise(server, "S: got partial header, trying to wait a bit");
break;
}
}
/* parse msg for fd and info */
-static void takeover_load_fd(MBuf *pkt, const struct cmsghdr *cmsg)
+static void takeover_load_fd(struct MBuf *pkt, const struct cmsghdr *cmsg)
{
int fd;
char *task, *saddr, *user, *db;
got = scan_text_result(pkt, "issssiqissss", &oldfd, &task, &user, &db,
&saddr, &port, &ckey, &linkfd,
&client_enc, &std_string, &datestyle, &timezone);
- if (task == NULL || saddr == NULL)
+ if (got < 0 || task == NULL || saddr == NULL)
fatal("NULL data from old process");
log_debug("FD row: fd=%d(%d) linkfd=%d task=%s user=%s db=%s enc=%s",
}
}
-static void next_command(PgSocket *bouncer, MBuf *pkt)
+static void next_command(PgSocket *bouncer, struct MBuf *pkt)
{
bool res = true;
- const char *cmd = mbuf_get_string(pkt);
+ const char *cmd;
+
+ if (!mbuf_get_string(pkt, &cmd))
+ fatal("bad result pkt");
log_debug("takeover_recv_fds: 'C' body: %s", cmd);
if (strcmp(cmd, "SUSPEND") == 0) {
}
static void takeover_parse_data(PgSocket *bouncer,
- struct msghdr *msg, MBuf *data)
+ struct msghdr *msg, struct MBuf *data)
{
struct cmsghdr *cmsg;
PktHdr pkt;
cmsg = msg->msg_controllen ? CMSG_FIRSTHDR(msg) : NULL;
- while (mbuf_avail(data) > 0) {
+ while (mbuf_avail_for_read(data) > 0) {
if (!get_header(data, &pkt))
fatal("cannot parse packet");
struct msghdr msg;
struct iovec io;
int res;
- MBuf data;
+ struct MBuf data;
memset(&msg, 0, sizeof(msg));
io.iov_base = data_buf;
res = safe_recvmsg(sock, &msg, 0);
if (res > 0) {
- mbuf_init(&data, data_buf, res);
+ mbuf_init_fixed_reader(&data, data_buf, res);
takeover_parse_data(bouncer, &msg, &data);
} else if (res == 0) {
fatal("unexpected EOF");