+2007-04-11 - PgBouncer 1.0.3 - "Fearless Fork"
+
+ = Fixes =
+
+ * Some error handling was missing in login path, so dying
+ connection there could trigger asserts.
+ * Cleanup of asserts in sbuf.c to catch problems earlier.
+ * Create core when Assert() triggers.
+
+ = New stuff =
+
+ * New config vars: log_connections, log_disconnections,
+ log_pooler_errors to turn on/off noise.
+ * Config var: client_login_timeout to kill dead connections
+ in login phase that could stall SUSPEND and thus online restart.
+
2007-03-28 - PgBouncer 1.0.2 - "Supersonic Spoon"
* libevent may report a deleted event inside same loop.
dnl Process this file with autoconf to produce a configure script.
-AC_INIT(pgbouncer, 1.0.2)
+AC_INIT(pgbouncer, 1.0.3)
AC_CONFIG_SRCDIR(src/bouncer.h)
AC_CONFIG_HEADER(config.h)
-pgbouncer (1.0.2) unstable; urgency=low
+pgbouncer (1.0.3-1) unstable; urgency=low
+
+ * more error handling fixes.
+
+ -- Marko Kreen <marko.kreen@skype.net> Tue, 10 Apr 2007 17:22:49 +0300
+
+pgbouncer (1.0.2-1) unstable; urgency=low
* 2 more bugs.
How many server connection to allow per user/database pair.
Can be overrided in per-database config.
+
+
+=== Log settings ===
+
+==== log_connections ====
+
+Log successful logins.
+
+Default: 1
+
+==== log_disconnections ====
+
+Log disconnections with reasons.
+
+Default: 1
+
+==== log_pooler_errors ====
+
+Log error messaged pooler sends to clients.
+
+Default: 1
+
=== Console access control ===
==== admin_users ====
List of users that are allowed to run read-only queries on console.
Thats means all SHOW commands except SHOW FDS.
+
+
=== Connection sanity checks, timeouts ===
==== server_check_delay ====
==== client_idle_timeout ====
-Client connections idling longer than that are closed.
+Client connections idling longer than that are closed. This should be
+larger then client-side connection lifetime settings, to apply only for
+network problems.
+
+Default: 0 (disabled)
+
+==== client_login_timeout ====
+
+If client connect but does not manage to login in this time,
+it will be disconnected. Mainly needed to avoid dead connections
+stalling SUSPEND and thus online restart.
Default: 0 (disabled)
+
=== Low-level network settings ===
==== pkt_buf ====
==== tcp_keepintvl ====
Default: not set
+
+
== Section [databases] ==
This contains key=value pairs where key will be taken as database name and value as
== Docs ==
* Detailed usage info: ./UsageInfo
- * COnfig file help: ./ConfigFile
+ * Config file help: ./ConfigFile
* TODO list: ./ToDo
* SHUTDOWN cmd should print notice?
* before loading users, disable all existing?
- * log_connects, log_disconnects settings
-
* Split CL_IDLE from CL_ACTIVE. Does not give functional
advantage, but makes monitoring easier.
max_client_conn = 100
default_pool_size = 20
+log_connections = 1
+log_disconnections = 1
+
+; log error messages pooler sends to clients
+log_pooler_errors = 1
+
;;;
;;; Timeouts
;;;
;; Should be used to survive network problems. (default: 0)
;client_idle_timeout = 0
+client_login_timeout = 0
;;;
;;; Low-level tuning options
[databases]
-marko = host=127.0.0.1
+marko = host=127.0.0.1 port=7000
[pgbouncer]
logfile = lib/pgbouncer.log
pidfile = lib/pgbouncer.pid
-#listen_addr = 127.0.0.1
+listen_addr = 127.0.0.1
listen_port = 6000
unix_socket_dir = /tmp
; any, trust, plain, crypt, md5
-auth_type = trust
+auth_type = md5
auth_file = etc/test.users
; When server connection is released back to pool:
pool_mode = transaction
server_check_query = select 1
-server_check_delay = 10
-max_client_conn = 2000
-default_pool_size = 80
+server_check_delay = 5
+max_client_conn = 100
+default_pool_size = 30
admin_users = plproxy
stats_users = marko
stats_period = 60
+log_connections = 0
+log_disconnections = 0
+log_pooler_errors = 0
+
+; short timeouts
+server_lifetime = 5
+server_idle_timeout = 3
+server_connect_timeout = 1
+server_login_retry = 1
+query_timeout = 10
+client_idle_timeout = 10
+
+client_login_timeout = 3
+
"webstore" "" ""
"wypbe" "md57e17e9c6cfde1c1f6f9155071d7d18a8" ""
"wypfe" "md5e3b7c35f688032d97ab066210a33184b" ""
-"marko" "funky"
+"marko" "kama"
pktbuf_write_DataRow(buf, debug ? SKF_DBG : SKF_STD,
is_server_socket(sk) ? "S" :"C",
- sk->auth_user->name,
- sk->pool->db->name,
+ sk->auth_user ? sk->auth_user->name : "(nouser)",
+ sk->pool ? sk->pool->db->name : "(nodb)",
state, addr, sk->addr.port,
sk->connect_time,
sk->request_time,
socket_header(buf, true);
statlist_for_each(item, &pool_list) {
pool = container_of(item, PgPool, head);
- show_socket_list(buf, &pool->active_client_list, "active", true);
- show_socket_list(buf, &pool->waiting_client_list, "waiting", true);
-
- show_socket_list(buf, &pool->active_server_list, "active", true);
- show_socket_list(buf, &pool->idle_server_list, "idle", true);
- show_socket_list(buf, &pool->used_server_list, "used", true);
- show_socket_list(buf, &pool->tested_server_list, "tested", true);
- show_socket_list(buf, &pool->new_server_list, "login", true);
+ show_socket_list(buf, &pool->active_client_list, "cl_active", true);
+ show_socket_list(buf, &pool->waiting_client_list, "cl_waiting", true);
+
+ show_socket_list(buf, &pool->active_server_list, "sv_active", true);
+ show_socket_list(buf, &pool->idle_server_list, "sv_idle", true);
+ show_socket_list(buf, &pool->used_server_list, "sv_used", true);
+ show_socket_list(buf, &pool->tested_server_list, "sv_tested", true);
+ show_socket_list(buf, &pool->new_server_list, "sv_login", true);
}
+ show_socket_list(buf, &login_client_list, "cl_login", true);
admin_flush(admin, buf, "SHOW");
return true;
}
extern usec_t cf_server_login_retry;
extern usec_t cf_query_timeout;
extern usec_t cf_client_idle_timeout;
+extern usec_t cf_client_login_timeout;
extern int cf_auth_type;
extern char *cf_auth_file;
extern int cf_tcp_socket_buffer;
extern int cf_tcp_defer_accept;
+extern int cf_log_connections;
+extern int cf_log_disconnections;
+extern int cf_log_pooler_errors;
+
extern ConfElem bouncer_params[];
disconnect_client(client, true, "No database supplied");
return false;
}
- slog_debug(client, "login request: db=%s user=%s", dbname, username);
+
+ if (cf_log_connections)
+ slog_info(client, "login request: db=%s user=%s", dbname, username);
/* check if limit allows, dont limit admin db
nb: new incoming conn will be attached to PgSocket, thus
case PKT_SSLREQ:
log_noise("C: req SSL");
log_noise("P: nak");
- sbuf_answer(&client->sbuf, "N", 1);
+
+ /* reject SSL attempt */
+ if (!sbuf_answer(&client->sbuf, "N", 1)) {
+ disconnect_client(client, false, "failed to nak SSL");
+ return false;
+ }
break;
case PKT_STARTUP:
if (mbuf_avail(pkt) < pkt_len - 8) {
if (!finish_client_login(client))
return false;
} else {
- send_client_authreq(client);
+ if (!send_client_authreq(client)) {
+ disconnect_client(client, false, "failed to send auth req");
+ return false;
+ }
}
break;
case 'p': /* PasswordMessage */
Assert(client->state == CL_WAITING);
if (client->query_start == 0) {
age = now - client->request_time;
- log_warning("query_start==0");
+ //log_warning("query_start==0");
} else
age = now - client->query_start;
if (age > cf_query_timeout)
check_pool_size(pool);
}
+static void cleanup_client_logins(void)
+{
+ List *item, *tmp;
+ PgSocket *client;
+ usec_t age;
+ usec_t now = get_cached_time();
+
+ if (cf_client_login_timeout <= 0)
+ return;
+
+ statlist_for_each_safe(item, &login_client_list, tmp) {
+ client = container_of(item, PgSocket, head);
+ age = now - client->connect_time;
+ if (age > cf_client_login_timeout)
+ disconnect_client(client, true, "client_login_timeout");
+ }
+}
+
/* full-scale maintenenace, done only occasionally */
static void do_full_maint(int sock, short flags, void *arg)
{
pool_client_maint(pool);
}
+ cleanup_client_logins();
+
if (cf_shutdown && get_active_server_count() == 0) {
log_info("server connections dropped, exiting");
exit(0);
usec_t cf_server_login_retry = 15*USEC;
usec_t cf_query_timeout = 0*USEC;
usec_t cf_client_idle_timeout = 0*USEC;
+usec_t cf_client_login_timeout = 0*USEC;
char *cf_logfile = NULL;
char *cf_pidfile = NULL;
char *cf_stats_users = "";
int cf_stats_period = 60;
+int cf_log_connections = 1;
+int cf_log_disconnections = 1;
+int cf_log_pooler_errors = 1;
/*
* config file description
{"server_check_delay", true, CF_TIME, &cf_server_check_delay},
{"query_timeout", true, CF_TIME, &cf_query_timeout},
{"client_idle_timeout", true, CF_TIME, &cf_client_idle_timeout},
+{"client_login_timeout",true, CF_TIME, &cf_client_login_timeout},
{"server_lifetime", true, CF_TIME, &cf_server_lifetime},
{"server_idle_timeout", true, CF_TIME, &cf_server_idle_timeout},
{"server_connect_timeout",true, CF_TIME, &cf_server_connect_timeout},
{"admin_users", true, CF_STR, &cf_admin_users},
{"stats_users", true, CF_STR, &cf_stats_users},
{"stats_period", true, CF_INT, &cf_stats_period},
+{"log_connections", true, CF_INT, &cf_log_connections},
+{"log_disconnections", true, CF_INT, &cf_log_disconnections},
+{"log_pooler_errors", true, CF_INT, &cf_log_pooler_errors},
{NULL},
};
* is called from somewhere else. So hide just freed
* PgSockets for one loop.
*/
-STATLIST(justfree_client_list);
-STATLIST(justfree_server_list);
+static STATLIST(justfree_client_list);
+static STATLIST(justfree_server_list);
/* how many client sockets are allocated */
static int absolute_client_count = 0;
}
/* connecting/active -> idle, unlink if needed */
-void release_server(PgSocket *server)
+bool release_server(PgSocket *server)
{
PgPool *pool = server->pool;
SocketState newstate = SV_IDLE;
- /* btw, this function is not allowed to disconnect,
- as there may be packet pending */
Assert(server->ready);
/* remove from old list */
/* immidiately process waiters, to give fair chance */
if (newstate == SV_IDLE) {
PgSocket *client = first_socket(&pool->waiting_client_list);
- if (client)
+ if (client) {
activate_client(client);
+
+ /*
+ * As the activate_client() does full read loop,
+ * then it may happen that linked client close
+ * couses server close. Report it.
+ */
+ switch (server->state) {
+ case SV_FREE:
+ case SV_JUSTFREE:
+ return false;
+ default:
+ break;
+ }
+ }
}
+ return true;
}
/* drop server connection */
static const uint8 pkt_term[] = {'X', 0,0,0,4};
int send_term = 1;
- log_debug("disconnect_server");
- slog_info(server, "closing because: %s", reason);
+ if (cf_log_disconnections)
+ slog_info(server, "closing because: %s", reason);
switch (server->state) {
case SV_ACTIVE:
/* drop client connection */
void disconnect_client(PgSocket *client, bool notify, const char *reason)
{
- slog_debug(client, "closing because: %s", reason);
+ if (cf_log_disconnections)
+ slog_debug(client, "closing because: %s", reason);
switch (client->state) {
case CL_ACTIVE:
pool->last_connect_time = get_cached_time();
change_server_state(server, SV_LOGIN);
+ if (cf_log_connections)
+ slog_info(server, "new connection to server");
+
/* start connecting */
- slog_info(server, "new connection to server");
sbuf_connect(&server->sbuf, &server->addr, cf_server_connect_timeout / USEC);
}
client->addr.is_unix = is_unix;
change_client_state(client, CL_LOGIN);
- slog_debug(client, "got connection attempt");
+ if (cf_log_connections)
+ slog_debug(client, "got connection attempt");
sbuf_accept(&client->sbuf, sock, is_unix);
return client;
PgUser *find_user(const char *name);
PgPool *get_pool(PgDatabase *, PgUser *);
bool find_server(PgSocket *client);
-void release_server(PgSocket *server);
+bool release_server(PgSocket *server);
bool finish_client_login(PgSocket *client);
PgSocket * accept_client(int sock, const struct sockaddr_in *addr, bool is_unix);
return false;
res = safe_send(fd, pos, amount, 0);
if (res < 0) {
- log_error("pktbuf_send_immidiate: %s", strerror(errno));
+ log_debug("pktbuf_send_immidiate: %s", strerror(errno));
}
return res == amount;
}
uint8 tmpbuf[512];
PktBuf buf;
- slog_error(client, "Pooler Error: %s", msg);
+ if (cf_log_pooler_errors)
+ slog_info(client, "Pooler Error: %s", msg);
pktbuf_static(&buf, tmpbuf, sizeof(tmpbuf));
pktbuf_write_generic(&buf, 'E', "cscscsc",
*/
/* actual packet send */
-static void send_password(PgSocket *server, const char *enc_psw)
+static bool send_password(PgSocket *server, const char *enc_psw)
{
bool res;
SEND_PasswordMessage(res, server, enc_psw);
- if (!res)
- disconnect_server(server, true,
- "partial send unhandled in send_password");
+ return res;
}
-static void login_clear_psw(PgSocket *server)
+static bool login_clear_psw(PgSocket *server)
{
log_debug("P: send clear password");
- send_password(server, server->pool->user->passwd);
+ return send_password(server, server->pool->user->passwd);
}
-static void login_crypt_psw(PgSocket *server, const uint8 *salt)
+static bool login_crypt_psw(PgSocket *server, const uint8 *salt)
{
char saltbuf[3];
const char *enc;
log_debug("P: send crypt password");
strncpy(saltbuf, (char *)salt, 2);
enc = pg_crypt(user->passwd, saltbuf);
- send_password(server, enc);
+ return send_password(server, enc);
}
-static void login_md5_psw(PgSocket *server, const uint8 *salt)
+static bool login_md5_psw(PgSocket *server, const uint8 *salt)
{
char txt[MD5_PASSWD_LEN + 1], *src;
PgUser *user = server->pool->user;
src = user->passwd + 3;
pg_md5_encrypt(src, (char *)salt, 4, txt);
- send_password(server, txt);
+ return send_password(server, txt);
}
/* answer server authentication request */
{
unsigned cmd;
const uint8 *salt;
+ bool res = false;
if (pkt_len < 5 + 4)
return false;
switch (cmd) {
case 0:
log_debug("S: auth ok");
+ res = true;
break;
case 3:
log_debug("S: req cleartext password");
- login_clear_psw(server);
+ res = login_clear_psw(server);
break;
case 4:
if (pkt_len < 5 + 4 + 2)
return false;
log_debug("S: req crypt psw");
salt = mbuf_get_bytes(pkt, 2);
- login_crypt_psw(server, salt);
+ res = login_crypt_psw(server, salt);
break;
case 5:
if (pkt_len < 5 + 4 + 4)
return false;
log_debug("S: req md5-crypted psw");
salt = mbuf_get_bytes(pkt, 4);
- login_md5_psw(server, salt);
+ res = login_md5_psw(server, salt);
break;
case 2: /* kerberos */
case 6: /* scm something */
log_error("unsupported auth method: %d", cmd);
+ res = false;
+ break;
default:
log_error("unknown auth method: %d", cmd);
+ res = false;
+ break;
}
- return true;
+ return res;
}
bool send_startup_packet(PgSocket *server)
*/
#define SMALL_PKT 16
+#define AssertSanity(sbuf) do { \
+ Assert((sbuf)->send_pos >= 0); \
+ Assert((sbuf)->send_pos <= (sbuf)->pkt_pos); \
+ Assert((sbuf)->pkt_pos <= (sbuf)->recv_pos); \
+ Assert((sbuf)->recv_pos <= cf_sbuf_len); \
+ Assert((sbuf)->pkt_remain >= 0); \
+ Assert((sbuf)->send_remain >= 0); \
+} while (0)
+
+#define AssertActive(sbuf) do { \
+ Assert((sbuf)->sock > 0); \
+ AssertSanity(sbuf); \
+} while (0)
+
/* declare static stuff */
static void sbuf_queue_send(SBuf *sbuf);
static bool sbuf_send_pending(SBuf *sbuf);
static void sbuf_send_cb(int sock, short flags, void *arg);
static void sbuf_try_resync(SBuf *sbuf);
static void sbuf_wait_for_data(SBuf *sbuf);
+static bool sbuf_call_proto(SBuf *sbuf, int event);
-/*
- * Call proto callback with proper MBuf.
- *
- * If callback returns true it used one of sbuf_prepare_* on sbuf,
- * and processing can continue.
- *
- * If it returned false it used sbuf_pause(), sbuf_close() or simply
- * wants to wait for next event loop (eg. too few data available).
- * Callee should not touch sbuf in that case and just return to libevent.
- */
-static inline bool sbuf_call_proto(SBuf *sbuf, int event)
-{
- MBuf mbuf;
- uint8 *pos = sbuf->buf + sbuf->pkt_pos;
- int avail = sbuf->recv_pos - sbuf->pkt_pos;
-
- Assert(avail >= 0);
- Assert(pos + avail <= sbuf->buf + cf_sbuf_len);
- Assert(event != SBUF_EV_READ || avail > 0);
-
- mbuf_init(&mbuf, pos, avail);
- return sbuf->proto_handler(sbuf, event, &mbuf, sbuf->arg);
-}
-
-/* lets wait for new data */
-static void sbuf_wait_for_data(SBuf *sbuf)
-{
- event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST, sbuf_recv_cb, sbuf);
- event_add(&sbuf->ev, NULL);
-}
+/*********************************
+ * Public functions
+ *********************************/
/* initialize SBuf with proto handler */
void sbuf_init(SBuf *sbuf, sbuf_proto_cb_t proto_fn, void *arg)
/* got new socket from accept() */
void sbuf_accept(SBuf *sbuf, int sock, bool is_unix)
{
- Assert(sbuf->pkt_pos == 0);
- Assert(sbuf->recv_pos == 0);
- Assert(sbuf->send_pos == 0);
+ Assert(sbuf->recv_pos == 0 && sbuf->sock == 0);
+ AssertSanity(sbuf);
tune_socket(sock, is_unix);
sbuf->sock = sock;
socklen_t len;
struct timeval timeout;
+ Assert(sbuf->recv_pos == 0 && sbuf->sock == 0);
+ AssertSanity(sbuf);
+
/* prepare sockaddr */
if (addr->is_unix) {
sa = (void*)&sa_un;
/* dont wait for data on this socket */
void sbuf_pause(SBuf *sbuf)
{
+ AssertActive(sbuf);
Assert(sbuf->wait_send == 0);
event_del(&sbuf->ev);
/* resume from pause, start waiting for data */
void sbuf_continue(SBuf *sbuf)
{
+ AssertActive(sbuf);
+
sbuf_wait_for_data(sbuf);
/* there is some data already received */
*/
void sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb user_cb)
{
+ AssertActive(sbuf);
+
event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST,
user_cb, sbuf->arg);
event_add(&sbuf->ev, NULL);
}
/* proto_fn tells to send some bytes to socket */
-void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, unsigned amount, bool flush)
+void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, int amount, bool flush)
{
+ AssertActive(sbuf);
Assert(sbuf->pkt_remain == 0);
Assert(sbuf->pkt_skip == 0 || sbuf->send_remain == 0);
Assert(!sbuf->pkt_flush || sbuf->send_remain == 0);
/* proto_fn tells to skip sone amount of bytes */
void sbuf_prepare_skip(SBuf *sbuf, int amount)
{
+ AssertActive(sbuf);
Assert(sbuf->pkt_remain == 0);
Assert(sbuf->pkt_skip == 0 || sbuf->send_remain == 0);
Assert(!sbuf->pkt_flush || sbuf->send_remain == 0);
sbuf->dst = NULL;
}
+/*************************
+ * Internal functions
+ *************************/
+
+/*
+ * Call proto callback with proper MBuf.
+ *
+ * If callback returns true it used one of sbuf_prepare_* on sbuf,
+ * and processing can continue.
+ *
+ * If it returned false it used sbuf_pause(), sbuf_close() or simply
+ * wants to wait for next event loop (eg. too few data available).
+ * Callee should not touch sbuf in that case and just return to libevent.
+ */
+static bool sbuf_call_proto(SBuf *sbuf, int event)
+{
+ MBuf mbuf;
+ uint8 *pos = sbuf->buf + sbuf->pkt_pos;
+ int avail = sbuf->recv_pos - sbuf->pkt_pos;
+ bool res;
+
+ AssertSanity(sbuf);
+ Assert(event != SBUF_EV_READ || avail > 0);
+
+ mbuf_init(&mbuf, pos, avail);
+ res = sbuf->proto_handler(sbuf, event, &mbuf, sbuf->arg);
+
+ AssertSanity(sbuf);
+ if (event == SBUF_EV_READ && res)
+ Assert(sbuf->sock > 0);
+
+ return res;
+}
+
+/* lets wait for new data */
+static void sbuf_wait_for_data(SBuf *sbuf)
+{
+ event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST, sbuf_recv_cb, sbuf);
+ event_add(&sbuf->ev, NULL);
+}
+
/* libevent EV_WRITE: called when dest socket is writable again */
static void sbuf_send_cb(int sock, short flags, void *arg)
{
if (!sbuf->sock)
return;
+ AssertSanity(sbuf);
+
/* prepare normal situation for sbuf_recv_cb() */
sbuf->wait_send = 0;
sbuf_wait_for_data(sbuf);
/* socket is full, wait until its writable again */
static void sbuf_queue_send(SBuf *sbuf)
{
+ AssertActive(sbuf);
+
sbuf->wait_send = 1;
event_del(&sbuf->ev);
event_set(&sbuf->ev, sbuf->dst->sock, EV_WRITE, sbuf_send_cb, sbuf);
int res, avail;
uint8 *pos;
+ AssertActive(sbuf);
Assert(sbuf->dst || !sbuf->send_remain);
try_more:
if (avail > sbuf->send_remain)
avail = sbuf->send_remain;
if (avail == 0)
- return true;
+ goto all_sent;
if (sbuf->dst->sock == 0) {
log_error("sbuf_send_pending: no dst sock?");
/* actually send it */
pos = sbuf->buf + sbuf->send_pos;
res = safe_send(sbuf->dst->sock, pos, avail, 0);
- if (res >= 0) {
- sbuf->send_remain -= res;
- sbuf->send_pos += res;
-
- if (res < avail) {
- /*
- * Should do sbuf_queue_send() immidiately?
- *
- * To be sure, lets run into EAGAIN.
- */
- goto try_more;
- }
- return true;
- } else if (errno == EAGAIN) {
- sbuf_queue_send(sbuf);
- return false;
- } else {
- sbuf_call_proto(sbuf, SBUF_EV_SEND_FAILED);
+ if (res < 0) {
+ if (errno == EAGAIN)
+ sbuf_queue_send(sbuf);
+ else
+ sbuf_call_proto(sbuf, SBUF_EV_SEND_FAILED);
return false;
}
+
+ sbuf->send_remain -= res;
+ sbuf->send_pos += res;
+
+ AssertActive(sbuf);
+
+ /*
+ * Should do sbuf_queue_send() immidiately?
+ *
+ * To be sure, lets run into EAGAIN.
+ */
+ if (res < avail)
+ goto try_more;
+
+all_sent:
+
+ /* send_pos may lag pkt_pos in case of skip packets, move it here */
+ if (sbuf->send_remain == 0 && sbuf->send_pos < sbuf->pkt_pos)
+ sbuf->send_pos = sbuf->pkt_pos;
+
+ return true;
}
/* process as much data as possible */
bool res;
while (1) {
- Assert(sbuf->recv_pos >= sbuf->pkt_pos);
+ AssertActive(sbuf);
/*
* Enough for now?
{
int avail;
- if (sbuf->pkt_pos == 0)
+ AssertActive(sbuf);
+
+ if (sbuf->send_pos == 0)
return;
- if (sbuf->send_remain > 0)
- avail = sbuf->recv_pos - sbuf->send_pos;
- else
- avail = sbuf->recv_pos - sbuf->pkt_pos;
+ avail = sbuf->recv_pos - sbuf->send_pos;
if (avail == 0) {
sbuf->recv_pos = sbuf->pkt_pos = sbuf->send_pos = 0;
} else if (avail <= SMALL_PKT) {
- if (sbuf->send_remain > 0) {
- memmove(sbuf->buf, sbuf->buf + sbuf->send_pos, avail);
- sbuf->pkt_pos -= sbuf->send_pos;
- sbuf->send_pos = 0;
- sbuf->recv_pos = avail;
- } else {
- memmove(sbuf->buf, sbuf->buf + sbuf->pkt_pos, avail);
- sbuf->send_pos = 0;
- sbuf->pkt_pos = 0;
- sbuf->recv_pos = avail;
- }
+ memmove(sbuf->buf, sbuf->buf + sbuf->send_pos, avail);
+ sbuf->pkt_pos -= sbuf->send_pos;
+ sbuf->send_pos = 0;
+ sbuf->recv_pos = avail;
}
}
int got;
uint8 *pos;
+ AssertActive(sbuf);
+ Assert(len > 0);
+ Assert(sbuf->recv_pos + len <= cf_sbuf_len);
+
pos = sbuf->buf + sbuf->recv_pos;
got = safe_recv(sbuf->sock, pos, len, 0);
/* reading should be disabled when waiting */
Assert(sbuf->wait_send == 0);
+ AssertSanity(sbuf);
try_more:
/* make room in buffer */
err = getsockopt(sbuf->sock, SOL_SOCKET, SO_ERROR, (void*)&optval, &optlen);
if (err < 0) {
log_error("sbuf_after_connect_check: getsockopt: %s",
- strerror(errno));
+ strerror(errno));
return false;
}
if (optval != 0) {
log_error("sbuf_after_connect_check: pending error: %s",
- strerror(optval));
+ strerror(optval));
return false;
}
return true;
return false;
res = safe_send(sbuf->sock, buf, len, 0);
if (res < 0)
- log_error("sbuf_answer: error sending: %s", strerror(errno));
+ log_debug("sbuf_answer: error sending: %s", strerror(errno));
else if (res != len)
- log_error("sbuf_answer: partial send: len=%d sent=%d", len, res);
+ log_debug("sbuf_answer: partial send: len=%d sent=%d", len, res);
return res == len;
}
/* dest SBuf for current packet */
SBuf *dst;
- unsigned recv_pos;
- unsigned pkt_pos;
- unsigned pkt_remain;
- unsigned send_pos;
- unsigned send_remain;
+ int recv_pos;
+ int pkt_pos;
+ int pkt_remain;
+ int send_pos;
+ int send_remain;
unsigned wait_send:1;
unsigned pkt_skip:1;
void sbuf_close(SBuf *sbuf);
/* proto_fn can use those functions to order behaviour */
-void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, unsigned amount, bool flush);
+void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, int amount, bool flush);
void sbuf_prepare_skip(SBuf *sbuf, int amount);
bool sbuf_answer(SBuf *sbuf, const void *buf, int len);
case 'R': /* AuthenticationXXX */
log_debug("calling login_answer");
res = answer_authreq(server, pkt_type, pkt_len, pkt);
+ if (!res)
+ disconnect_server(server, false, "failed to answer authreq");
break;
case 'S': /* ParameterStatus */
res = add_welcome_parameter(server, pkt_type, pkt_len, pkt);
log_debug("server login ok, start accepting queries");
server->ready = 1;
+ /* got all params */
finish_welcome_msg(server);
- release_server(server);
+
+ // FIXME: res check
+ res = release_server(server);
/* let the takeover process handle it */
- if (server->pool->admin)
- takeover_login(server);
- res = true;
+ if (res && server->pool->admin)
+ res = takeover_login(server);
break;
/* ignorable packets */
switch (server->state) {
case SV_ACTIVE:
case SV_TESTED:
+ /* retval does not matter here */
release_server(server);
break;
default:
#endif
#ifdef CASSERT
-#define Assert(e) do { if (!(e)) fatal("Assert(%s) failed", #e); } while (0)
+#define Assert(e) do { if (!(e)) { \
+ fatal_noexit("Assert(%s) failed", #e); abort(); } } while (0)
#else
#define Assert(e)
#endif
disconnect_server(bouncer, false, "disko over");
cf_reboot = 0;
resume_all();
+ log_info("disko over, resuming work");
}
/* parse msg for fd and info */
* login finished, send first command,
* replace recv callback with custom recvmsg() based one.
*/
-void takeover_login(PgSocket *bouncer)
+bool takeover_login(PgSocket *bouncer)
{
bool res;
slog_info(bouncer, "Login OK, sending SUSPEND");
SEND_generic(res, bouncer, 'Q', "s", "SUSPEND;");
-
- /* use own callback */
- sbuf_pause(&bouncer->sbuf);
- sbuf_continue_with_callback(&bouncer->sbuf, takeover_recv_cb);
+ if (res) {
+ /* use own callback */
+ sbuf_pause(&bouncer->sbuf);
+ sbuf_continue_with_callback(&bouncer->sbuf, takeover_recv_cb);
+ } else {
+ disconnect_server(bouncer, false, "failed to send command");
+ }
+ return res;
}
/* launch connection to running process */
*/
void takeover_init(void);
-void takeover_login(PgSocket *bouncer);
+bool takeover_login(PgSocket *bouncer);
}
void _fatal(const char *file, int line, const char *func,
- const char *fmt, ...)
+ bool do_exit, const char *fmt, ...)
{
va_list ap;
char buf[1024];
va_start(ap, fmt);
_log("FATAL", buf, ap);
va_end(ap);
- if (cf_verbose > 2)
- abort();
- exit(1);
+ if (do_exit)
+ exit(1);
}
void _fatal_perror(const char *file, int line, const char *func,
va_start(ap, fmt);
vsnprintf(buf, sizeof(buf), fmt, ap);
va_end(ap);
- _fatal(file, line, func, "%s: %s", buf, strerror(errno));
+ _fatal(file, line, func, false, "%s: %s", buf, strerror(errno));
}
/*
/*
* log and exit
*/
-void _fatal(const char *file, int line, const char *func, const char *s, ...);
+void _fatal(const char *file, int line, const char *func, bool do_exit, const char *s, ...);
void _fatal_perror(const char *file, int line, const char *func, const char *s, ...);
#define fatal(args...) \
- _fatal(__FILE__, __LINE__, __FUNCTION__, ## args)
+ _fatal(__FILE__, __LINE__, __FUNCTION__, true, ## args)
+#define fatal_noexit(args...) \
+ _fatal(__FILE__, __LINE__, __FUNCTION__, false, ## args)
#define fatal_perror(args...) \
_fatal_perror(__FILE__, __LINE__, __FUNCTION__, ## args)