From: Marko Kreen Date: Thu, 2 Aug 2007 11:59:19 +0000 (+0000) Subject: Server parameter tracking. X-Git-Tag: pgbouncer_1_1~65 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=926c2225db95577e68d87c234243c69024a42f47;p=pgbouncer Server parameter tracking. --- diff --git a/NEWS b/NEWS index 17f038d..1b6b310 100644 --- a/NEWS +++ b/NEWS @@ -8,8 +8,10 @@ - Accept custom unix socket location in host= - Accept quoted values: password=' asd''foo' * server_reset_query, to be sent immidiately after release - * Cancel pkt sent for idle connection does not drop it anymore, - just ReadyForQuery is re-sent. + * keep track of following server variables: + client_encoding, datestyle, timezone, standard_conforming_strings + * Cancel pkt sent for idle connection does not drop it anymore. + * Cancel with ^C from psql works for SUSPEND / PAUSE. * Print FD limits on startup. * More debug log messages include socket info. * When suspending, try to hit packet boundary ASAP. diff --git a/doc/todo.txt b/doc/todo.txt index c38e1ee..05eec62 100644 --- a/doc/todo.txt +++ b/doc/todo.txt @@ -1,15 +1,17 @@ = PgBouncer TODO list = -== Small stuff == +== High-prio == * suspend_timeout - drop stalled conns + * create manpage + * report existing pidfile to console + * before loading users, disable all existing == Low-prio == - * create manpage * drop_on_error/keep_on_error - if released conn is in error state, then issue rollback and keep it - * report existing pidfile to console + * keep stats about error counts == Make -R less scary == @@ -26,12 +28,7 @@ * if tcp - try binding * if unix - try connect() -== Suspicious items == +== Just ideas == - * keep track of requested client_encoding (+others) and update if needed? - * original values in welcome pkt - * static: integer_datetimes, server_encoding, server_version, - * dynamic?: session_authorization, is_superuser?, standard_conforming_strings, TimeZone - * keep stats about error counts - * before loading users, disable all existing * auth_conn - access to pg_shadow, so auth_file is not needed + * possibility to specify failover databases diff --git a/src/admin.c b/src/admin.c index 4c51129..6677c9d 100644 --- a/src/admin.c +++ b/src/admin.c @@ -951,15 +951,15 @@ void admin_setup(void) create_auth_cache(); /* prepare welcome */ - pktbuf_static(&msg, db->welcome_msg, sizeof(db->welcome_msg)); + pktbuf_static(&msg, pool->welcome_msg, sizeof(pool->welcome_msg)); pktbuf_write_AuthenticationOk(&msg); pktbuf_write_ParameterStatus(&msg, "server_version", "8.0/bouncer"); pktbuf_write_ParameterStatus(&msg, "client_encoding", "UNICODE"); - pktbuf_write_ParameterStatus(&msg, "server_encoding", "UNICODE"); + pktbuf_write_ParameterStatus(&msg, "server_encoding", "SQL_ASCII"); pktbuf_write_ParameterStatus(&msg, "is_superuser", "on"); - db->welcome_msg_len = pktbuf_written(&msg); - db->welcome_msg_ready = 1; + pool->welcome_msg_len = pktbuf_written(&msg); + pool->welcome_msg_ready = 1; pktbuf_static(&msg, db->startup_params, sizeof(db->startup_params)); pktbuf_put_string(&msg, "database"); diff --git a/src/bouncer.h b/src/bouncer.h index bf5e529..691cf34 100644 --- a/src/bouncer.h +++ b/src/bouncer.h @@ -70,6 +70,7 @@ typedef enum SocketState SocketState; #include "mbuf.h" #include "sbuf.h" #include "pktbuf.h" +#include "varcache.h" #include "admin.h" #include "loader.h" @@ -149,10 +150,17 @@ struct PgPool { PgStats newer_stats; PgStats older_stats; + /* database info to be sent to client */ + uint8 welcome_msg[256]; + unsigned welcome_msg_len; + + VarCache orig_vars; + /* if last connect failed, there should be delay before next */ usec_t last_connect_time; unsigned last_connect_failed:1; unsigned admin:1; + unsigned welcome_msg_ready:1; }; #define pool_server_count(pool) ( \ @@ -177,11 +185,6 @@ struct PgDatabase { List head; char name[MAX_DBNAME]; - /* database info to be sent to client */ - uint8 welcome_msg[512]; - unsigned welcome_msg_len; - unsigned welcome_msg_ready:1; - unsigned db_paused:1; /* key/val pairs (without user) for startup msg to be sent to server */ @@ -221,6 +224,8 @@ struct PgSocket { unsigned wait_for_response:1; /* this (server) socket must be closed ASAP */ unsigned close_needed:1; + /* setting client vars */ + unsigned setting_vars:1; usec_t connect_time; /* when connection was made */ usec_t request_time; /* last activity time */ @@ -231,6 +236,8 @@ struct PgSocket { PgUser * auth_user; PgAddr addr; + VarCache vars; + SBuf sbuf; /* stream buffer, must be last */ }; @@ -288,6 +295,8 @@ extern int cf_log_connections; extern int cf_log_disconnections; extern int cf_log_pooler_errors; +extern int cf_disable_varcache; + extern ConfElem bouncer_params[]; diff --git a/src/client.c b/src/client.c index 89aa21c..6af2544 100644 --- a/src/client.c +++ b/src/client.c @@ -111,6 +111,11 @@ static bool decide_startup_pool(PgSocket *client, MBuf *pkt) dbname = val; else if (strcmp(key, "user") == 0) username = val; + else { + /* remember requested parameters */ + if (varcache_set(&client->vars, key, val, true)) + slog_debug(client, "got var: %s=%s", key, val); + } } if (!username) { disconnect_client(client, true, "No username supplied"); diff --git a/src/janitor.c b/src/janitor.c index 6fb8fcf..cfddef7 100644 --- a/src/janitor.c +++ b/src/janitor.c @@ -158,7 +158,7 @@ static void per_loop_activate(PgPool *pool) if (!statlist_empty(&pool->idle_server_list)) { /* db not fully initialized after reboot */ - if (client->wait_for_welcome && !pool->db->welcome_msg_ready) { + if (client->wait_for_welcome && !pool->welcome_msg_ready) { launch_new_connection(pool); continue; } diff --git a/src/main.c b/src/main.c index e6b2af2..7cae3b5 100644 --- a/src/main.c +++ b/src/main.c @@ -101,6 +101,8 @@ int cf_log_connections = 1; int cf_log_disconnections = 1; int cf_log_pooler_errors = 1; +int cf_disable_varcache = 0; + /* * config file description */ @@ -143,6 +145,7 @@ ConfElem bouncer_params[] = { {"log_connections", true, CF_INT, &cf_log_connections}, {"log_disconnections", true, CF_INT, &cf_log_disconnections}, {"log_pooler_errors", true, CF_INT, &cf_log_pooler_errors}, +{"disable_varcache", true, CF_INT, &cf_disable_varcache}, {NULL}, }; diff --git a/src/objects.c b/src/objects.c index 4e3387a..8f30751 100644 --- a/src/objects.c +++ b/src/objects.c @@ -91,6 +91,9 @@ static void clean_socket(PgSocket *sk) sk->query_start = 0; sk->auth_user = NULL; + + varcache_clean(&sk->vars); + sk->setting_vars = 0; } /* allocate & fill client socket */ @@ -513,6 +516,7 @@ bool find_server(PgSocket *client) PgPool *pool = client->pool; PgSocket *server; bool res; + bool varchange = false; Assert(client->state == CL_ACTIVE); @@ -520,9 +524,9 @@ bool find_server(PgSocket *client) return true; /* try to get idle server, if allowed */ - if (cf_pause_mode == P_PAUSE) + if (cf_pause_mode == P_PAUSE) { server = NULL; - else { + } else { while (1) { server = first_socket(&pool->idle_server_list); if (!server || server->ready) @@ -530,14 +534,28 @@ bool find_server(PgSocket *client) disconnect_server(server, true, "idle server got dirty"); } } + Assert(!server || server->state == SV_IDLE); + + /* send var changes */ + if (server && !cf_disable_varcache) { + res = varcache_apply(server, client, &varchange); + if (!res) { + disconnect_server(server, true, "var change failed"); + server = NULL; + } + } /* link or send to waiters list */ if (server) { - Assert(server->state == SV_IDLE); client->link = server; server->link = client; change_server_state(server, SV_ACTIVE); - res = true; + if (varchange) { + sbuf_pause(&client->sbuf); + res = false; /* don't process client data yet */ + server->setting_vars = 1; + } else + res = true; } else { pause_client(client); Assert(client->state == CL_WAITING); @@ -737,7 +755,7 @@ void launch_new_connection(PgPool *pool) /* is it allowed to add servers? */ total = pool_server_count(pool); - if (total >= pool->db->pool_size && pool->db->welcome_msg_ready) { + if (total >= pool->db->pool_size && pool->welcome_msg_ready) { log_debug("launch_new_connection: pool full (%d >= %d)", total, pool->db->pool_size); return; diff --git a/src/proto.c b/src/proto.c index dfc22eb..3ac9f56 100644 --- a/src/proto.c +++ b/src/proto.c @@ -124,21 +124,21 @@ void log_server_error(const char *note, MBuf *pkt) bool add_welcome_parameter(PgSocket *server, unsigned pkt_type, unsigned pkt_len, MBuf *pkt) { - PgDatabase *db = server->pool->db; + PgPool *pool = server->pool; PktBuf msg; const char *key, *val; - if (db->welcome_msg_ready) + if (pool->welcome_msg_ready) return true; /* incomplete startup msg from server? */ if (pkt_len - 5 > mbuf_avail(pkt)) return false; - pktbuf_static(&msg, db->welcome_msg + db->welcome_msg_len, - sizeof(db->welcome_msg) - db->welcome_msg_len); + pktbuf_static(&msg, pool->welcome_msg + pool->welcome_msg_len, + sizeof(pool->welcome_msg) - pool->welcome_msg_len); - if (db->welcome_msg_len == 0) + if (pool->welcome_msg_len == 0) pktbuf_write_AuthenticationOk(&msg); key = mbuf_get_string(pkt); @@ -147,9 +147,16 @@ bool add_welcome_parameter(PgSocket *server, slog_error(server, "broken ParameterStatus packet"); return false; } + slog_debug(server, "S: param: %s = %s", key, val); - pktbuf_write_ParameterStatus(&msg, key, val); - db->welcome_msg_len += pktbuf_written(&msg); + if (varcache_set(&pool->orig_vars, key, val, true)) { + slog_debug(server, "interesting var: %s=%s", key, val); + varcache_set(&server->vars, key, val, true); + } else { + slog_debug(server, "uninteresting var: %s=%s", key, val); + pktbuf_write_ParameterStatus(&msg, key, val); + pool->welcome_msg_len += pktbuf_written(&msg); + } return true; } @@ -157,10 +164,10 @@ bool add_welcome_parameter(PgSocket *server, /* all parameters processed */ void finish_welcome_msg(PgSocket *server) { - PgDatabase *db = server->pool->db; - if (db->welcome_msg_ready) + PgPool *pool = server->pool; + if (pool->welcome_msg_ready) return; - db->welcome_msg_ready = 1; + pool->welcome_msg_ready = 1; } bool welcome_client(PgSocket *client) @@ -168,14 +175,20 @@ bool welcome_client(PgSocket *client) int res; uint8 buf[1024]; PktBuf msg; - PgDatabase *db = client->pool->db; + PgPool *pool = client->pool; slog_noise(client, "P: welcome_client"); - if (!db->welcome_msg_ready) + if (!pool->welcome_msg_ready) return false; + varcache_print(&client->vars, "welcome/client"); + varcache_print(&client->pool->orig_vars, "welcome/pool"); + pktbuf_static(&msg, buf, sizeof(buf)); - pktbuf_put_bytes(&msg, db->welcome_msg, db->welcome_msg_len); + pktbuf_put_bytes(&msg, pool->welcome_msg, pool->welcome_msg_len); + + varcache_fill_unset(&pool->orig_vars, client); + varcache_add_params(&msg, &client->vars); /* give each client its own cancel key */ get_random_bytes(client->cancel_key, 8); diff --git a/src/server.c b/src/server.c index 53b881f..1e8fd94 100644 --- a/src/server.c +++ b/src/server.c @@ -22,6 +22,33 @@ #include "bouncer.h" +static void check_parameters(PgSocket *server, MBuf *pkt, unsigned pkt_len) +{ + const char *key, *val; + PgSocket *client = server->link; + + /* incomplete startup msg from server? */ + if (pkt_len - 5 > mbuf_avail(pkt)) + return; + + key = mbuf_get_string(pkt); + val = mbuf_get_string(pkt); + if (!key || !val) { + slog_error(server, "broken ParameterStatus packet"); + return; + } + slog_debug(server, "S: param: %s = %s", key, val); + + varcache_set(&server->vars, key, val, true); + + if (client) { + slog_debug(client, "setting client var: %s='%s'", key, val); + varcache_set(&client->vars, key, val, true); + } + + return; +} + /* process packets on server auth phase */ static bool handle_server_startup(PgSocket *server, MBuf *pkt) { @@ -47,6 +74,7 @@ static bool handle_server_startup(PgSocket *server, MBuf *pkt) slog_error(server, "unknown pkt from server: '%c'", pkt_type); disconnect_server(server, true, "unknown pkt from server"); break; + case 'E': /* ErrorResponse */ log_server_error("S: login failed", pkt); disconnect_server(server, true, "login failed"); @@ -59,9 +87,11 @@ static bool handle_server_startup(PgSocket *server, MBuf *pkt) if (!res) disconnect_server(server, false, "failed to answer authreq"); break; + case 'S': /* ParameterStatus */ res = add_welcome_parameter(server, pkt_type, pkt_len, pkt); break; + case 'Z': /* ReadyForQuery */ /* login ok */ slog_debug(server, "server login ok, start accepting queries"); @@ -83,6 +113,7 @@ static bool handle_server_startup(PgSocket *server, MBuf *pkt) memcpy(server->cancel_key, mbuf_get_bytes(pkt, 8), 8); res = true; break; + case 'N': /* NoticeResponse */ slog_noise(server, "skipping pkt: %c", pkt_type); res = true; @@ -135,6 +166,11 @@ static bool handle_server_work(PgSocket *server, MBuf *pkt) "Long transactions not allowed"); return false; } + break; + + case 'S': /* ParameterStatus */ + check_parameters(server, pkt, pkt_len); + break; /* * 'E' and 'N' packets currently set ->ready to 0. Correct would @@ -147,13 +183,24 @@ static bool handle_server_work(PgSocket *server, MBuf *pkt) * it later. */ case 'E': /* ErrorResponse */ + if (server->setting_vars) { + /* + * the SET and user query will be different TX + * so we cannot report SET error to user. + */ + log_server_error("varcache_apply failed", pkt); + + /* + * client probably gave invalid values in startup pkt. + * + * no reason to keep such guys. + */ + disconnect_client(server->link, true, "invalid server parameter"); + } case 'N': /* NoticeResponse */ + break; - /* - * chat packets, but server (and thus pooler) - * is allowed to buffer them until Sync or Flush - * is sent by client. - */ + /* chat packets */ case '2': /* BindComplete */ case '3': /* CloseComplete */ case 'c': /* CopyDone(F/B) */ @@ -172,32 +219,36 @@ static bool handle_server_work(PgSocket *server, MBuf *pkt) case 'd': /* CopyData(F/B) */ case 'D': /* DataRow */ case 't': /* ParameterDescription */ - case 'S': /* ParameterStatus */ case 'T': /* RowDescription */ - - if (client) { - sbuf_prepare_send(sbuf, &client->sbuf, pkt_len); - } else { - if (server->state != SV_TESTED) - slog_warning(server, - "got packet '%c' from server when not linked", - pkt_type); - sbuf_prepare_skip(sbuf, pkt_len); - } break; } server->ready = ready; - - /* update stats */ server->pool->stats.server_bytes += pkt_len; - if (server->ready && client) { - usec_t total; - Assert(client->query_start != 0); - - total = get_cached_time() - client->query_start; - client->query_start = 0; - server->pool->stats.query_time += total; - slog_debug(client, "query time: %d us", (int)total); + + if (server->setting_vars) { + Assert(client); + sbuf_prepare_skip(sbuf, pkt_len); + if (ready) { + server->setting_vars = 0; + sbuf_continue(&client->sbuf); + } + } else if (client) { + sbuf_prepare_send(sbuf, &client->sbuf, pkt_len); + if (ready) { + usec_t total; + Assert(client->query_start != 0); + + total = get_cached_time() - client->query_start; + client->query_start = 0; + server->pool->stats.query_time += total; + slog_debug(client, "query time: %d us", (int)total); + } + } else { + if (server->state != SV_TESTED) + slog_warning(server, + "got packet '%c' from server when not linked", + pkt_type); + sbuf_prepare_skip(sbuf, pkt_len); } return true; diff --git a/src/varcache.c b/src/varcache.c new file mode 100644 index 0000000..2ffd1ea --- /dev/null +++ b/src/varcache.c @@ -0,0 +1,171 @@ +/* + * PgBouncer - Lightweight connection pooler for PostgreSQL. + * + * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +/* + * Operations with server config parameters. + */ + +#include "bouncer.h" + +struct var_lookup { + const char *name; + int offset; + int len; +}; + +static const struct var_lookup lookup [] = { +{"client_encoding", offsetof(VarCache, client_encoding), VAR_ENCODING_LEN }, +{"datestyle", offsetof(VarCache, datestyle), VAR_DATESTYLE_LEN }, +{"timezone", offsetof(VarCache, timezone), VAR_TIMEZONE_LEN }, +{"standard_conforming_strings", offsetof(VarCache, std_strings), VAR_STDSTR_LEN }, +{NULL}, +}; + +static char *get_value(VarCache *cache, const struct var_lookup *lk) +{ + return (char *)(cache) + lk->offset; +} + +bool varcache_set(VarCache *cache, + const char *key, const char *value, + bool overwrite) +{ + int vlen; + char *pos; + const struct var_lookup *lk; + + for (lk = lookup; lk->name; lk++) { + if (strcasecmp(lk->name, key) != 0) + continue; + + pos = get_value(cache, lk); + + if (!overwrite && *pos) + break; + + vlen = strlcpy(pos, value, lk->len); + if (vlen >= lk->len) + log_warning("varcache_set(%s) overflow", key); + else + log_debug("varcache_set: %s=%s", key, pos); + return true; + } + return false; +} + +static int apply_var(PktBuf *pkt, const char *key, + const char *cval, const char *sval) +{ + char buf[128]; + int len; + + if (strcasecmp(cval, sval) == 0) + return 0; + + len = snprintf(buf, sizeof(buf), "SET %s='%s';", key, cval); + if (len < sizeof(buf)) { + pktbuf_put_bytes(pkt, buf, len); + return 1; + } else { + log_warning("got too long value, skipping"); + return 0; + } +} + +bool varcache_apply(PgSocket *server, PgSocket *client, bool *changes_p) +{ + PktBuf pkt; + uint8 buf[1024]; + int changes = 0; + const char *cval, *sval; + const struct var_lookup *lk; + uint8 *debug_sql; + + + pktbuf_static(&pkt, buf, sizeof(buf)); + pktbuf_start_packet(&pkt, 'Q'); + + debug_sql = pkt.buf + pkt.write_pos; + + for (lk = lookup; lk->name; lk++) { + sval = get_value(&server->vars, lk); + cval = get_value(&client->vars, lk); + changes += apply_var(&pkt, lk->name, cval, sval); + } + *changes_p = changes > 0; + if (!changes) + return true; + + pktbuf_put_char(&pkt, 0); + pktbuf_finish_packet(&pkt); + + slog_debug(server, "varcache_apply: %s", debug_sql); + return pktbuf_send_immidiate(&pkt, server); +} + +void varcache_fill_unset(VarCache *src, PgSocket *dst) +{ + char *srcval, *dstval; + const struct var_lookup *lk; + for (lk = lookup; lk->name; lk++) { + srcval = get_value(src, lk); + dstval = get_value(&dst->vars, lk); + if (*dstval) + continue; + + /* empty val, copy */ + slog_debug(dst, "varcache_fill_unset: %s = %s", lk->name, srcval); + strlcpy(dstval, srcval, lk->len); + } +} + +void varcache_clean(VarCache *cache) +{ + cache->client_encoding[0] = 0; + cache->datestyle[0] = 0; + cache->timezone[0] = 0; + cache->std_strings[0] = 0; +} + +void varcache_add_params(PktBuf *pkt, VarCache *vars) +{ + char *val; + const struct var_lookup *lk; + for (lk = lookup; lk->name; lk++) { + val = get_value(vars, lk); + if (*val) + pktbuf_write_ParameterStatus(pkt, lk->name, val); + else + log_error("varcache_add_params: empty param: %s", lk->name); + } +} + + +void varcache_print(VarCache *vars, const char *desc) +{ + char *val; + const struct var_lookup *lk; + for (lk = lookup; lk->name; lk++) { + val = get_value(vars, lk); + log_debug("%s: %s='%s'", desc, lk->name, val); + } +} + + + + diff --git a/src/varcache.h b/src/varcache.h new file mode 100644 index 0000000..0a0c6a6 --- /dev/null +++ b/src/varcache.h @@ -0,0 +1,22 @@ + +#define VAR_ENCODING_LEN 16 +#define VAR_DATESTYLE_LEN 16 +#define VAR_TIMEZONE_LEN 36 +#define VAR_STDSTR_LEN 4 + +typedef struct VarCache VarCache; + +struct VarCache { + char client_encoding[VAR_ENCODING_LEN]; + char datestyle[VAR_DATESTYLE_LEN]; + char timezone[VAR_TIMEZONE_LEN]; + char std_strings[VAR_STDSTR_LEN]; +}; + +bool varcache_set(VarCache *cache, const char *key, const char *value, bool overwrite); +bool varcache_apply(PgSocket *server, PgSocket *client, bool *changes_p); +void varcache_fill_unset(VarCache *src, PgSocket *dst); +void varcache_clean(VarCache *cache); +void varcache_add_params(PktBuf *pkt, VarCache *vars); +void varcache_print(VarCache *vars, const char *desc); +