]> granicus.if.org Git - pgbouncer/commitdiff
draft throttling implementation
authorMarko Kreen <markokr@gmail.com>
Mon, 7 Jan 2008 21:16:04 +0000 (21:16 +0000)
committerMarko Kreen <markokr@gmail.com>
Mon, 7 Jan 2008 21:16:04 +0000 (21:16 +0000)
test/asynctest.c

index 6434f8b3dba1816cc76b289fd11839dddac6efa1..b91eecfa0048b5f1d05972120599ddba56f57672 100644 (file)
@@ -32,6 +32,18 @@ static usec_t get_time_usec(void)
        gettimeofday(&tv, NULL);
        return (usec_t)tv.tv_sec * USEC + tv.tv_usec;
 }
+static usec_t _time_cache = 0;
+
+static usec_t get_cached_time(void)
+{
+       if (!_time_cache)
+               _time_cache = get_time_usec();
+       return _time_cache;
+}
+static void reset_time_cache(void)
+{
+       _time_cache = 0;
+}
 
 typedef void (*libev_cb_f)(int sock, short flags, void *arg);
 
@@ -54,6 +66,7 @@ static STATLIST(active_list);
 #define QT_SLEEP   4
 static unsigned QueryTypes = 0;
 
+static uint64_t LoginOkCount = 0;
 static uint64_t LoginFailedCount = 0;
 static uint64_t SqlErrorCount = 0;
 static uint64_t QueryCount = 0;
@@ -73,6 +86,9 @@ typedef struct DbConn {
 static char *bulk_data;
 static int bulk_data_max = 16*1024;  /* power of 2 */
 static int verbose = 0;
+static int throttle_connects = 300;
+static int throttle_queries = 3000;
+
 
 /* fill mem with random junk */
 static void init_bulk_data(void)
@@ -222,7 +238,7 @@ static bool another_result(DbConn *db)
        return true;
 }
 
-/**
+/*
  * Called when select() told that conn is avail for reading/writing.
  *
  * It should call postgres handlers and then change state if needed.
@@ -294,7 +310,7 @@ static int send_query_simple(DbConn *db)
        return PQsendQueryParams(db->con, q, 0, NULL, NULL, NULL, NULL, 0);
 }
 
-/** send the query to server connection */
+/* send the query to server connection */
 static void send_query(DbConn *db)
 {
        int res;
@@ -337,6 +353,7 @@ static void connect_cb(int sock, short flags, void *arg)
                break;
        case PGRES_POLLING_OK:
                log_debug("login ok: fd=%d", PQsocket(db->con));
+               LoginOkCount++;
                db->logged_in = 1;
                send_query(db);
                break;
@@ -363,42 +380,80 @@ static void launch_connect(DbConn *db)
        wait_event(db, EV_WRITE, connect_cb);
 }
 
-static void handle_idle(DbConn *db)
+#define ACT_ONCE 10
+
+static void handle_idle(void)
 {
-       set_active(db);
-       if (db->con)
-               send_query(db);
-       else
-               launch_connect(db);
-}
+       DbConn *db;
+       List *item, *tmp;
+       int allow_connects = 100;
+       int allow_queries = 100;
+       static usec_t startup_time = 0;
+       usec_t now = get_cached_time();
+       usec_t diff;
+       int once;
+
+       if (startup_time == 0)
+               startup_time = get_cached_time();
+
+       diff = now - startup_time;
+       if (diff < USEC)
+               diff = USEC;
+
+       if (throttle_connects > 0) {
+               allow_connects = throttle_connects - LoginOkCount * USEC / diff;
+               once = throttle_connects / ACT_ONCE;
+               if (!once)
+                       once = 1;
+               if (once < allow_connects)
+                       allow_connects = once;
+       }
+       if (throttle_queries > 0) {
+               allow_queries = throttle_queries - QueryCount * USEC / diff;
+               once = throttle_connects / ACT_ONCE;
+               if (!once)
+                       once = 1;
+               if (once < allow_connects)
+                       allow_connects = once;
+       }
 
-static const char usage_str [] =
-"usage: asynctest [-d connstr][-n numconn][-s seed][-t <types>]\n"
-"accepted types:\n"
-"  B - bigdata\n"
-"  S - sleep occasionally\n"
-"  1 - simple 'select 1'\n";
+       statlist_for_each_safe(item, &idle_list, tmp) {
+               db = container_of(item, DbConn, head);
+               if (db->con && allow_queries > 0) {
+                       set_active(db);
+                       send_query(db);
+                       allow_queries--;
+               } else if (allow_connects > 0) {
+                       set_active(db);
+                       launch_connect(db);
+                       allow_connects--;
+               }
+       }
+}
 
 static void run_stats(int fd, short ev, void *arg)
 {
        static struct event ev_stats;
-       struct timeval period = { 2, 0 };
+       struct timeval period = { 5, 0 };
 
        static usec_t last_time = 0;
        static uint64_t last_query_count = 0;
        static uint64_t last_login_failed_count = 0;
+       static uint64_t last_login_ok_count = 0;
        static uint64_t last_sql_error_count = 0;
 
-       double time_diff, qcount_diff, loginerr_diff, sqlerr_diff;
-       usec_t now = get_time_usec();
+       double time_diff, qcount_diff, loginerr_diff, loginok_diff, sqlerr_diff;
+       usec_t now = get_cached_time();
 
        time_diff = now - last_time;
        if (last_time && time_diff) {
                qcount_diff = QueryCount - last_query_count;
                loginerr_diff = LoginFailedCount - last_login_failed_count;
                sqlerr_diff = SqlErrorCount - last_sql_error_count;
+               loginok_diff = LoginOkCount - last_login_ok_count;
                if (verbose == 0) {
-                       printf(">> loginerr,sqlerr,qcount: %6.1f / %6.1f / %6.1f  active/idle: %3d / %3d   \r",
+                       printf(">> loginok,loginerr,sqlerr,qcount: %6.1f / %6.1f / %6.1f / %6.1f  active/idle: %3d / %3d   \r",
+                              USEC * loginok_diff / time_diff,
                               USEC * loginerr_diff / time_diff,
                               USEC * sqlerr_diff / time_diff,
                               USEC * qcount_diff / time_diff,
@@ -415,19 +470,26 @@ static void run_stats(int fd, short ev, void *arg)
        last_query_count = QueryCount;
        last_login_failed_count = LoginFailedCount;
        last_sql_error_count = SqlErrorCount;
+       last_login_ok_count = LoginOkCount;
        last_time = now;
 }
 
+static const char usage_str [] =
+"usage: asynctest [-d connstr][-n numconn][-s seed][-t <types>][-C maxconn][-Q maxquery]\n"
+"accepted types:\n"
+"  B - bigdata\n"
+"  S - sleep occasionally\n"
+"  1 - simple 'select 1'\n";
+
 int main(int argc, char *argv[])
 {
        int i, c;
        DbConn *db;
-       List *item, *tmp;
        unsigned seed = time(NULL) ^ getpid();
        char *cstr = "dbname=conntest port=6000 host=127.0.0.1";
        int numcon = 50;
 
-       while ((c = getopt(argc, argv, "d:n:s:t:hv")) != EOF) {
+       while ((c = getopt(argc, argv, "d:n:s:t:hvC:Q:")) != EOF) {
                switch (c) {
                default:
                case 'h':
@@ -436,6 +498,12 @@ int main(int argc, char *argv[])
                case 'd':
                        cstr = optarg;
                        break;
+               case 'C':
+                       throttle_connects = atoi(optarg);
+                       break;
+               case 'Q':
+                       throttle_queries = atoi(optarg);
+                       break;
                case 'n':
                        numcon = atoi(optarg);
                        break;
@@ -470,13 +538,6 @@ int main(int argc, char *argv[])
                statlist_append(&db->head, &idle_list);
        }
 
-#if 0
-       if (1)
-       for (i = 0; i < 50; i++) {
-               db = new_db("dbname=conntest port=7000 host=127.0.0.1 password=kama");
-               list_append(&db->head, &idle_list);
-       }
-#endif
        event_init();
 
        run_stats(0, 0, NULL);
@@ -484,12 +545,10 @@ int main(int argc, char *argv[])
        printf("running..\n");
 
        while (1) {
+               handle_idle();
+               reset_time_cache();
                if (event_loop(EVLOOP_ONCE) < 0)
                        log_error("event_loop: %s", strerror(errno));
-               statlist_for_each_safe(item, &idle_list, tmp) {
-                       db = container_of(item, DbConn, head);
-                       handle_idle(db);
-               }
        }
        return 0;
 }