} StatsData;
/*
- * Connection state
+ * Connection state machine states.
+ */
+typedef enum
+{
+ /*
+ * The client must first choose a script to execute. Once chosen, it can
+ * either be throttled (state CSTATE_START_THROTTLE under --rate) or start
+ * right away (state CSTATE_START_TX).
+ */
+ CSTATE_CHOOSE_SCRIPT,
+
+ /*
+ * In CSTATE_START_THROTTLE state, we calculate when to begin the next
+ * transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state
+ * sleeps until that moment. (If throttling is not enabled, doCustom()
+ * falls directly through from CSTATE_START_THROTTLE to CSTATE_START_TX.)
+ */
+ CSTATE_START_THROTTLE,
+ CSTATE_THROTTLE,
+
+ /*
+ * CSTATE_START_TX performs start-of-transaction processing. Establishes
+ * a new connection for the transaction, in --connect mode, and records
+ * the transaction start time.
+ */
+ CSTATE_START_TX,
+
+ /*
+ * We loop through these states, to process each command in the script:
+ *
+ * CSTATE_START_COMMAND starts the execution of a command. On a SQL
+ * command, the command is sent to the server, and we move to
+ * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set,
+ * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
+ * meta-commands are executed immediately.
+ *
+ * CSTATE_WAIT_RESULT waits until we get a result set back from the server
+ * for the current command.
+ *
+ * CSTATE_SLEEP waits until the end of \sleep.
+ *
+ * CSTATE_END_COMMAND records the end-of-command timestamp, increments the
+ * command counter, and loops back to CSTATE_START_COMMAND state.
+ */
+ CSTATE_START_COMMAND,
+ CSTATE_WAIT_RESULT,
+ CSTATE_SLEEP,
+ CSTATE_END_COMMAND,
+
+ /*
+ * CSTATE_END_TX performs end-of-transaction processing. Calculates
+ * latency, and logs the transaction. In --connect mode, closes the
+ * current connection. Chooses the next script to execute and starts over
+ * in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we have no
+ * more work to do.
+ */
+ CSTATE_END_TX,
+
+ /*
+ * Final states. CSTATE_ABORTED means that the script execution was
+ * aborted because a command failed, CSTATE_FINISHED means success.
+ */
+ CSTATE_ABORTED,
+ CSTATE_FINISHED
+} ConnectionStateEnum;
+
+/*
+ * Connection state.
*/
typedef struct
{
PGconn *con; /* connection handle to DB */
int id; /* client No. */
- int state; /* state No. */
- bool listen; /* whether an async query has been sent */
- bool sleeping; /* whether the client is napping */
- bool throttling; /* whether nap is for throttling */
- bool is_throttled; /* whether transaction throttling is done */
+ ConnectionStateEnum state; /* state machine's current state. */
+
+ int use_file; /* index in sql_script for this client */
+ int command; /* command number in script */
+
+ /* client variables */
Variable *variables; /* array of variable definitions */
int nvariables; /* number of variables */
bool vars_sorted; /* are variables sorted by name? */
+
+ /* various times about current transaction */
int64 txn_scheduled; /* scheduled start time of transaction (usec) */
int64 sleep_until; /* scheduled start time of next cmd (usec) */
instr_time txn_begin; /* used for measuring schedule lag times */
instr_time stmt_begin; /* used for measuring statement latencies */
- int use_file; /* index in sql_scripts for this client */
+
bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
/* per client collected stats */
Assert(nargs == 1);
fprintf(stderr, "debug(script=%d,command=%d): ",
- st->use_file, st->state + 1);
+ st->use_file, st->command + 1);
if (varg->type == PGBT_INT)
fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
sprintf(buffer, "P%d_%d", file, state);
}
-static bool
-clientDone(CState *st)
+static void
+commandFailed(CState *st, char *message)
{
- if (st->con != NULL)
- {
- PQfinish(st->con);
- st->con = NULL;
- }
- return false; /* always false */
+ fprintf(stderr,
+ "client %d aborted in command %d of script %d; %s\n",
+ st->id, st->command, st->use_file, message);
}
/* return a script number with a weighted choice. */
return i - 1;
}
-/* return false iff client should be disconnected */
+/* Send a SQL command, using the chosen querymode */
static bool
-doCustom(TState *thread, CState *st, StatsData *agg)
+sendCommand(CState *st, Command *command)
{
- PGresult *res;
- Command **commands;
- bool trans_needs_throttle = false;
- instr_time now;
+ int r;
- /*
- * gettimeofday() isn't free, so we get the current timestamp lazily the
- * first time it's needed, and reuse the same value throughout this
- * function after that. This also ensures that e.g. the calculated latency
- * reported in the log file and in the totals are the same. Zero means
- * "not set yet". Reset "now" when we step to the next command with "goto
- * top", though.
- */
-top:
- INSTR_TIME_SET_ZERO(now);
+ if (querymode == QUERY_SIMPLE)
+ {
+ char *sql;
- commands = sql_script[st->use_file].commands;
+ sql = pg_strdup(command->argv[0]);
+ sql = assignVariables(st, sql);
- /*
- * Handle throttling once per transaction by sleeping. It is simpler to
- * do this here rather than at the end, because so much complicated logic
- * happens below when statements finish.
- */
- if (throttle_delay && !st->is_throttled)
+ if (debug)
+ fprintf(stderr, "client %d sending %s\n", st->id, sql);
+ r = PQsendQuery(st->con, sql);
+ free(sql);
+ }
+ else if (querymode == QUERY_EXTENDED)
{
- /*
- * Generate a delay such that the series of delays will approximate a
- * Poisson distribution centered on the throttle_delay time.
- *
- * If transactions are too slow or a given wait is shorter than a
- * transaction, the next transaction will start right away.
- */
- int64 wait = getPoissonRand(thread, throttle_delay);
+ const char *sql = command->argv[0];
+ const char *params[MAX_ARGS];
- thread->throttle_trigger += wait;
- st->txn_scheduled = thread->throttle_trigger;
+ getQueryParams(st, command, params);
- /* stop client if next transaction is beyond pgbench end of execution */
- if (duration > 0 && st->txn_scheduled > end_time)
- return clientDone(st);
+ if (debug)
+ fprintf(stderr, "client %d sending %s\n", st->id, sql);
+ r = PQsendQueryParams(st->con, sql, command->argc - 1,
+ NULL, params, NULL, NULL, 0);
+ }
+ else if (querymode == QUERY_PREPARED)
+ {
+ char name[MAX_PREPARE_NAME];
+ const char *params[MAX_ARGS];
- /*
- * If this --latency-limit is used, and this slot is already late so
- * that the transaction will miss the latency limit even if it
- * completed immediately, we skip this time slot and iterate till the
- * next slot that isn't late yet.
- */
- if (latency_limit)
+ if (!st->prepared[st->use_file])
{
- int64 now_us;
+ int j;
+ Command **commands = sql_script[st->use_file].commands;
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
- now_us = INSTR_TIME_GET_MICROSEC(now);
- while (thread->throttle_trigger < now_us - latency_limit)
+ for (j = 0; commands[j] != NULL; j++)
{
- processXactStats(thread, st, &now, true, agg);
- /* next rendez-vous */
- wait = getPoissonRand(thread, throttle_delay);
- thread->throttle_trigger += wait;
- st->txn_scheduled = thread->throttle_trigger;
+ PGresult *res;
+ char name[MAX_PREPARE_NAME];
+
+ if (commands[j]->type != SQL_COMMAND)
+ continue;
+ preparedStatementName(name, st->use_file, j);
+ res = PQprepare(st->con, name,
+ commands[j]->argv[0], commands[j]->argc - 1, NULL);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ fprintf(stderr, "%s", PQerrorMessage(st->con));
+ PQclear(res);
}
+ st->prepared[st->use_file] = true;
}
- st->sleep_until = st->txn_scheduled;
- st->sleeping = true;
- st->throttling = true;
- st->is_throttled = true;
+ getQueryParams(st, command, params);
+ preparedStatementName(name, st->use_file, st->command);
+
if (debug)
- fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
- st->id, wait);
+ fprintf(stderr, "client %d sending %s\n", st->id, name);
+ r = PQsendQueryPrepared(st->con, name, command->argc - 1,
+ params, NULL, NULL, 0);
}
+ else /* unknown sql mode */
+ r = 0;
- if (st->sleeping)
- { /* are we sleeping? */
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
- if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
- return true; /* Still sleeping, nothing to do here */
- /* Else done sleeping, go ahead with next command */
- st->sleeping = false;
- st->throttling = false;
+ if (r == 0)
+ {
+ if (debug)
+ fprintf(stderr, "client %d could not send %s\n",
+ st->id, command->argv[0]);
+ st->ecnt++;
+ return false;
}
+ else
+ return true;
+}
+
+/*
+ * Parse the argument to a \sleep command, and return the requested amount
+ * of delay, in microseconds. Returns true on success, false on error.
+ */
+static bool
+evaluateSleep(CState *st, int argc, char **argv, int *usecs)
+{
+ char *var;
+ int usec;
- if (st->listen)
- { /* are we receiver? */
- if (commands[st->state]->type == SQL_COMMAND)
+ if (*argv[1] == ':')
+ {
+ if ((var = getVariable(st, argv[1] + 1)) == NULL)
{
- if (debug)
- fprintf(stderr, "client %d receiving\n", st->id);
- if (!PQconsumeInput(st->con))
- { /* there's something wrong */
- fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state);
- return clientDone(st);
- }
- if (PQisBusy(st->con))
- return true; /* don't have the whole result yet */
+ fprintf(stderr, "%s: undefined variable \"%s\"\n",
+ argv[0], argv[1]);
+ return false;
}
+ usec = atoi(var);
+ }
+ else
+ usec = atoi(argv[1]);
- /*
- * command finished: accumulate per-command execution times in
- * thread-local data structure, if per-command latencies are requested
- */
- if (is_latencies)
- {
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
+ if (argc > 2)
+ {
+ if (pg_strcasecmp(argv[2], "ms") == 0)
+ usec *= 1000;
+ else if (pg_strcasecmp(argv[2], "s") == 0)
+ usec *= 1000000;
+ }
+ else
+ usec *= 1000000;
- /* XXX could use a mutex here, but we choose not to */
- addToSimpleStats(&commands[st->state]->stats,
- INSTR_TIME_GET_DOUBLE(now) -
- INSTR_TIME_GET_DOUBLE(st->stmt_begin));
- }
+ *usecs = usec;
+ return true;
+}
- /* transaction finished: calculate latency and log the transaction */
- if (commands[st->state + 1] == NULL)
- {
- if (progress || throttle_delay || latency_limit ||
- per_script_stats || use_log)
- processXactStats(thread, st, &now, false, agg);
- else
- thread->stats.cnt++;
- }
+/*
+ * Advance the state machine of a connection, if possible.
+ */
+static void
+doCustom(TState *thread, CState *st, StatsData *agg)
+{
+ PGresult *res;
+ Command *command;
+ instr_time now;
+ bool end_tx_processed = false;
+ int64 wait;
- if (commands[st->state]->type == SQL_COMMAND)
- {
- /*
- * Read and discard the query result; note this is not included in
- * the statement latency numbers.
- */
- res = PQgetResult(st->con);
- switch (PQresultStatus(res))
- {
- case PGRES_COMMAND_OK:
- case PGRES_TUPLES_OK:
- case PGRES_EMPTY_QUERY:
- break; /* OK */
- default:
- fprintf(stderr, "client %d aborted in state %d: %s",
- st->id, st->state, PQerrorMessage(st->con));
- PQclear(res);
- return clientDone(st);
- }
- PQclear(res);
- discard_response(st);
- }
+ /*
+ * gettimeofday() isn't free, so we get the current timestamp lazily the
+ * first time it's needed, and reuse the same value throughout this
+ * function after that. This also ensures that e.g. the calculated
+ * latency reported in the log file and in the totals are the same. Zero
+ * means "not set yet". Reset "now" when we execute shell commands or
+ * expressions, which might take a non-negligible amount of time, though.
+ */
+ INSTR_TIME_SET_ZERO(now);
- if (commands[st->state + 1] == NULL)
+ /*
+ * Loop in the state machine, until we have to wait for a result from the
+ * server (or have to sleep, for throttling or for \sleep).
+ *
+ * Note: In the switch-statement below, 'break' will loop back here,
+ * meaning "continue in the state machine". Return is used to return to
+ * the caller.
+ */
+ for (;;)
+ {
+ switch (st->state)
{
- if (is_connect)
- {
- PQfinish(st->con);
- st->con = NULL;
- }
+ /*
+ * Select transaction to run.
+ */
+ case CSTATE_CHOOSE_SCRIPT:
- ++st->cnt;
- if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
- return clientDone(st); /* exit success */
- }
+ st->use_file = chooseScript(thread);
- /* increment state counter */
- st->state++;
- if (commands[st->state] == NULL)
- {
- st->state = 0;
- st->use_file = chooseScript(thread);
- commands = sql_script[st->use_file].commands;
- if (debug)
- fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
- sql_script[st->use_file].desc);
- st->is_throttled = false;
-
- /*
- * No transaction is underway anymore, which means there is
- * nothing to listen to right now. When throttling rate limits
- * are active, a sleep will happen next, as the next transaction
- * starts. And then in any case the next SQL command will set
- * listen back to true.
- */
- st->listen = false;
- trans_needs_throttle = (throttle_delay > 0);
- }
- }
+ if (debug)
+ fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
+ sql_script[st->use_file].desc);
- if (st->con == NULL)
- {
- instr_time start,
- end;
+ if (throttle_delay > 0)
+ st->state = CSTATE_START_THROTTLE;
+ else
+ st->state = CSTATE_START_TX;
+ break;
- INSTR_TIME_SET_CURRENT(start);
- if ((st->con = doConnect()) == NULL)
- {
- fprintf(stderr, "client %d aborted while establishing connection\n",
- st->id);
- return clientDone(st);
- }
- INSTR_TIME_SET_CURRENT(end);
- INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
-
- /* Reset session-local state */
- st->listen = false;
- st->sleeping = false;
- st->throttling = false;
- st->is_throttled = false;
- memset(st->prepared, 0, sizeof(st->prepared));
- }
+ /*
+ * Handle throttling once per transaction by sleeping.
+ */
+ case CSTATE_START_THROTTLE:
- /*
- * This ensures that a throttling delay is inserted before proceeding with
- * sql commands, after the first transaction. The first transaction
- * throttling is performed when first entering doCustom.
- */
- if (trans_needs_throttle)
- {
- trans_needs_throttle = false;
- goto top;
- }
+ /*
+ * Generate a delay such that the series of delays will
+ * approximate a Poisson distribution centered on the
+ * throttle_delay time.
+ *
+ * If transactions are too slow or a given wait is shorter
+ * than a transaction, the next transaction will start right
+ * away.
+ */
+ Assert(throttle_delay > 0);
+ wait = getPoissonRand(thread, throttle_delay);
- /* Record transaction start time under logging, progress or throttling */
- if ((use_log || progress || throttle_delay || latency_limit ||
- per_script_stats) && st->state == 0)
- {
- INSTR_TIME_SET_CURRENT(st->txn_begin);
+ thread->throttle_trigger += wait;
+ st->txn_scheduled = thread->throttle_trigger;
- /*
- * When not throttling, this is also the transaction's scheduled start
- * time.
- */
- if (!throttle_delay)
- st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin);
- }
+ /*
+ * stop client if next transaction is beyond pgbench end of
+ * execution
+ */
+ if (duration > 0 && st->txn_scheduled > end_time)
+ {
+ st->state = CSTATE_FINISHED;
+ break;
+ }
- /* Record statement start time if per-command latencies are requested */
- if (is_latencies)
- INSTR_TIME_SET_CURRENT(st->stmt_begin);
+ /*
+ * If this --latency-limit is used, and this slot is already
+ * late so that the transaction will miss the latency limit
+ * even if it completed immediately, we skip this time slot
+ * and iterate till the next slot that isn't late yet.
+ */
+ if (latency_limit)
+ {
+ int64 now_us;
- if (commands[st->state]->type == SQL_COMMAND)
- {
- const Command *command = commands[st->state];
- int r;
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ now_us = INSTR_TIME_GET_MICROSEC(now);
+ while (thread->throttle_trigger < now_us - latency_limit)
+ {
+ processXactStats(thread, st, &now, true, agg);
+ /* next rendez-vous */
+ wait = getPoissonRand(thread, throttle_delay);
+ thread->throttle_trigger += wait;
+ st->txn_scheduled = thread->throttle_trigger;
+ }
+ }
- if (querymode == QUERY_SIMPLE)
- {
- char *sql;
+ st->state = CSTATE_THROTTLE;
+ if (debug)
+ fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
+ st->id, wait);
+ break;
- sql = pg_strdup(command->argv[0]);
- sql = assignVariables(st, sql);
+ /*
+ * Wait until it's time to start next transaction.
+ */
+ case CSTATE_THROTTLE:
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
+ return; /* Still sleeping, nothing to do here */
+
+ /* Else done sleeping, start the transaction */
+ st->state = CSTATE_START_TX;
+ break;
- if (debug)
- fprintf(stderr, "client %d sending %s\n", st->id, sql);
- r = PQsendQuery(st->con, sql);
- free(sql);
- }
- else if (querymode == QUERY_EXTENDED)
- {
- const char *sql = command->argv[0];
- const char *params[MAX_ARGS];
+ /* Start new transaction */
+ case CSTATE_START_TX:
- getQueryParams(st, command, params);
+ /*
+ * Establish connection on first call, or if is_connect is
+ * true.
+ */
+ if (st->con == NULL)
+ {
+ instr_time start;
- if (debug)
- fprintf(stderr, "client %d sending %s\n", st->id, sql);
- r = PQsendQueryParams(st->con, sql, command->argc - 1,
- NULL, params, NULL, NULL, 0);
- }
- else if (querymode == QUERY_PREPARED)
- {
- char name[MAX_PREPARE_NAME];
- const char *params[MAX_ARGS];
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ start = now;
+ if ((st->con = doConnect()) == NULL)
+ {
+ fprintf(stderr, "client %d aborted while establishing connection\n",
+ st->id);
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ INSTR_TIME_SET_CURRENT(now);
+ INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
- if (!st->prepared[st->use_file])
- {
- int j;
+ /* Reset session-local state */
+ memset(st->prepared, 0, sizeof(st->prepared));
+ }
- for (j = 0; commands[j] != NULL; j++)
+ /*
+ * Record transaction start time under logging, progress or
+ * throttling.
+ */
+ if (use_log || progress || throttle_delay || latency_limit ||
+ per_script_stats)
{
- PGresult *res;
- char name[MAX_PREPARE_NAME];
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ st->txn_begin = now;
+
+ /*
+ * When not throttling, this is also the transaction's
+ * scheduled start time.
+ */
+ if (!throttle_delay)
+ st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
+ }
- if (commands[j]->type != SQL_COMMAND)
- continue;
- preparedStatementName(name, st->use_file, j);
- res = PQprepare(st->con, name,
- commands[j]->argv[0], commands[j]->argc - 1, NULL);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- fprintf(stderr, "%s", PQerrorMessage(st->con));
- PQclear(res);
+ /* Begin with the first command */
+ st->command = 0;
+ st->state = CSTATE_START_COMMAND;
+ break;
+
+ /*
+ * Send a command to server (or execute a meta-command)
+ */
+ case CSTATE_START_COMMAND:
+ command = sql_script[st->use_file].commands[st->command];
+
+ /*
+ * If we reached the end of the script, move to end-of-xact
+ * processing.
+ */
+ if (command == NULL)
+ {
+ st->state = CSTATE_END_TX;
+ break;
}
- st->prepared[st->use_file] = true;
- }
- getQueryParams(st, command, params);
- preparedStatementName(name, st->use_file, st->state);
+ /*
+ * Record statement start time if per-command latencies are
+ * requested
+ */
+ if (is_latencies)
+ {
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ st->stmt_begin = now;
+ }
- if (debug)
- fprintf(stderr, "client %d sending %s\n", st->id, name);
- r = PQsendQueryPrepared(st->con, name, command->argc - 1,
- params, NULL, NULL, 0);
- }
- else /* unknown sql mode */
- r = 0;
+ if (command->type == SQL_COMMAND)
+ {
+ if (!sendCommand(st, command))
+ {
+ /*
+ * Failed. Stay in CSTATE_START_COMMAND state, to
+ * retry. ??? What the point or retrying? Should
+ * rather abort?
+ */
+ return;
+ }
+ else
+ st->state = CSTATE_WAIT_RESULT;
+ }
+ else if (command->type == META_COMMAND)
+ {
+ int argc = command->argc,
+ i;
+ char **argv = command->argv;
- if (r == 0)
- {
- if (debug)
- fprintf(stderr, "client %d could not send %s\n",
- st->id, command->argv[0]);
- st->ecnt++;
- }
- else
- st->listen = true; /* flags that should be listened */
- }
- else if (commands[st->state]->type == META_COMMAND)
- {
- int argc = commands[st->state]->argc,
- i;
- char **argv = commands[st->state]->argv;
+ if (debug)
+ {
+ fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
+ for (i = 1; i < argc; i++)
+ fprintf(stderr, " %s", argv[i]);
+ fprintf(stderr, "\n");
+ }
- if (debug)
- {
- fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
- for (i = 1; i < argc; i++)
- fprintf(stderr, " %s", argv[i]);
- fprintf(stderr, "\n");
- }
+ if (pg_strcasecmp(argv[0], "sleep") == 0)
+ {
+ /*
+ * A \sleep doesn't execute anything, we just get the
+ * delay from the argument, and enter the CSTATE_SLEEP
+ * state. (The per-command latency will be recorded
+ * in CSTATE_SLEEP state, not here, after the delay
+ * has elapsed.)
+ */
+ int usec;
+
+ if (!evaluateSleep(st, argc, argv, &usec))
+ {
+ commandFailed(st, "execution of meta-command 'sleep' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
- if (pg_strcasecmp(argv[0], "set") == 0)
- {
- PgBenchExpr *expr = commands[st->state]->expr;
- PgBenchValue result;
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
+ st->state = CSTATE_SLEEP;
+ break;
+ }
+ else
+ {
+ if (pg_strcasecmp(argv[0], "set") == 0)
+ {
+ PgBenchExpr *expr = command->expr;
+ PgBenchValue result;
- if (!evaluateExpr(thread, st, expr, &result))
- {
- st->ecnt++;
- return true;
- }
+ if (!evaluateExpr(thread, st, expr, &result))
+ {
+ commandFailed(st, "evaluation of meta-command 'set' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
- if (!putVariableNumber(st, argv[0], argv[1], &result))
- {
- st->ecnt++;
- return true;
- }
+ if (!putVariableNumber(st, argv[0], argv[1], &result))
+ {
+ commandFailed(st, "assignment of meta-command 'set' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ }
+ else if (pg_strcasecmp(argv[0], "setshell") == 0)
+ {
+ bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
- st->listen = true;
- }
- else if (pg_strcasecmp(argv[0], "sleep") == 0)
- {
- char *var;
- int usec;
- instr_time now;
+ if (timer_exceeded) /* timeout */
+ {
+ st->state = CSTATE_FINISHED;
+ break;
+ }
+ else if (!ret) /* on error */
+ {
+ commandFailed(st, "execution of meta-command 'setshell' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ else
+ {
+ /* succeeded */
+ }
+ }
+ else if (pg_strcasecmp(argv[0], "shell") == 0)
+ {
+ bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
- if (*argv[1] == ':')
- {
- if ((var = getVariable(st, argv[1] + 1)) == NULL)
+ if (timer_exceeded) /* timeout */
+ {
+ st->state = CSTATE_FINISHED;
+ break;
+ }
+ else if (!ret) /* on error */
+ {
+ commandFailed(st, "execution of meta-command 'shell' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ else
+ {
+ /* succeeded */
+ }
+ }
+
+ /*
+ * executing the expression or shell command might
+ * take a non-negligible amount of time, so reset
+ * 'now'
+ */
+ INSTR_TIME_SET_ZERO(now);
+
+ st->state = CSTATE_END_COMMAND;
+ }
+ }
+ break;
+
+ /*
+ * Wait for the current SQL command to complete
+ */
+ case CSTATE_WAIT_RESULT:
+ command = sql_script[st->use_file].commands[st->command];
+ if (debug)
+ fprintf(stderr, "client %d receiving\n", st->id);
+ if (!PQconsumeInput(st->con))
+ { /* there's something wrong */
+ commandFailed(st, "perhaps the backend died while processing");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ if (PQisBusy(st->con))
+ return; /* don't have the whole result yet */
+
+ /*
+ * Read and discard the query result;
+ */
+ res = PQgetResult(st->con);
+ switch (PQresultStatus(res))
{
- fprintf(stderr, "%s: undefined variable \"%s\"\n",
- argv[0], argv[1]);
- st->ecnt++;
- return true;
+ case PGRES_COMMAND_OK:
+ case PGRES_TUPLES_OK:
+ case PGRES_EMPTY_QUERY:
+ /* OK */
+ PQclear(res);
+ discard_response(st);
+ st->state = CSTATE_END_COMMAND;
+ break;
+ default:
+ commandFailed(st, PQerrorMessage(st->con));
+ PQclear(res);
+ st->state = CSTATE_ABORTED;
+ break;
}
- usec = atoi(var);
- }
- else
- usec = atoi(argv[1]);
+ break;
- if (argc > 2)
- {
- if (pg_strcasecmp(argv[2], "ms") == 0)
- usec *= 1000;
- else if (pg_strcasecmp(argv[2], "s") == 0)
- usec *= 1000000;
- }
- else
- usec *= 1000000;
+ /*
+ * Wait until sleep is done. This state is entered after a
+ * \sleep metacommand. The behavior is similar to
+ * CSTATE_THROTTLE, but proceeds to CSTATE_START_COMMAND
+ * instead of CSTATE_START_TX.
+ */
+ case CSTATE_SLEEP:
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
+ return; /* Still sleeping, nothing to do here */
+ /* Else done sleeping. */
+ st->state = CSTATE_END_COMMAND;
+ break;
- INSTR_TIME_SET_CURRENT(now);
- st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
- st->sleeping = true;
+ /*
+ * End of command: record stats and proceed to next command.
+ */
+ case CSTATE_END_COMMAND:
- st->listen = true;
- }
- else if (pg_strcasecmp(argv[0], "setshell") == 0)
- {
- bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
+ /*
+ * command completed: accumulate per-command execution times
+ * in thread-local data structure, if per-command latencies
+ * are requested.
+ */
+ if (is_latencies)
+ {
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
- if (timer_exceeded) /* timeout */
- return clientDone(st);
- else if (!ret) /* on error */
- {
- st->ecnt++;
- return true;
- }
- else /* succeeded */
- st->listen = true;
- }
- else if (pg_strcasecmp(argv[0], "shell") == 0)
- {
- bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
+ /* XXX could use a mutex here, but we choose not to */
+ command = sql_script[st->use_file].commands[st->command];
+ addToSimpleStats(&command->stats,
+ INSTR_TIME_GET_DOUBLE(now) -
+ INSTR_TIME_GET_DOUBLE(st->stmt_begin));
+ }
- if (timer_exceeded) /* timeout */
- return clientDone(st);
- else if (!ret) /* on error */
- {
- st->ecnt++;
- return true;
- }
- else /* succeeded */
- st->listen = true;
- }
+ /* Go ahead with next command */
+ st->command++;
+ st->state = CSTATE_START_COMMAND;
+ break;
- /* after a meta command, immediately proceed with next command */
- goto top;
- }
+ /*
+ * End of transaction.
+ */
+ case CSTATE_END_TX:
- return true;
+ /*
+ * transaction finished: calculate latency and log the
+ * transaction
+ */
+ if (progress || throttle_delay || latency_limit ||
+ per_script_stats || use_log)
+ processXactStats(thread, st, &now, false, agg);
+ else
+ thread->stats.cnt++;
+
+ if (is_connect)
+ {
+ PQfinish(st->con);
+ st->con = NULL;
+ INSTR_TIME_SET_ZERO(now);
+ }
+
+ ++st->cnt;
+ if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
+ {
+ /* exit success */
+ st->state = CSTATE_FINISHED;
+ break;
+ }
+
+ /*
+ * No transaction is underway anymore.
+ */
+ st->state = CSTATE_CHOOSE_SCRIPT;
+
+ /*
+ * If we paced through all commands in the script in this
+ * loop, without returning to the caller even once, do it now.
+ * This gives the thread a chance to process other
+ * connections, and to do progress reporting. This can
+ * currently only happen if the script consists entirely of
+ * meta-commands.
+ */
+ if (end_tx_processed)
+ return;
+ else
+ {
+ end_tx_processed = true;
+ break;
+ }
+
+ /*
+ * Final states. Close the connection if it's still open.
+ */
+ case CSTATE_ABORTED:
+ case CSTATE_FINISHED:
+ if (st->con != NULL)
+ {
+ PQfinish(st->con);
+ st->con = NULL;
+ }
+ return;
+ }
+ }
}
/*
initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
last = aggs;
- /* send start up queries in async manner */
+ /* initialize explicitely the state machines */
for (i = 0; i < nstate; i++)
{
- CState *st = &state[i];
- int prev_ecnt = st->ecnt;
- Command **commands;
-
- st->use_file = chooseScript(thread);
- commands = sql_script[st->use_file].commands;
- if (debug)
- fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
- sql_script[st->use_file].desc);
- if (!doCustom(thread, st, &aggs))
- remains--; /* I've aborted */
-
- if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
- {
- fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
- i, st->state);
- remains--; /* I've aborted */
- PQfinish(st->con);
- st->con = NULL;
- }
+ state[i].state = CSTATE_CHOOSE_SCRIPT;
}
while (remains > 0)
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
- Command **commands = sql_script[st->use_file].commands;
int sock;
- if (st->con == NULL)
+ if (st->state == CSTATE_THROTTLE && timer_exceeded)
{
+ /* interrupt client which has not started a transaction */
+ st->state = CSTATE_FINISHED;
+ remains--;
+ PQfinish(st->con);
+ st->con = NULL;
continue;
}
- else if (st->sleeping)
+ else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
{
- if (st->throttling && timer_exceeded)
- {
- /* interrupt client which has not started a transaction */
- remains--;
- st->sleeping = false;
- st->throttling = false;
- PQfinish(st->con);
- st->con = NULL;
- continue;
- }
- else /* just a nap from the script */
- {
- int this_usec;
+ /* a nap from the script, or under throttling */
+ int this_usec;
- if (min_usec == PG_INT64_MAX)
- {
- instr_time now;
-
- INSTR_TIME_SET_CURRENT(now);
- now_usec = INSTR_TIME_GET_MICROSEC(now);
- }
+ if (min_usec == PG_INT64_MAX)
+ {
+ instr_time now;
- this_usec = st->txn_scheduled - now_usec;
- if (min_usec > this_usec)
- min_usec = this_usec;
+ INSTR_TIME_SET_CURRENT(now);
+ now_usec = INSTR_TIME_GET_MICROSEC(now);
}
+
+ this_usec = (st->state == CSTATE_SLEEP ?
+ st->sleep_until : st->txn_scheduled) - now_usec;
+ if (min_usec > this_usec)
+ min_usec = this_usec;
}
- else if (commands[st->state]->type == META_COMMAND)
+ else if (st->state == CSTATE_WAIT_RESULT)
{
- min_usec = 0; /* the connection is ready to run */
+ /*
+ * waiting for result from server - nothing to do unless the
+ * socket is readable
+ */
+ sock = PQsocket(st->con);
+ if (sock < 0)
+ {
+ fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
+ goto done;
+ }
+
+ FD_SET(sock, &input_mask);
+
+ if (maxsock < sock)
+ maxsock = sock;
break;
}
-
- sock = PQsocket(st->con);
- if (sock < 0)
+ else if (st->state != CSTATE_ABORTED && st->state != CSTATE_FINISHED)
{
- fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
- goto done;
+ /* the connection is ready to run */
+ min_usec = 0;
+ break;
}
-
- FD_SET(sock, &input_mask);
-
- if (maxsock < sock)
- maxsock = sock;
}
/* also wake up to print the next progress report on time */
}
}
- /* ok, backend returns reply */
+ /* ok, advance the state machine of each connection */
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
- Command **commands = sql_script[st->use_file].commands;
- int prev_ecnt = st->ecnt;
+ bool ready;
- if (st->con)
+ if (st->state == CSTATE_WAIT_RESULT && st->con)
{
int sock = PQsocket(st->con);
PQerrorMessage(st->con));
goto done;
}
- if (FD_ISSET(sock, &input_mask) ||
- commands[st->state]->type == META_COMMAND)
- {
- if (!doCustom(thread, st, &aggs))
- remains--; /* I've aborted */
- }
+
+ ready = FD_ISSET(sock, &input_mask);
}
+ else if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+ ready = false;
+ else
+ ready = true;
- if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
+ if (ready)
{
- fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
- i, st->state);
- remains--; /* I've aborted */
- PQfinish(st->con);
- st->con = NULL;
+ doCustom(thread, st, &aggs);
+ if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+ remains--;
}
}