]> granicus.if.org Git - pgbouncer/commitdiff
more universal asynctest
authorMarko Kreen <markokr@gmail.com>
Mon, 7 Jan 2008 08:40:17 +0000 (08:40 +0000)
committerMarko Kreen <markokr@gmail.com>
Mon, 7 Jan 2008 08:40:17 +0000 (08:40 +0000)
test/Makefile
test/asynctest.c

index f7740539d650d07994896b5130ebde009837bd3e..4deae03534d8dd865949df31c68111d82b26f64e 100644 (file)
@@ -4,7 +4,7 @@ PGLIB = -L$(shell pg_config --libdir)
 
 include ../config.mak
 
-CPPFLAGS += -I../src $(PGINC)
+CPPFLAGS += -I../include $(PGINC)
 LDFLAGS += $(PGLIB) -lpq -levent
 
 all: asynctest
index ac3d1692b2fb4ba191544e51d771bda3d145c2a6..bd7aebd38f334fb37523d247a56e439e4ce759c7 100644 (file)
@@ -9,18 +9,34 @@
 #include <sys/time.h>
 #include <sys/select.h>
 #include <sys/types.h>
+#include <time.h>
 #include <unistd.h>
 #include <errno.h>
 #include <string.h>
 #include <stdio.h>
 #include <stdlib.h>
+#include <getopt.h>
 
 #include <libpq-fe.h>
 #include <event.h>
 
+static void log_error(const char *, ...);
+static void log_debug(const char *, ...);
+
+typedef uint64_t usec_t;
+#define USEC 1000000ULL
+static usec_t get_time_usec(void)
+{
+       struct timeval tv;
+       gettimeofday(&tv, NULL);
+       return (usec_t)tv.tv_sec * USEC + tv.tv_usec;
+}
+
+typedef void (*libev_cb_f)(int sock, short flags, void *arg);
+
 #define Assert(e) do { if (!(e)) { \
-       printf("Assert(%s) failed: %s:%d in %s\n", \
-               #e, __FILE__, __LINE__, __FUNCTION__); \
+       log_error("Assert(%s) failed: %s:%d in %s", \
+                 #e, __FILE__, __LINE__, __FUNCTION__); \
        exit(1); } } while (0)
 
 typedef enum { false=0, true=1 } bool;
@@ -30,18 +46,26 @@ typedef enum { false=0, true=1 } bool;
 static LIST(idle_list);
 static LIST(active_list);
 
+#define QT_SIMPLE  1
+#define QT_BIGDATA 2
+#define QT_SLEEP   4
+static unsigned QueryTypes = 0;
+static uint64_t QueryCount = 0;
+
 typedef struct DbConn {
        List            head;
        const char      *connstr;
        struct event    ev;
+       PGconn          *con;
+
        //time_t                connect_time;
        //unsigned      query_count;
-       PGconn          *con;
        //const char    *query;
 } DbConn;
 
 static char *bulk_data;
 static int bulk_data_max = 16*1024;  /* power of 2 */
+static int verbose = 0;
 
 /* fill mem with random junk */
 static void init_bulk_data(void)
@@ -68,6 +92,7 @@ static void set_idle(DbConn *db)
        Assert(item_in_list(&db->head, &active_list));
        list_del(&db->head);
        list_append(&db->head, &idle_list);
+       log_debug("%p: set_idle", db);
 }
 
 static void set_active(DbConn *db)
@@ -75,23 +100,71 @@ static void set_active(DbConn *db)
        Assert(item_in_list(&db->head, &idle_list));
        list_del(&db->head);
        list_append(&db->head, &active_list);
+       log_debug("%p: set_active", db);
 }
 
-/** some error happened */
-static void conn_error(DbConn *db, const char *desc)
+static void fatal_perror(const char *err)
 {
+       log_error("%s: %s", err, strerror(errno));
+       exit(1);
+}
+
+static void log_debug(const char *fmt, ...)
+{
+       va_list ap;
+       char buf[1024];
+       if (verbose == 0)
+               return;
+       va_start(ap, fmt);
+       vsnprintf(buf, sizeof(buf), fmt, ap);
+       va_end(ap);
+       printf("dbg: %s\n", buf);
+}
+
+static void log_error(const char *fmt, ...)
+{
+       va_list ap;
+       char buf[1024];
+       va_start(ap, fmt);
+       vsnprintf(buf, sizeof(buf), fmt, ap);
+       va_end(ap);
+       printf("ERR: %s\n", buf);
+}
+
+static void wait_event(DbConn *db, short ev, libev_cb_f fn)
+{
+       event_set(&db->ev, PQsocket(db->con), ev, fn, db);
+       if (event_add(&db->ev, NULL) < 0)
+               fatal_perror("event_add");
+}
+
+static void disconnect(DbConn *db, const char *reason, ...)
+{
+       char buf[1024];
+       va_list ap;
        if (db->con) {
-               //printf("libpq error in %s: %s\n",
-               //       desc, PQerrorMessage(db->con));
+               va_start(ap, reason);
+               vsnprintf(buf, sizeof(buf), reason, ap);
+               va_end(ap);
+               log_debug("disconnect because: %s", buf);
                PQfinish(db->con);
                db->con = NULL;
+       }
+}
+
+/* some error happened */
+static void conn_error(DbConn *db, const char *desc)
+{
+       if (db->con) {
+               /* fixme show firt couple errors */
+               disconnect(db, "%s: %s", desc, PQerrorMessage(db->con));
        } else {
-               printf("random error\n");
+               printf("random error: %s\n", desc);
        }
        set_idle(db);
 }
 
-/**
+/*
  * Connection has a resultset avalable, fetch it.
  *
  * Returns true if there may be more results coming,
@@ -104,11 +177,10 @@ static bool another_result(DbConn *db)
        /* got one */
        res = PQgetResult(db->con);
        if (res == NULL) {
+               QueryCount++;
                set_idle(db);
-               if (1) {
-                       PQfinish(db->con);
-                       db->con = NULL;
-               }
+
+               disconnect(db, "query done");
                return false;
        }
 
@@ -146,8 +218,7 @@ static void result_cb(int sock, short flags, void *arg)
        while (1) {
                /* if PQisBusy, then incomplete result */
                if (PQisBusy(db->con)) {
-                       event_set(&db->ev, PQsocket(db->con), EV_READ, result_cb, db);
-                       event_add(&db->ev, NULL);
+                       wait_event(db, EV_READ, result_cb);
                        break;
                }
 
@@ -164,41 +235,53 @@ static void send_cb(int sock, short flags, void *arg)
 
        res = PQflush(db->con);
        if (res > 0) {
-               event_set(&db->ev, PQsocket(db->con), EV_WRITE, send_cb, db);
-               event_add(&db->ev, NULL);
+               wait_event(db, EV_WRITE, send_cb);
        } else if (res == 0) {
-               event_set(&db->ev, PQsocket(db->con), EV_READ, result_cb, db);
-               event_add(&db->ev, NULL);
+               wait_event(db, EV_READ, result_cb);
        } else
                conn_error(db, "PQflush");
 }
 
-/** send the query to server connection */
-static void send_query(DbConn *db)
+static int send_query_bigdata(DbConn *db)
 {
-       int res;
-       const char *q = "select $1::text";
        const char *values[1];
        int lengths[1];
        int fmts[1];
        int arglen;
+       char *q = "select $1::text";
 
        arglen = random() & (bulk_data_max - 1);
        values[0] = bulk_data + bulk_data_max - arglen;
        lengths[0] = arglen;
        fmts[0] = 1;
 
+       return PQsendQueryParams(db->con, q, 1, NULL, values, lengths, fmts, 1);
+}
+
+static int send_query_sleep(DbConn *db)
+{
+       const char *q = "select pg_sleep(0.2)";
+       return PQsendQueryParams(db->con, q, 0, NULL, NULL, NULL, NULL, 0);
+}
+
+static int send_query_simple(DbConn *db)
+{
+       const char *q = "select 1";
+       return PQsendQueryParams(db->con, q, 0, NULL, NULL, NULL, NULL, 0);
+}
+
+/** send the query to server connection */
+static void send_query(DbConn *db)
+{
+       int res;
+
        /* send query */
-       if ((random() & 63) == 0) {
-               res = PQsendQueryParams(db->con, "select pg_sleep(0.2)", 0,
-                                       NULL, NULL, NULL, NULL, 0);
+       if (QueryTypes & QT_SLEEP) {
+               res = send_query_sleep(db);
+       } else if (QueryTypes & QT_BIGDATA) {
+               res = send_query_bigdata(db);
        } else {
-               res = PQsendQueryParams(db->con, q, 1,
-                                       NULL,   /* paramTypes */
-                                       values, /* paramValues */
-                                       lengths,/* paramLengths */
-                                       fmts,   /* paramFormats */
-                                       1);     /* resultformat, 0-text, 1-bin */
+               res = send_query_simple(db);
        }
        if (!res) {
                conn_error(db, "PQsendQueryParams");
@@ -208,11 +291,9 @@ static void send_query(DbConn *db)
        /* flush it down */
        res = PQflush(db->con);
        if (res > 0) {
-               event_set(&db->ev, PQsocket(db->con), EV_WRITE, send_cb, db);
-               event_add(&db->ev, NULL);
+               wait_event(db, EV_WRITE, send_cb);
        } else if (res == 0) {
-               event_set(&db->ev, PQsocket(db->con), EV_READ, result_cb, db);
-               event_add(&db->ev, NULL);
+               wait_event(db, EV_READ, result_cb);
        } else
                conn_error(db, "PQflush");
 }
@@ -225,12 +306,10 @@ static void connect_cb(int sock, short flags, void *arg)
        poll_res = PQconnectPoll(db->con);
        switch (poll_res) {
        case PGRES_POLLING_WRITING:
-               event_set(&db->ev, PQsocket(db->con), EV_WRITE, connect_cb, db);
-               event_add(&db->ev, NULL);
+               wait_event(db, EV_WRITE, connect_cb);
                break;
        case PGRES_POLLING_READING:
-               event_set(&db->ev, PQsocket(db->con), EV_READ, connect_cb, db);
-               event_add(&db->ev, NULL);
+               wait_event(db, EV_READ, connect_cb);
                break;
        case PGRES_POLLING_OK:
                send_query(db);
@@ -255,8 +334,7 @@ static void launch_connect(DbConn *db)
                return;
        }
 
-       event_set(&db->ev, PQsocket(db->con), EV_WRITE, connect_cb, db);
-       event_add(&db->ev, NULL);
+       wait_event(db, EV_WRITE, connect_cb);
 }
 
 static void handle_idle(DbConn *db)
@@ -268,33 +346,110 @@ static void handle_idle(DbConn *db)
                launch_connect(db);
 }
 
-int main(void)
+static const char usage_str [] =
+"usage: asynctest [-d connstr][-n numconn][-s seed][-t <types>]\n"
+"accepted types:\n"
+"  B - bigdata\n"
+"  S - sleep occasionally\n"
+"  1 - simple 'select 1'\n";
+
+static void run_stats(int fd, short ev, void *arg)
 {
-       int i;
+       static struct event ev_stats;
+       struct timeval period = { 2, 0 };
+
+       static usec_t last_time = 0;
+       static uint64_t last_count = 0;
+
+       double time_diff, count_diff;
+       usec_t now = get_time_usec();
+
+       time_diff = now - last_time;
+       if (last_time && time_diff) {
+               count_diff = QueryCount - last_count;
+               if (verbose == 0) {
+                       printf(" qps: %8.1f\r", USEC * count_diff / time_diff);
+                       fflush(stdout);
+               }
+       }
+
+       if (!last_time)
+               evtimer_set(&ev_stats, run_stats, NULL);
+       if (evtimer_add(&ev_stats, &period) < 0)
+               fatal_perror("evtimer_add");
+
+       last_count = QueryCount;
+       last_time = now;
+}
+
+int main(int argc, char *argv[])
+{
+       int i, c;
        DbConn *db;
        List *item, *tmp;
-       unsigned seed;
+       unsigned seed = time(NULL) ^ getpid();
+       char *cstr = "dbname=conntest port=6000 host=127.0.0.1";
+       int numcon = 50;
+
+       while ((c = getopt(argc, argv, "d:n:s:t:hv")) != EOF) {
+               switch (c) {
+               default:
+               case 'h':
+                       printf("%s", usage_str);
+                       return 0;
+               case 'd':
+                       cstr = optarg;
+                       break;
+               case 'n':
+                       numcon = atoi(optarg);
+                       break;
+               case 's':
+                       seed = atoi(optarg);
+                       break;
+               case 'v':
+                       verbose++;
+                       break;
+               case 't':
+                       for (i = 0; optarg[i]; i++) {
+                               switch (optarg[i]) {
+                               case 'B': QueryTypes = QT_BIGDATA; break;
+                               case 'S': QueryTypes = QT_SLEEP; break;
+                               case '1': QueryTypes = QT_SIMPLE; break;
+                               default: log_error("bad type"); break;
+                               }
+                       }
+               }
+       }
+
+       if (QueryTypes == 0)
+               QueryTypes = QT_SIMPLE;
 
-       seed = time(NULL) ^ getpid();
        printf("using seed: %u\n", seed);
        srandom(seed);
 
        init_bulk_data();
 
-       for (i = 0; i < 50; i++) {
-               db = new_db("dbname=conntest port=6000 host=127.0.0.1 password=kama");
+       for (i = 0; i < numcon; i++) {
+               db = new_db(cstr);
                list_append(&db->head, &idle_list);
        }
+
+#if 0
        if (1)
        for (i = 0; i < 50; i++) {
                db = new_db("dbname=conntest port=7000 host=127.0.0.1 password=kama");
                list_append(&db->head, &idle_list);
        }
-
+#endif
        event_init();
 
+       run_stats(0, 0, NULL);
+
+       printf("running..\n");
+
        while (1) {
-               event_loop(EVLOOP_ONCE);
+               if (event_loop(EVLOOP_ONCE) < 0)
+                       log_error("event_loop: %s", strerror(errno));
                list_for_each_safe(item, &idle_list, tmp) {
                        db = container_of(item, DbConn, head);
                        handle_idle(db);