* A simple benchmark program for PostgreSQL
* Originally written by Tatsuo Ishii and enhanced by many contributors.
*
- * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.88 2009/07/30 09:28:00 mha Exp $
+ * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.89 2009/08/03 15:18:14 ishii Exp $
* Copyright (c) 2000-2009, PostgreSQL Global Development Group
* ALL RIGHTS RESERVED;
*
#include "libpq-fe.h"
#include "pqsignal.h"
+#include "portability/instr_time.h"
#include <ctype.h>
#include <sys/resource.h> /* for getrlimit */
#endif
+#ifndef INT64_MAX
+#define INT64_MAX INT64CONST(0x7FFFFFFFFFFFFFFF)
+#endif
+
+/*
+ * Multi-platform pthread implementations
+ */
+
+#ifdef WIN32
+/* Use native win32 threads on Windows */
+typedef struct win32_pthread *pthread_t;
+typedef int pthread_attr_t;
+static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg);
+static int pthread_join(pthread_t th, void **thread_return);
+
+#elif defined(ENABLE_THREAD_SAFETY)
+/* Use platform-dependent pthread */
+#include <pthread.h>
+
+#else
+
+#include <sys/wait.h>
+/* Use emulation with fork. Rename pthread idendifiers to avoid conflictions */
+#define pthread_t pg_pthread_t
+#define pthread_attr_t pg_pthread_attr_t
+#define pthread_create pg_pthread_create
+#define pthread_join pg_pthread_join
+typedef struct fork_pthread *pthread_t;
+typedef int pthread_attr_t;
+static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg);
+static int pthread_join(pthread_t th, void **thread_return);
+
+#endif
+
extern char *optarg;
extern int optind;
#define DEFAULT_NXACTS 10 /* default nxacts */
-int nclients = 1; /* default number of simulated clients */
int nxacts = 0; /* number of transactions per client */
int duration = 0; /* duration in seconds */
bool use_log; /* log transaction latencies to a file */
-int remains; /* number of remaining clients */
-
int is_connect; /* establish connection for each transaction */
char *pghost = "";
int listen; /* 0 indicates that an async query has been
* sent */
int sleeping; /* 1 indicates that the client is napping */
- struct timeval until; /* napping until */
+ int64 until; /* napping until (usec) */
Variable *variables; /* array of variable definitions */
int nvariables;
- struct timeval txn_begin; /* used for measuring latencies */
+ instr_time txn_begin; /* used for measuring latencies */
int use_file; /* index in sql_files for this client */
bool prepared[MAX_FILES];
} CState;
+/*
+ * Thread state and result
+ */
+typedef struct
+{
+ pthread_t thread; /* thread handle */
+ CState *state; /* array of CState */
+ int nstate; /* length of state */
+ instr_time start_time; /* thread start time */
+} TState;
+
+#define INVALID_THREAD ((pthread_t) 0)
+
+typedef struct
+{
+ instr_time conn_time;
+ int xacts;
+} TResult;
+
/*
* queries read from files
*/
char *argv[MAX_ARGS]; /* command list */
} Command;
-Command **sql_files[MAX_FILES]; /* SQL script files */
-int num_files; /* number of script files */
+static Command **sql_files[MAX_FILES]; /* SQL script files */
+static int num_files; /* number of script files */
+static int debug = 0; /* debug flag */
/* default scenario */
static char *tpc_b = {
"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
};
-/* Connection overhead time */
-static struct timeval conn_total_time = {0, 0};
-
/* Function prototypes */
static void setalarm(int seconds);
-
-
-/* Calculate total time */
-static void
-addTime(struct timeval * t1, struct timeval * t2, struct timeval * result)
-{
- int sec = t1->tv_sec + t2->tv_sec;
- int usec = t1->tv_usec + t2->tv_usec;
-
- if (usec >= 1000000)
- {
- usec -= 1000000;
- sec++;
- }
- result->tv_sec = sec;
- result->tv_usec = usec;
-}
-
-/* Calculate time difference */
-static void
-diffTime(struct timeval * t1, struct timeval * t2, struct timeval * result)
-{
- int sec = t1->tv_sec - t2->tv_sec;
- int usec = t1->tv_usec - t2->tv_usec;
-
- if (usec < 0)
- {
- usec += 1000000;
- sec--;
- }
- result->tv_sec = sec;
- result->tv_usec = usec;
-}
+static void* threadRun(void *arg);
static void
usage(const char *progname)
" -D VARNAME=VALUE\n"
" define variable for use by custom script\n"
" -f FILENAME read transaction script from FILENAME\n"
+ " -j NUM number of threads (default: 1)\n"
" -l write transaction times to log file\n"
" -M {simple|extended|prepared}\n"
" protocol for submitting queries to server (default: simple)\n"
} while (res);
}
-/* check to see if the SQL result was good */
-static int
-check(CState *state, PGresult *res, int n)
-{
- CState *st = &state[n];
-
- switch (PQresultStatus(res))
- {
- case PGRES_COMMAND_OK:
- case PGRES_TUPLES_OK:
- /* OK */
- break;
- default:
- fprintf(stderr, "Client %d aborted in state %d: %s",
- n, st->state, PQerrorMessage(st->con));
- remains--; /* I've aborted */
- PQfinish(st->con);
- st->con = NULL;
- return (-1);
- }
- return (0); /* OK */
-}
-
static int
compareVariables(const void *v1, const void *v2)
{
sprintf(buffer, "P%d_%d", file, state);
}
-static void
-doCustom(CState *state, int n, int debug)
+static bool
+clientDone(CState *st, bool ok)
+{
+ (void) ok; /* unused */
+
+ if (st->con != NULL)
+ {
+ PQfinish(st->con);
+ st->con = NULL;
+ }
+ return false; /* always false */
+}
+
+/* return false iff client should be disconnected */
+static bool
+doCustom(CState *st, instr_time *conn_time)
{
PGresult *res;
- CState *st = &state[n];
Command **commands;
top:
if (st->sleeping)
{ /* are we sleeping? */
- int usec;
- struct timeval now;
+ instr_time now;
- gettimeofday(&now, NULL);
- usec = (st->until.tv_sec - now.tv_sec) * 1000000 +
- st->until.tv_usec - now.tv_usec;
- if (usec <= 0)
+ INSTR_TIME_SET_CURRENT(now);
+ if (st->until <= INSTR_TIME_GET_MICROSEC(now))
st->sleeping = 0; /* Done sleeping, go ahead with next command */
else
- return; /* Still sleeping, nothing to do here */
+ return true; /* Still sleeping, nothing to do here */
}
if (st->listen)
if (commands[st->state]->type == SQL_COMMAND)
{
if (debug)
- fprintf(stderr, "client %d receiving\n", n);
+ fprintf(stderr, "client %d receiving\n", st->id);
if (!PQconsumeInput(st->con))
{ /* there's something wrong */
- fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
- remains--; /* I've aborted */
- PQfinish(st->con);
- st->con = NULL;
- return;
+ fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
+ return clientDone(st, false);
}
if (PQisBusy(st->con))
- return; /* don't have the whole result yet */
+ return true; /* don't have the whole result yet */
}
/*
*/
if (use_log && commands[st->state + 1] == NULL)
{
- double diff;
- struct timeval now;
-
- gettimeofday(&now, NULL);
- diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 +
- (int) (now.tv_usec - st->txn_begin.tv_usec);
-
- fprintf(LOGFILE, "%d %d %.0f %d %ld %ld\n",
- st->id, st->cnt, diff, st->use_file,
- (long) now.tv_sec, (long) now.tv_usec);
+ instr_time diff;
+ double sec;
+ double msec;
+ double usec;
+
+ INSTR_TIME_SET_CURRENT(diff);
+ INSTR_TIME_SUBTRACT(diff, st->txn_begin);
+ sec = INSTR_TIME_GET_DOUBLE(diff);
+ msec = INSTR_TIME_GET_MILLISEC(diff);
+ usec = (double) INSTR_TIME_GET_MICROSEC(diff);
+
+ fprintf(LOGFILE, "%d %d %.0f %d %.0f %.0f\n",
+ st->id, st->cnt, usec, st->use_file,
+ sec, usec - sec * 1000.0);
}
if (commands[st->state]->type == SQL_COMMAND)
{
res = PQgetResult(st->con);
- if (check(state, res, n))
+ switch (PQresultStatus(res))
{
- PQclear(res);
- return;
+ case PGRES_COMMAND_OK:
+ case PGRES_TUPLES_OK:
+ break; /* OK */
+ default:
+ fprintf(stderr, "Client %d aborted in state %d: %s",
+ st->id, st->state, PQerrorMessage(st->con));
+ PQclear(res);
+ return clientDone(st, false);
}
PQclear(res);
discard_response(st);
++st->cnt;
if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
- {
- remains--; /* I've done */
- if (st->con != NULL)
- {
- PQfinish(st->con);
- st->con = NULL;
- }
- return;
- }
+ return clientDone(st, true); /* exit success */
}
/* increment state counter */
if (st->con == NULL)
{
- struct timeval t1,
- t2,
- t3;
+ instr_time start, end;
- gettimeofday(&t1, NULL);
+ INSTR_TIME_SET_CURRENT(start);
if ((st->con = doConnect()) == NULL)
{
- fprintf(stderr, "Client %d aborted in establishing connection.\n",
- n);
- remains--; /* I've aborted */
- PQfinish(st->con);
- st->con = NULL;
- return;
+ fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
+ return clientDone(st, false);
}
- gettimeofday(&t2, NULL);
- diffTime(&t2, &t1, &t3);
- addTime(&conn_total_time, &t3, &conn_total_time);
+ INSTR_TIME_SET_CURRENT(end);
+ INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
}
if (use_log && st->state == 0)
- gettimeofday(&(st->txn_begin), NULL);
+ INSTR_TIME_SET_CURRENT(st->txn_begin);
if (commands[st->state]->type == SQL_COMMAND)
{
{
fprintf(stderr, "out of memory\n");
st->ecnt++;
- return;
+ return true;
}
if (debug)
- fprintf(stderr, "client %d sending %s\n", n, sql);
+ fprintf(stderr, "client %d sending %s\n", st->id, sql);
r = PQsendQuery(st->con, sql);
free(sql);
}
getQueryParams(st, command, params);
if (debug)
- fprintf(stderr, "client %d sending %s\n", n, sql);
+ fprintf(stderr, "client %d sending %s\n", st->id, sql);
r = PQsendQueryParams(st->con, sql, command->argc - 1,
NULL, params, NULL, NULL, 0);
}
preparedStatementName(name, st->use_file, st->state);
if (debug)
- fprintf(stderr, "client %d sending %s\n", n, name);
+ fprintf(stderr, "client %d sending %s\n", st->id, name);
r = PQsendQueryPrepared(st->con, name, command->argc - 1,
params, NULL, NULL, 0);
}
if (r == 0)
{
if (debug)
- fprintf(stderr, "client %d cannot send %s\n", n, command->argv[0]);
+ fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
st->ecnt++;
}
else
if (debug)
{
- fprintf(stderr, "client %d executing \\%s", n, argv[0]);
+ fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
for (i = 1; i < argc; i++)
fprintf(stderr, " %s", argv[i]);
fprintf(stderr, "\n");
{
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
st->ecnt++;
- return;
+ return true;
}
min = atoi(var);
}
{
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
st->ecnt++;
- return;
+ return true;
}
max = atoi(var);
}
{
fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max);
st->ecnt++;
- return;
+ return true;
}
#ifdef DEBUG
{
fprintf(stderr, "%s: out of memory\n", argv[0]);
st->ecnt++;
- return;
+ return true;
}
st->listen = 1;
{
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
st->ecnt++;
- return;
+ return true;
}
ope1 = atoi(var);
}
{
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
st->ecnt++;
- return;
+ return true;
}
ope2 = atoi(var);
}
{
fprintf(stderr, "%s: division by zero\n", argv[0]);
st->ecnt++;
- return;
+ return true;
}
snprintf(res, sizeof(res), "%d", ope1 / ope2);
}
{
fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
st->ecnt++;
- return;
+ return true;
}
}
{
fprintf(stderr, "%s: out of memory\n", argv[0]);
st->ecnt++;
- return;
+ return true;
}
st->listen = 1;
{
char *var;
int usec;
- struct timeval now;
+ instr_time now;
if (*argv[1] == ':')
{
{
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
st->ecnt++;
- return;
+ return true;
}
usec = atoi(var);
}
else
usec *= 1000000;
- gettimeofday(&now, NULL);
- st->until.tv_sec = now.tv_sec + (now.tv_usec + usec) / 1000000;
- st->until.tv_usec = (now.tv_usec + usec) % 1000000;
+ INSTR_TIME_SET_CURRENT(now);
+ st->until = INSTR_TIME_GET_MICROSEC(now) + usec;
st->sleeping = 1;
st->listen = 1;
goto top;
}
+
+ return true;
}
/* discard connections */
static void
-disconnect_all(CState *state)
+disconnect_all(CState *state, int length)
{
int i;
- for (i = 0; i < nclients; i++)
+ for (i = 0; i < length; i++)
{
if (state[i].con)
+ {
PQfinish(state[i].con);
+ state[i].con = NULL;
+ }
}
}
return NULL;
}
+ /*
+ * Split argument into number and unit for "sleep 1ms" or so.
+ * We don't have to terminate the number argument with null
+ * because it will parsed with atoi, that ignores trailing
+ * non-digit characters.
+ */
+ if (my_commands->argv[1][0] != ':')
+ {
+ char *c = my_commands->argv[1];
+ while (isdigit(*c)) { c++; }
+ if (*c)
+ {
+ my_commands->argv[2] = c;
+ if (my_commands->argc < 3)
+ my_commands->argc = 3;
+ }
+ }
+
if (my_commands->argc >= 3)
{
if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
/* print out results */
static void
-printResults(
- int ttype, CState *state,
- struct timeval * start_time, struct timeval * end_time)
+printResults(int ttype, int normal_xacts, int nclients, int nthreads,
+ instr_time total_time, instr_time conn_total_time)
{
- double t1,
- t2;
- int i;
- int normal_xacts = 0;
+ double time_include,
+ tps_include,
+ tps_exclude;
char *s;
- for (i = 0; i < nclients; i++)
- normal_xacts += state[i].cnt;
-
- t1 = (end_time->tv_sec - start_time->tv_sec) * 1000000.0 + (end_time->tv_usec - start_time->tv_usec);
- t1 = normal_xacts * 1000000.0 / t1;
-
- t2 = (end_time->tv_sec - start_time->tv_sec - conn_total_time.tv_sec) * 1000000.0 +
- (end_time->tv_usec - start_time->tv_usec - conn_total_time.tv_usec);
- t2 = normal_xacts * 1000000.0 / t2;
+ time_include = INSTR_TIME_GET_DOUBLE(total_time);
+ tps_include = normal_xacts / time_include;
+ tps_exclude = normal_xacts / (time_include -
+ (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
if (ttype == 0)
s = "TPC-B (sort of)";
printf("scaling factor: %d\n", scale);
printf("query mode: %s\n", QUERYMODE[querymode]);
printf("number of clients: %d\n", nclients);
+ printf("number of threads: %d\n", nthreads);
if (duration <= 0)
{
printf("number of transactions per client: %d\n", nxacts);
printf("number of transactions actually processed: %d\n",
normal_xacts);
}
- printf("tps = %f (including connections establishing)\n", t1);
- printf("tps = %f (excluding connections establishing)\n", t2);
+ printf("tps = %f (including connections establishing)\n", tps_include);
+ printf("tps = %f (excluding connections establishing)\n", tps_exclude);
}
main(int argc, char **argv)
{
int c;
+ int nclients = 1; /* default number of simulated clients */
+ int nthreads = 1; /* default number of threads */
int is_init_mode = 0; /* initialize mode? */
int is_no_vacuum = 0; /* no vacuum at all before testing? */
int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
- int debug = 0; /* debug flag */
int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only,
* 2: skip update of branches and tellers */
char *filename = NULL;
bool scale_given = false;
CState *state; /* status of clients */
+ TState *threads; /* array of thread */
- struct timeval start_time; /* start up time */
- struct timeval end_time; /* end time */
+ instr_time start_time; /* start up time */
+ instr_time total_time;
+ instr_time conn_total_time;
+ int total_xacts;
int i;
- fd_set input_mask;
- int nsocks; /* return from select(2) */
- int maxsock; /* max socket number to be waited */
- struct timeval now;
- struct timeval timeout;
- int min_usec;
-
#ifdef HAVE_GETRLIMIT
struct rlimit rlim;
#endif
memset(state, 0, sizeof(*state));
- while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:")) != -1)
+ while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:j:")) != -1)
{
switch (c)
{
}
#endif /* HAVE_GETRLIMIT */
break;
+ case 'j': /* jobs */
+ nthreads = atoi(optarg);
+ if (nthreads <= 0)
+ {
+ fprintf(stderr, "invalid number of threads: %d\n", nthreads);
+ exit(1);
+ }
+ break;
case 'C':
is_connect = 1;
break;
if (nxacts <= 0 && duration <= 0)
nxacts = DEFAULT_NXACTS;
- remains = nclients;
+ if (nclients % nthreads != 0)
+ {
+ fprintf(stderr, "number of clients (%d) must be a multiple number of threads (%d)\n", nclients, nthreads);
+ exit(1);
+ }
if (nclients > 1)
{
{
int j;
+ state[i].id = i;
for (j = 0; j < state[0].nvariables; j++)
{
if (putVariable(&state[i], state[0].variables[j].name, state[0].variables[j].value) == false)
PQfinish(con);
/* set random seed */
- gettimeofday(&start_time, NULL);
- srandom((unsigned int) start_time.tv_usec);
-
- /* get start up time */
- gettimeofday(&start_time, NULL);
-
- /* set alarm if duration is specified. */
- if (duration > 0)
- setalarm(duration);
-
- if (is_connect == 0)
- {
- struct timeval t,
- now;
-
- /* make connections to the database */
- for (i = 0; i < nclients; i++)
- {
- state[i].id = i;
- if ((state[i].con = doConnect()) == NULL)
- exit(1);
- }
- /* time after connections set up */
- gettimeofday(&now, NULL);
- diffTime(&now, &start_time, &t);
- addTime(&conn_total_time, &t, &conn_total_time);
- }
+ INSTR_TIME_SET_CURRENT(start_time);
+ srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
/* process bultin SQL scripts */
switch (ttype)
break;
}
+ /* get start up time */
+ INSTR_TIME_SET_CURRENT(start_time);
+
+ /* set alarm if duration is specified. */
+ if (duration > 0)
+ setalarm(duration);
+
+ /* start threads */
+ threads = (TState *) malloc(sizeof(TState) * nthreads);
+ for (i = 0; i < nthreads; i++)
+ {
+ threads[i].state = &state[nclients / nthreads * i];
+ threads[i].nstate = nclients / nthreads;
+ INSTR_TIME_SET_CURRENT(threads[i].start_time);
+
+ /* the first thread (i = 0) is executed by main thread */
+ if (i > 0)
+ {
+ int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]);
+ if (err != 0 || threads[i].thread == INVALID_THREAD)
+ {
+ fprintf(stderr, "cannot create thread: %s\n", strerror(err));
+ exit(1);
+ }
+ }
+ else
+ {
+ threads[i].thread = INVALID_THREAD;
+ }
+ }
+
+ /* wait for threads and accumulate results */
+ total_xacts = 0;
+ INSTR_TIME_SET_ZERO(conn_total_time);
+ for (i = 0; i < nthreads; i++)
+ {
+ void *ret = NULL;
+
+ if (threads[i].thread == INVALID_THREAD)
+ ret = threadRun(&threads[i]);
+ else
+ pthread_join(threads[i].thread, &ret);
+
+ if (ret != NULL)
+ {
+ TResult *r = (TResult *) ret;
+ total_xacts += r->xacts;
+ INSTR_TIME_ADD(conn_total_time, r->conn_time);
+ free(ret);
+ }
+ }
+ disconnect_all(state, nclients);
+
+ /* get end time */
+ INSTR_TIME_SET_CURRENT(total_time);
+ INSTR_TIME_SUBTRACT(total_time, start_time);
+ printResults(ttype, total_xacts, nclients, nthreads, total_time, conn_total_time);
+ if (LOGFILE)
+ fclose(LOGFILE);
+
+ return 0;
+}
+
+static void *
+threadRun(void *arg)
+{
+ TState *thread = (TState *) arg;
+ CState *state = thread->state;
+ TResult *result;
+ instr_time start, end;
+ int nstate = thread->nstate;
+ int remains = nstate; /* number of remaining clients */
+ int i;
+
+ result = malloc(sizeof(TResult));
+ INSTR_TIME_SET_ZERO(result->conn_time);
+
+ if (is_connect == 0)
+ {
+ /* make connections to the database */
+ for (i = 0; i < nstate; i++)
+ {
+ if ((state[i].con = doConnect()) == NULL)
+ goto done;
+ }
+ }
+
+ /* time after thread and connections set up */
+ INSTR_TIME_SET_CURRENT(result->conn_time);
+ INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
+
/* send start up queries in async manner */
- for (i = 0; i < nclients; i++)
+ for (i = 0; i < nstate; i++)
{
- Command **commands = sql_files[state[i].use_file];
- int prev_ecnt = state[i].ecnt;
+ CState *st = &state[i];
+ Command **commands = sql_files[st->use_file];
+ int prev_ecnt = st->ecnt;
- state[i].use_file = getrand(0, num_files - 1);
- doCustom(state, i, debug);
+ st->use_file = getrand(0, num_files - 1);
+ if (!doCustom(st, &result->conn_time))
+ remains--; /* I've aborted */
- if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
+ if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
{
- fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state);
+ fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
remains--; /* I've aborted */
- PQfinish(state[i].con);
- state[i].con = NULL;
+ PQfinish(st->con);
+ st->con = NULL;
}
}
- for (;;)
+ while (remains > 0)
{
- if (remains <= 0)
- { /* all done ? */
- disconnect_all(state);
- /* get end time */
- gettimeofday(&end_time, NULL);
- printResults(ttype, state, &start_time, &end_time);
- if (LOGFILE)
- fclose(LOGFILE);
- exit(0);
- }
+ fd_set input_mask;
+ int maxsock; /* max socket number to be waited */
+ int64 now_usec = 0;
+ int64 min_usec;
FD_ZERO(&input_mask);
maxsock = -1;
- min_usec = -1;
- for (i = 0; i < nclients; i++)
+ min_usec = INT64_MAX;
+ for (i = 0; i < nstate; i++)
{
- Command **commands = sql_files[state[i].use_file];
+ CState *st = &state[i];
+ Command **commands = sql_files[st->use_file];
+ int sock;
- if (state[i].sleeping)
+ if (st->sleeping)
{
int this_usec;
- int sock = PQsocket(state[i].con);
- if (min_usec < 0)
+ if (min_usec == INT64_MAX)
{
- gettimeofday(&now, NULL);
- min_usec = 0;
+ instr_time now;
+ INSTR_TIME_SET_CURRENT(now);
+ now_usec = INSTR_TIME_GET_MICROSEC(now);
}
- this_usec = (state[i].until.tv_sec - now.tv_sec) * 1000000 +
- state[i].until.tv_usec - now.tv_usec;
-
- if (this_usec > 0 && (min_usec == 0 || this_usec < min_usec))
+ this_usec = st->until - now_usec;
+ if (min_usec > this_usec)
min_usec = this_usec;
-
- FD_SET (sock, &input_mask);
-
- if (maxsock < sock)
- maxsock = sock;
}
- else if (state[i].con && commands[state[i].state]->type != META_COMMAND)
+ else if (st->con == NULL)
{
- int sock = PQsocket(state[i].con);
-
- if (sock < 0)
- {
- disconnect_all(state);
- exit(1);
- }
- FD_SET (sock, &input_mask);
+ continue;
+ }
+ else if (commands[st->state]->type == META_COMMAND)
+ {
+ min_usec = 0; /* the connection is ready to run */
+ break;
+ }
- if (maxsock < sock)
- maxsock = sock;
+ sock = PQsocket(st->con);
+ if (sock < 0)
+ {
+ fprintf(stderr, "bad socket: %s\n", strerror(errno));
+ goto done;
}
+
+ FD_SET(sock, &input_mask);
+ if (maxsock < sock)
+ maxsock = sock;
}
- if (maxsock != -1)
+ if (min_usec > 0 && maxsock != -1)
{
- if (min_usec >= 0)
+ int nsocks; /* return from select(2) */
+
+ if (min_usec != INT64_MAX)
{
+ struct timeval timeout;
timeout.tv_sec = min_usec / 1000000;
timeout.tv_usec = min_usec % 1000000;
-
- nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
- (fd_set *) NULL, &timeout);
+ nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
}
else
- nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
- (fd_set *) NULL, (struct timeval *) NULL);
+ nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
if (nsocks < 0)
{
if (errno == EINTR)
continue;
/* must be something wrong */
- disconnect_all(state);
fprintf(stderr, "select failed: %s\n", strerror(errno));
- exit(1);
- }
-#ifdef NOT_USED
- else if (nsocks == 0)
- { /* timeout */
- fprintf(stderr, "select timeout\n");
- for (i = 0; i < nclients; i++)
- {
- fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
- i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
- }
- exit(0);
+ goto done;
}
-#endif
}
/* ok, backend returns reply */
- for (i = 0; i < nclients; i++)
+ for (i = 0; i < nstate; i++)
{
- Command **commands = sql_files[state[i].use_file];
- int prev_ecnt = state[i].ecnt;
+ CState *st = &state[i];
+ Command **commands = sql_files[st->use_file];
+ int prev_ecnt = st->ecnt;
- if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask)
- || commands[state[i].state]->type == META_COMMAND))
+ if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
+ || commands[st->state]->type == META_COMMAND))
{
- doCustom(state, i, debug);
+ if (!doCustom(st, &result->conn_time))
+ remains--; /* I've aborted */
}
- if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
+ if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
{
- fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, state[i].state);
+ fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
remains--; /* I've aborted */
- PQfinish(state[i].con);
- state[i].con = NULL;
+ PQfinish(st->con);
+ st->con = NULL;
}
}
}
+
+done:
+ INSTR_TIME_SET_CURRENT(start);
+ disconnect_all(state, nstate);
+ result->xacts = 0;
+ for (i = 0; i < nstate; i++)
+ result->xacts += state[i].cnt;
+ INSTR_TIME_SET_CURRENT(end);
+ INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
+ return result;
}
pqsignal(SIGALRM, handle_sig_alarm);
alarm(seconds);
}
+
+#ifndef ENABLE_THREAD_SAFETY
+
+/*
+ * implements pthread using fork.
+ */
+
+typedef struct fork_pthread
+{
+ pid_t pid;
+ int pipes[2];
+} fork_pthread;
+
+static int
+pthread_create(pthread_t *thread,
+ pthread_attr_t *attr,
+ void * (*start_routine)(void *),
+ void *arg)
+{
+ fork_pthread *th;
+ void *ret;
+
+ th = (fork_pthread *) malloc(sizeof(fork_pthread));
+ pipe(th->pipes);
+
+ th->pid = fork();
+ if (th->pid == -1) /* error */
+ {
+ free(th);
+ return errno;
+ }
+ if (th->pid != 0) /* parent process */
+ {
+ close(th->pipes[1]);
+ *thread = th;
+ return 0;
+ }
+
+ /* child process */
+ close(th->pipes[0]);
+
+ /* set alarm again because the child does not inherit timers */
+ if (duration > 0)
+ setalarm(duration);
+
+ ret = start_routine(arg);
+ write(th->pipes[1], ret, sizeof(TResult));
+ close(th->pipes[1]);
+ free(th);
+ exit(0);
+}
+
+static int
+pthread_join(pthread_t th, void **thread_return)
+{
+ int status;
+
+ while (waitpid(th->pid, &status, 0) != th->pid)
+ {
+ if (errno != EINTR)
+ return errno;
+ }
+
+ if (thread_return != NULL)
+ {
+ /* assume result is TResult */
+ *thread_return = malloc(sizeof(TResult));
+ if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
+ {
+ free(*thread_return);
+ *thread_return = NULL;
+ }
+ }
+ close(th->pipes[0]);
+
+ free(th);
+ return 0;
+}
+
+#endif
+
#else /* WIN32 */
static VOID CALLBACK
}
}
+/* partial pthread implementation for Windows */
+
+typedef struct win32_pthread
+{
+ HANDLE handle;
+ void *(*routine)(void *);
+ void *arg;
+ void *result;
+} win32_pthread;
+
+static unsigned __stdcall
+win32_pthread_run(void *arg)
+{
+ win32_pthread *th = (win32_pthread *) arg;
+
+ th->result = th->routine(th->arg);
+
+ return 0;
+}
+
+static int
+pthread_create(pthread_t *thread,
+ pthread_attr_t *attr,
+ void * (*start_routine)(void *),
+ void *arg)
+{
+ int save_errno;
+ win32_pthread *th;
+
+ th = (win32_pthread *) malloc(sizeof(win32_pthread));
+ th->routine = start_routine;
+ th->arg = arg;
+ th->result = NULL;
+
+ th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
+ if (th->handle == NULL)
+ {
+ save_errno = errno;
+ free(th);
+ return save_errno;
+ }
+
+ *thread = th;
+ return 0;
+}
+
+static int
+pthread_join(pthread_t th, void **thread_return)
+{
+ if (th == NULL || th->handle == NULL)
+ return errno = EINVAL;
+
+ if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
+ {
+ _dosmaperr(GetLastError());
+ return errno;
+ }
+
+ if (thread_return)
+ *thread_return = th->result;
+
+ CloseHandle(th->handle);
+ free(th);
+ return 0;
+}
+
#endif /* WIN32 */