- Accept custom unix socket location in host=
- Accept quoted values: password=' asd''foo'
* server_reset_query, to be sent immidiately after release
- * Cancel pkt sent for idle connection does not drop it anymore,
- just ReadyForQuery is re-sent.
+ * keep track of following server variables:
+ client_encoding, datestyle, timezone, standard_conforming_strings
+ * Cancel pkt sent for idle connection does not drop it anymore.
+ * Cancel with ^C from psql works for SUSPEND / PAUSE.
* Print FD limits on startup.
* More debug log messages include socket info.
* When suspending, try to hit packet boundary ASAP.
= PgBouncer TODO list =
-== Small stuff ==
+== High-prio ==
* suspend_timeout - drop stalled conns
+ * create manpage
+ * report existing pidfile to console
+ * before loading users, disable all existing
== Low-prio ==
- * create manpage
* drop_on_error/keep_on_error - if released conn is in error state,
then issue rollback and keep it
- * report existing pidfile to console
+ * keep stats about error counts
== Make -R less scary ==
* if tcp - try binding
* if unix - try connect()
-== Suspicious items ==
+== Just ideas ==
- * keep track of requested client_encoding (+others) and update if needed?
- * original values in welcome pkt
- * static: integer_datetimes, server_encoding, server_version,
- * dynamic?: session_authorization, is_superuser?, standard_conforming_strings, TimeZone
- * keep stats about error counts
- * before loading users, disable all existing
* auth_conn - access to pg_shadow, so auth_file is not needed
+ * possibility to specify failover databases
create_auth_cache();
/* prepare welcome */
- pktbuf_static(&msg, db->welcome_msg, sizeof(db->welcome_msg));
+ pktbuf_static(&msg, pool->welcome_msg, sizeof(pool->welcome_msg));
pktbuf_write_AuthenticationOk(&msg);
pktbuf_write_ParameterStatus(&msg, "server_version", "8.0/bouncer");
pktbuf_write_ParameterStatus(&msg, "client_encoding", "UNICODE");
- pktbuf_write_ParameterStatus(&msg, "server_encoding", "UNICODE");
+ pktbuf_write_ParameterStatus(&msg, "server_encoding", "SQL_ASCII");
pktbuf_write_ParameterStatus(&msg, "is_superuser", "on");
- db->welcome_msg_len = pktbuf_written(&msg);
- db->welcome_msg_ready = 1;
+ pool->welcome_msg_len = pktbuf_written(&msg);
+ pool->welcome_msg_ready = 1;
pktbuf_static(&msg, db->startup_params, sizeof(db->startup_params));
pktbuf_put_string(&msg, "database");
#include "mbuf.h"
#include "sbuf.h"
#include "pktbuf.h"
+#include "varcache.h"
#include "admin.h"
#include "loader.h"
PgStats newer_stats;
PgStats older_stats;
+ /* database info to be sent to client */
+ uint8 welcome_msg[256];
+ unsigned welcome_msg_len;
+
+ VarCache orig_vars;
+
/* if last connect failed, there should be delay before next */
usec_t last_connect_time;
unsigned last_connect_failed:1;
unsigned admin:1;
+ unsigned welcome_msg_ready:1;
};
#define pool_server_count(pool) ( \
List head;
char name[MAX_DBNAME];
- /* database info to be sent to client */
- uint8 welcome_msg[512];
- unsigned welcome_msg_len;
- unsigned welcome_msg_ready:1;
-
unsigned db_paused:1;
/* key/val pairs (without user) for startup msg to be sent to server */
unsigned wait_for_response:1;
/* this (server) socket must be closed ASAP */
unsigned close_needed:1;
+ /* setting client vars */
+ unsigned setting_vars:1;
usec_t connect_time; /* when connection was made */
usec_t request_time; /* last activity time */
PgUser * auth_user;
PgAddr addr;
+ VarCache vars;
+
SBuf sbuf; /* stream buffer, must be last */
};
extern int cf_log_disconnections;
extern int cf_log_pooler_errors;
+extern int cf_disable_varcache;
+
extern ConfElem bouncer_params[];
dbname = val;
else if (strcmp(key, "user") == 0)
username = val;
+ else {
+ /* remember requested parameters */
+ if (varcache_set(&client->vars, key, val, true))
+ slog_debug(client, "got var: %s=%s", key, val);
+ }
}
if (!username) {
disconnect_client(client, true, "No username supplied");
if (!statlist_empty(&pool->idle_server_list)) {
/* db not fully initialized after reboot */
- if (client->wait_for_welcome && !pool->db->welcome_msg_ready) {
+ if (client->wait_for_welcome && !pool->welcome_msg_ready) {
launch_new_connection(pool);
continue;
}
int cf_log_disconnections = 1;
int cf_log_pooler_errors = 1;
+int cf_disable_varcache = 0;
+
/*
* config file description
*/
{"log_connections", true, CF_INT, &cf_log_connections},
{"log_disconnections", true, CF_INT, &cf_log_disconnections},
{"log_pooler_errors", true, CF_INT, &cf_log_pooler_errors},
+{"disable_varcache", true, CF_INT, &cf_disable_varcache},
{NULL},
};
sk->query_start = 0;
sk->auth_user = NULL;
+
+ varcache_clean(&sk->vars);
+ sk->setting_vars = 0;
}
/* allocate & fill client socket */
PgPool *pool = client->pool;
PgSocket *server;
bool res;
+ bool varchange = false;
Assert(client->state == CL_ACTIVE);
return true;
/* try to get idle server, if allowed */
- if (cf_pause_mode == P_PAUSE)
+ if (cf_pause_mode == P_PAUSE) {
server = NULL;
- else {
+ } else {
while (1) {
server = first_socket(&pool->idle_server_list);
if (!server || server->ready)
disconnect_server(server, true, "idle server got dirty");
}
}
+ Assert(!server || server->state == SV_IDLE);
+
+ /* send var changes */
+ if (server && !cf_disable_varcache) {
+ res = varcache_apply(server, client, &varchange);
+ if (!res) {
+ disconnect_server(server, true, "var change failed");
+ server = NULL;
+ }
+ }
/* link or send to waiters list */
if (server) {
- Assert(server->state == SV_IDLE);
client->link = server;
server->link = client;
change_server_state(server, SV_ACTIVE);
- res = true;
+ if (varchange) {
+ sbuf_pause(&client->sbuf);
+ res = false; /* don't process client data yet */
+ server->setting_vars = 1;
+ } else
+ res = true;
} else {
pause_client(client);
Assert(client->state == CL_WAITING);
/* is it allowed to add servers? */
total = pool_server_count(pool);
- if (total >= pool->db->pool_size && pool->db->welcome_msg_ready) {
+ if (total >= pool->db->pool_size && pool->welcome_msg_ready) {
log_debug("launch_new_connection: pool full (%d >= %d)",
total, pool->db->pool_size);
return;
bool add_welcome_parameter(PgSocket *server,
unsigned pkt_type, unsigned pkt_len, MBuf *pkt)
{
- PgDatabase *db = server->pool->db;
+ PgPool *pool = server->pool;
PktBuf msg;
const char *key, *val;
- if (db->welcome_msg_ready)
+ if (pool->welcome_msg_ready)
return true;
/* incomplete startup msg from server? */
if (pkt_len - 5 > mbuf_avail(pkt))
return false;
- pktbuf_static(&msg, db->welcome_msg + db->welcome_msg_len,
- sizeof(db->welcome_msg) - db->welcome_msg_len);
+ pktbuf_static(&msg, pool->welcome_msg + pool->welcome_msg_len,
+ sizeof(pool->welcome_msg) - pool->welcome_msg_len);
- if (db->welcome_msg_len == 0)
+ if (pool->welcome_msg_len == 0)
pktbuf_write_AuthenticationOk(&msg);
key = mbuf_get_string(pkt);
slog_error(server, "broken ParameterStatus packet");
return false;
}
+
slog_debug(server, "S: param: %s = %s", key, val);
- pktbuf_write_ParameterStatus(&msg, key, val);
- db->welcome_msg_len += pktbuf_written(&msg);
+ if (varcache_set(&pool->orig_vars, key, val, true)) {
+ slog_debug(server, "interesting var: %s=%s", key, val);
+ varcache_set(&server->vars, key, val, true);
+ } else {
+ slog_debug(server, "uninteresting var: %s=%s", key, val);
+ pktbuf_write_ParameterStatus(&msg, key, val);
+ pool->welcome_msg_len += pktbuf_written(&msg);
+ }
return true;
}
/* all parameters processed */
void finish_welcome_msg(PgSocket *server)
{
- PgDatabase *db = server->pool->db;
- if (db->welcome_msg_ready)
+ PgPool *pool = server->pool;
+ if (pool->welcome_msg_ready)
return;
- db->welcome_msg_ready = 1;
+ pool->welcome_msg_ready = 1;
}
bool welcome_client(PgSocket *client)
int res;
uint8 buf[1024];
PktBuf msg;
- PgDatabase *db = client->pool->db;
+ PgPool *pool = client->pool;
slog_noise(client, "P: welcome_client");
- if (!db->welcome_msg_ready)
+ if (!pool->welcome_msg_ready)
return false;
+ varcache_print(&client->vars, "welcome/client");
+ varcache_print(&client->pool->orig_vars, "welcome/pool");
+
pktbuf_static(&msg, buf, sizeof(buf));
- pktbuf_put_bytes(&msg, db->welcome_msg, db->welcome_msg_len);
+ pktbuf_put_bytes(&msg, pool->welcome_msg, pool->welcome_msg_len);
+
+ varcache_fill_unset(&pool->orig_vars, client);
+ varcache_add_params(&msg, &client->vars);
/* give each client its own cancel key */
get_random_bytes(client->cancel_key, 8);
#include "bouncer.h"
+static void check_parameters(PgSocket *server, MBuf *pkt, unsigned pkt_len)
+{
+ const char *key, *val;
+ PgSocket *client = server->link;
+
+ /* incomplete startup msg from server? */
+ if (pkt_len - 5 > mbuf_avail(pkt))
+ return;
+
+ key = mbuf_get_string(pkt);
+ val = mbuf_get_string(pkt);
+ if (!key || !val) {
+ slog_error(server, "broken ParameterStatus packet");
+ return;
+ }
+ slog_debug(server, "S: param: %s = %s", key, val);
+
+ varcache_set(&server->vars, key, val, true);
+
+ if (client) {
+ slog_debug(client, "setting client var: %s='%s'", key, val);
+ varcache_set(&client->vars, key, val, true);
+ }
+
+ return;
+}
+
/* process packets on server auth phase */
static bool handle_server_startup(PgSocket *server, MBuf *pkt)
{
slog_error(server, "unknown pkt from server: '%c'", pkt_type);
disconnect_server(server, true, "unknown pkt from server");
break;
+
case 'E': /* ErrorResponse */
log_server_error("S: login failed", pkt);
disconnect_server(server, true, "login failed");
if (!res)
disconnect_server(server, false, "failed to answer authreq");
break;
+
case 'S': /* ParameterStatus */
res = add_welcome_parameter(server, pkt_type, pkt_len, pkt);
break;
+
case 'Z': /* ReadyForQuery */
/* login ok */
slog_debug(server, "server login ok, start accepting queries");
memcpy(server->cancel_key, mbuf_get_bytes(pkt, 8), 8);
res = true;
break;
+
case 'N': /* NoticeResponse */
slog_noise(server, "skipping pkt: %c", pkt_type);
res = true;
"Long transactions not allowed");
return false;
}
+ break;
+
+ case 'S': /* ParameterStatus */
+ check_parameters(server, pkt, pkt_len);
+ break;
/*
* 'E' and 'N' packets currently set ->ready to 0. Correct would
* it later.
*/
case 'E': /* ErrorResponse */
+ if (server->setting_vars) {
+ /*
+ * the SET and user query will be different TX
+ * so we cannot report SET error to user.
+ */
+ log_server_error("varcache_apply failed", pkt);
+
+ /*
+ * client probably gave invalid values in startup pkt.
+ *
+ * no reason to keep such guys.
+ */
+ disconnect_client(server->link, true, "invalid server parameter");
+ }
case 'N': /* NoticeResponse */
+ break;
- /*
- * chat packets, but server (and thus pooler)
- * is allowed to buffer them until Sync or Flush
- * is sent by client.
- */
+ /* chat packets */
case '2': /* BindComplete */
case '3': /* CloseComplete */
case 'c': /* CopyDone(F/B) */
case 'd': /* CopyData(F/B) */
case 'D': /* DataRow */
case 't': /* ParameterDescription */
- case 'S': /* ParameterStatus */
case 'T': /* RowDescription */
-
- if (client) {
- sbuf_prepare_send(sbuf, &client->sbuf, pkt_len);
- } else {
- if (server->state != SV_TESTED)
- slog_warning(server,
- "got packet '%c' from server when not linked",
- pkt_type);
- sbuf_prepare_skip(sbuf, pkt_len);
- }
break;
}
server->ready = ready;
-
- /* update stats */
server->pool->stats.server_bytes += pkt_len;
- if (server->ready && client) {
- usec_t total;
- Assert(client->query_start != 0);
-
- total = get_cached_time() - client->query_start;
- client->query_start = 0;
- server->pool->stats.query_time += total;
- slog_debug(client, "query time: %d us", (int)total);
+
+ if (server->setting_vars) {
+ Assert(client);
+ sbuf_prepare_skip(sbuf, pkt_len);
+ if (ready) {
+ server->setting_vars = 0;
+ sbuf_continue(&client->sbuf);
+ }
+ } else if (client) {
+ sbuf_prepare_send(sbuf, &client->sbuf, pkt_len);
+ if (ready) {
+ usec_t total;
+ Assert(client->query_start != 0);
+
+ total = get_cached_time() - client->query_start;
+ client->query_start = 0;
+ server->pool->stats.query_time += total;
+ slog_debug(client, "query time: %d us", (int)total);
+ }
+ } else {
+ if (server->state != SV_TESTED)
+ slog_warning(server,
+ "got packet '%c' from server when not linked",
+ pkt_type);
+ sbuf_prepare_skip(sbuf, pkt_len);
}
return true;
--- /dev/null
+/*
+ * PgBouncer - Lightweight connection pooler for PostgreSQL.
+ *
+ * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+/*
+ * Operations with server config parameters.
+ */
+
+#include "bouncer.h"
+
+struct var_lookup {
+ const char *name;
+ int offset;
+ int len;
+};
+
+static const struct var_lookup lookup [] = {
+{"client_encoding", offsetof(VarCache, client_encoding), VAR_ENCODING_LEN },
+{"datestyle", offsetof(VarCache, datestyle), VAR_DATESTYLE_LEN },
+{"timezone", offsetof(VarCache, timezone), VAR_TIMEZONE_LEN },
+{"standard_conforming_strings", offsetof(VarCache, std_strings), VAR_STDSTR_LEN },
+{NULL},
+};
+
+static char *get_value(VarCache *cache, const struct var_lookup *lk)
+{
+ return (char *)(cache) + lk->offset;
+}
+
+bool varcache_set(VarCache *cache,
+ const char *key, const char *value,
+ bool overwrite)
+{
+ int vlen;
+ char *pos;
+ const struct var_lookup *lk;
+
+ for (lk = lookup; lk->name; lk++) {
+ if (strcasecmp(lk->name, key) != 0)
+ continue;
+
+ pos = get_value(cache, lk);
+
+ if (!overwrite && *pos)
+ break;
+
+ vlen = strlcpy(pos, value, lk->len);
+ if (vlen >= lk->len)
+ log_warning("varcache_set(%s) overflow", key);
+ else
+ log_debug("varcache_set: %s=%s", key, pos);
+ return true;
+ }
+ return false;
+}
+
+static int apply_var(PktBuf *pkt, const char *key,
+ const char *cval, const char *sval)
+{
+ char buf[128];
+ int len;
+
+ if (strcasecmp(cval, sval) == 0)
+ return 0;
+
+ len = snprintf(buf, sizeof(buf), "SET %s='%s';", key, cval);
+ if (len < sizeof(buf)) {
+ pktbuf_put_bytes(pkt, buf, len);
+ return 1;
+ } else {
+ log_warning("got too long value, skipping");
+ return 0;
+ }
+}
+
+bool varcache_apply(PgSocket *server, PgSocket *client, bool *changes_p)
+{
+ PktBuf pkt;
+ uint8 buf[1024];
+ int changes = 0;
+ const char *cval, *sval;
+ const struct var_lookup *lk;
+ uint8 *debug_sql;
+
+
+ pktbuf_static(&pkt, buf, sizeof(buf));
+ pktbuf_start_packet(&pkt, 'Q');
+
+ debug_sql = pkt.buf + pkt.write_pos;
+
+ for (lk = lookup; lk->name; lk++) {
+ sval = get_value(&server->vars, lk);
+ cval = get_value(&client->vars, lk);
+ changes += apply_var(&pkt, lk->name, cval, sval);
+ }
+ *changes_p = changes > 0;
+ if (!changes)
+ return true;
+
+ pktbuf_put_char(&pkt, 0);
+ pktbuf_finish_packet(&pkt);
+
+ slog_debug(server, "varcache_apply: %s", debug_sql);
+ return pktbuf_send_immidiate(&pkt, server);
+}
+
+void varcache_fill_unset(VarCache *src, PgSocket *dst)
+{
+ char *srcval, *dstval;
+ const struct var_lookup *lk;
+ for (lk = lookup; lk->name; lk++) {
+ srcval = get_value(src, lk);
+ dstval = get_value(&dst->vars, lk);
+ if (*dstval)
+ continue;
+
+ /* empty val, copy */
+ slog_debug(dst, "varcache_fill_unset: %s = %s", lk->name, srcval);
+ strlcpy(dstval, srcval, lk->len);
+ }
+}
+
+void varcache_clean(VarCache *cache)
+{
+ cache->client_encoding[0] = 0;
+ cache->datestyle[0] = 0;
+ cache->timezone[0] = 0;
+ cache->std_strings[0] = 0;
+}
+
+void varcache_add_params(PktBuf *pkt, VarCache *vars)
+{
+ char *val;
+ const struct var_lookup *lk;
+ for (lk = lookup; lk->name; lk++) {
+ val = get_value(vars, lk);
+ if (*val)
+ pktbuf_write_ParameterStatus(pkt, lk->name, val);
+ else
+ log_error("varcache_add_params: empty param: %s", lk->name);
+ }
+}
+
+
+void varcache_print(VarCache *vars, const char *desc)
+{
+ char *val;
+ const struct var_lookup *lk;
+ for (lk = lookup; lk->name; lk++) {
+ val = get_value(vars, lk);
+ log_debug("%s: %s='%s'", desc, lk->name, val);
+ }
+}
+
+
+
+
--- /dev/null
+
+#define VAR_ENCODING_LEN 16
+#define VAR_DATESTYLE_LEN 16
+#define VAR_TIMEZONE_LEN 36
+#define VAR_STDSTR_LEN 4
+
+typedef struct VarCache VarCache;
+
+struct VarCache {
+ char client_encoding[VAR_ENCODING_LEN];
+ char datestyle[VAR_DATESTYLE_LEN];
+ char timezone[VAR_TIMEZONE_LEN];
+ char std_strings[VAR_STDSTR_LEN];
+};
+
+bool varcache_set(VarCache *cache, const char *key, const char *value, bool overwrite);
+bool varcache_apply(PgSocket *server, PgSocket *client, bool *changes_p);
+void varcache_fill_unset(VarCache *src, PgSocket *dst);
+void varcache_clean(VarCache *cache);
+void varcache_add_params(PktBuf *pkt, VarCache *vars);
+void varcache_print(VarCache *vars, const char *desc);
+