From 079e2c5f6b5a3fbcb393f580029307eb6bda1abc Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Mon, 7 Jan 2008 08:40:17 +0000 Subject: [PATCH] more universal asynctest --- test/Makefile | 2 +- test/asynctest.c | 255 +++++++++++++++++++++++++++++++++++++---------- 2 files changed, 206 insertions(+), 51 deletions(-) diff --git a/test/Makefile b/test/Makefile index f774053..4deae03 100644 --- a/test/Makefile +++ b/test/Makefile @@ -4,7 +4,7 @@ PGLIB = -L$(shell pg_config --libdir) include ../config.mak -CPPFLAGS += -I../src $(PGINC) +CPPFLAGS += -I../include $(PGINC) LDFLAGS += $(PGLIB) -lpq -levent all: asynctest diff --git a/test/asynctest.c b/test/asynctest.c index ac3d169..bd7aebd 100644 --- a/test/asynctest.c +++ b/test/asynctest.c @@ -9,18 +9,34 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include +static void log_error(const char *, ...); +static void log_debug(const char *, ...); + +typedef uint64_t usec_t; +#define USEC 1000000ULL +static usec_t get_time_usec(void) +{ + struct timeval tv; + gettimeofday(&tv, NULL); + return (usec_t)tv.tv_sec * USEC + tv.tv_usec; +} + +typedef void (*libev_cb_f)(int sock, short flags, void *arg); + #define Assert(e) do { if (!(e)) { \ - printf("Assert(%s) failed: %s:%d in %s\n", \ - #e, __FILE__, __LINE__, __FUNCTION__); \ + log_error("Assert(%s) failed: %s:%d in %s", \ + #e, __FILE__, __LINE__, __FUNCTION__); \ exit(1); } } while (0) typedef enum { false=0, true=1 } bool; @@ -30,18 +46,26 @@ typedef enum { false=0, true=1 } bool; static LIST(idle_list); static LIST(active_list); +#define QT_SIMPLE 1 +#define QT_BIGDATA 2 +#define QT_SLEEP 4 +static unsigned QueryTypes = 0; +static uint64_t QueryCount = 0; + typedef struct DbConn { List head; const char *connstr; struct event ev; + PGconn *con; + //time_t connect_time; //unsigned query_count; - PGconn *con; //const char *query; } DbConn; static char *bulk_data; static int bulk_data_max = 16*1024; /* power of 2 */ +static int verbose = 0; /* fill mem with random junk */ static void init_bulk_data(void) @@ -68,6 +92,7 @@ static void set_idle(DbConn *db) Assert(item_in_list(&db->head, &active_list)); list_del(&db->head); list_append(&db->head, &idle_list); + log_debug("%p: set_idle", db); } static void set_active(DbConn *db) @@ -75,23 +100,71 @@ static void set_active(DbConn *db) Assert(item_in_list(&db->head, &idle_list)); list_del(&db->head); list_append(&db->head, &active_list); + log_debug("%p: set_active", db); } -/** some error happened */ -static void conn_error(DbConn *db, const char *desc) +static void fatal_perror(const char *err) { + log_error("%s: %s", err, strerror(errno)); + exit(1); +} + +static void log_debug(const char *fmt, ...) +{ + va_list ap; + char buf[1024]; + if (verbose == 0) + return; + va_start(ap, fmt); + vsnprintf(buf, sizeof(buf), fmt, ap); + va_end(ap); + printf("dbg: %s\n", buf); +} + +static void log_error(const char *fmt, ...) +{ + va_list ap; + char buf[1024]; + va_start(ap, fmt); + vsnprintf(buf, sizeof(buf), fmt, ap); + va_end(ap); + printf("ERR: %s\n", buf); +} + +static void wait_event(DbConn *db, short ev, libev_cb_f fn) +{ + event_set(&db->ev, PQsocket(db->con), ev, fn, db); + if (event_add(&db->ev, NULL) < 0) + fatal_perror("event_add"); +} + +static void disconnect(DbConn *db, const char *reason, ...) +{ + char buf[1024]; + va_list ap; if (db->con) { - //printf("libpq error in %s: %s\n", - // desc, PQerrorMessage(db->con)); + va_start(ap, reason); + vsnprintf(buf, sizeof(buf), reason, ap); + va_end(ap); + log_debug("disconnect because: %s", buf); PQfinish(db->con); db->con = NULL; + } +} + +/* some error happened */ +static void conn_error(DbConn *db, const char *desc) +{ + if (db->con) { + /* fixme show firt couple errors */ + disconnect(db, "%s: %s", desc, PQerrorMessage(db->con)); } else { - printf("random error\n"); + printf("random error: %s\n", desc); } set_idle(db); } -/** +/* * Connection has a resultset avalable, fetch it. * * Returns true if there may be more results coming, @@ -104,11 +177,10 @@ static bool another_result(DbConn *db) /* got one */ res = PQgetResult(db->con); if (res == NULL) { + QueryCount++; set_idle(db); - if (1) { - PQfinish(db->con); - db->con = NULL; - } + + disconnect(db, "query done"); return false; } @@ -146,8 +218,7 @@ static void result_cb(int sock, short flags, void *arg) while (1) { /* if PQisBusy, then incomplete result */ if (PQisBusy(db->con)) { - event_set(&db->ev, PQsocket(db->con), EV_READ, result_cb, db); - event_add(&db->ev, NULL); + wait_event(db, EV_READ, result_cb); break; } @@ -164,41 +235,53 @@ static void send_cb(int sock, short flags, void *arg) res = PQflush(db->con); if (res > 0) { - event_set(&db->ev, PQsocket(db->con), EV_WRITE, send_cb, db); - event_add(&db->ev, NULL); + wait_event(db, EV_WRITE, send_cb); } else if (res == 0) { - event_set(&db->ev, PQsocket(db->con), EV_READ, result_cb, db); - event_add(&db->ev, NULL); + wait_event(db, EV_READ, result_cb); } else conn_error(db, "PQflush"); } -/** send the query to server connection */ -static void send_query(DbConn *db) +static int send_query_bigdata(DbConn *db) { - int res; - const char *q = "select $1::text"; const char *values[1]; int lengths[1]; int fmts[1]; int arglen; + char *q = "select $1::text"; arglen = random() & (bulk_data_max - 1); values[0] = bulk_data + bulk_data_max - arglen; lengths[0] = arglen; fmts[0] = 1; + return PQsendQueryParams(db->con, q, 1, NULL, values, lengths, fmts, 1); +} + +static int send_query_sleep(DbConn *db) +{ + const char *q = "select pg_sleep(0.2)"; + return PQsendQueryParams(db->con, q, 0, NULL, NULL, NULL, NULL, 0); +} + +static int send_query_simple(DbConn *db) +{ + const char *q = "select 1"; + return PQsendQueryParams(db->con, q, 0, NULL, NULL, NULL, NULL, 0); +} + +/** send the query to server connection */ +static void send_query(DbConn *db) +{ + int res; + /* send query */ - if ((random() & 63) == 0) { - res = PQsendQueryParams(db->con, "select pg_sleep(0.2)", 0, - NULL, NULL, NULL, NULL, 0); + if (QueryTypes & QT_SLEEP) { + res = send_query_sleep(db); + } else if (QueryTypes & QT_BIGDATA) { + res = send_query_bigdata(db); } else { - res = PQsendQueryParams(db->con, q, 1, - NULL, /* paramTypes */ - values, /* paramValues */ - lengths,/* paramLengths */ - fmts, /* paramFormats */ - 1); /* resultformat, 0-text, 1-bin */ + res = send_query_simple(db); } if (!res) { conn_error(db, "PQsendQueryParams"); @@ -208,11 +291,9 @@ static void send_query(DbConn *db) /* flush it down */ res = PQflush(db->con); if (res > 0) { - event_set(&db->ev, PQsocket(db->con), EV_WRITE, send_cb, db); - event_add(&db->ev, NULL); + wait_event(db, EV_WRITE, send_cb); } else if (res == 0) { - event_set(&db->ev, PQsocket(db->con), EV_READ, result_cb, db); - event_add(&db->ev, NULL); + wait_event(db, EV_READ, result_cb); } else conn_error(db, "PQflush"); } @@ -225,12 +306,10 @@ static void connect_cb(int sock, short flags, void *arg) poll_res = PQconnectPoll(db->con); switch (poll_res) { case PGRES_POLLING_WRITING: - event_set(&db->ev, PQsocket(db->con), EV_WRITE, connect_cb, db); - event_add(&db->ev, NULL); + wait_event(db, EV_WRITE, connect_cb); break; case PGRES_POLLING_READING: - event_set(&db->ev, PQsocket(db->con), EV_READ, connect_cb, db); - event_add(&db->ev, NULL); + wait_event(db, EV_READ, connect_cb); break; case PGRES_POLLING_OK: send_query(db); @@ -255,8 +334,7 @@ static void launch_connect(DbConn *db) return; } - event_set(&db->ev, PQsocket(db->con), EV_WRITE, connect_cb, db); - event_add(&db->ev, NULL); + wait_event(db, EV_WRITE, connect_cb); } static void handle_idle(DbConn *db) @@ -268,33 +346,110 @@ static void handle_idle(DbConn *db) launch_connect(db); } -int main(void) +static const char usage_str [] = +"usage: asynctest [-d connstr][-n numconn][-s seed][-t ]\n" +"accepted types:\n" +" B - bigdata\n" +" S - sleep occasionally\n" +" 1 - simple 'select 1'\n"; + +static void run_stats(int fd, short ev, void *arg) { - int i; + static struct event ev_stats; + struct timeval period = { 2, 0 }; + + static usec_t last_time = 0; + static uint64_t last_count = 0; + + double time_diff, count_diff; + usec_t now = get_time_usec(); + + time_diff = now - last_time; + if (last_time && time_diff) { + count_diff = QueryCount - last_count; + if (verbose == 0) { + printf(" qps: %8.1f\r", USEC * count_diff / time_diff); + fflush(stdout); + } + } + + if (!last_time) + evtimer_set(&ev_stats, run_stats, NULL); + if (evtimer_add(&ev_stats, &period) < 0) + fatal_perror("evtimer_add"); + + last_count = QueryCount; + last_time = now; +} + +int main(int argc, char *argv[]) +{ + int i, c; DbConn *db; List *item, *tmp; - unsigned seed; + unsigned seed = time(NULL) ^ getpid(); + char *cstr = "dbname=conntest port=6000 host=127.0.0.1"; + int numcon = 50; + + while ((c = getopt(argc, argv, "d:n:s:t:hv")) != EOF) { + switch (c) { + default: + case 'h': + printf("%s", usage_str); + return 0; + case 'd': + cstr = optarg; + break; + case 'n': + numcon = atoi(optarg); + break; + case 's': + seed = atoi(optarg); + break; + case 'v': + verbose++; + break; + case 't': + for (i = 0; optarg[i]; i++) { + switch (optarg[i]) { + case 'B': QueryTypes = QT_BIGDATA; break; + case 'S': QueryTypes = QT_SLEEP; break; + case '1': QueryTypes = QT_SIMPLE; break; + default: log_error("bad type"); break; + } + } + } + } + + if (QueryTypes == 0) + QueryTypes = QT_SIMPLE; - seed = time(NULL) ^ getpid(); printf("using seed: %u\n", seed); srandom(seed); init_bulk_data(); - for (i = 0; i < 50; i++) { - db = new_db("dbname=conntest port=6000 host=127.0.0.1 password=kama"); + for (i = 0; i < numcon; i++) { + db = new_db(cstr); list_append(&db->head, &idle_list); } + +#if 0 if (1) for (i = 0; i < 50; i++) { db = new_db("dbname=conntest port=7000 host=127.0.0.1 password=kama"); list_append(&db->head, &idle_list); } - +#endif event_init(); + run_stats(0, 0, NULL); + + printf("running..\n"); + while (1) { - event_loop(EVLOOP_ONCE); + if (event_loop(EVLOOP_ONCE) < 0) + log_error("event_loop: %s", strerror(errno)); list_for_each_safe(item, &idle_list, tmp) { db = container_of(item, DbConn, head); handle_idle(db); -- 2.40.0