]> granicus.if.org Git - postgresql/commitdiff
Refactor script execution state machine in pgbench.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 26 Sep 2016 07:56:02 +0000 (10:56 +0300)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 26 Sep 2016 07:56:02 +0000 (10:56 +0300)
The doCustom() function had grown into quite a mess. Rewrite it, in a more
explicit state machine style, for readability.

This also fixes one minor bug: if a script consisted entirely of meta
commands, doCustom() never returned to the caller, so progress reports
with the -P option were not printed. I don't want to backpatch this
refactoring, and the bug is quite insignificant, so only commit this to
master, and leave the bug unfixed in back-branches.

Review and original bug report by Fabien Coelho.

Discussion: <alpine.DEB.2.20.1607090850120.3412@sto>

src/bin/pgbench/pgbench.c

index 8b24ad50e7ab36468126bb10170bbc83eab850ad..1fb4ae46d56bd571afa48dd76dc8dbd94cad4c31 100644 (file)
@@ -235,25 +235,95 @@ typedef struct StatsData
 } StatsData;
 
 /*
- * Connection state
+ * Connection state machine states.
+ */
+typedef enum
+{
+       /*
+        * The client must first choose a script to execute.  Once chosen, it can
+        * either be throttled (state CSTATE_START_THROTTLE under --rate) or start
+        * right away (state CSTATE_START_TX).
+        */
+       CSTATE_CHOOSE_SCRIPT,
+
+       /*
+        * In CSTATE_START_THROTTLE state, we calculate when to begin the next
+        * transaction, and advance to CSTATE_THROTTLE.  CSTATE_THROTTLE state
+        * sleeps until that moment.  (If throttling is not enabled, doCustom()
+        * falls directly through from CSTATE_START_THROTTLE to CSTATE_START_TX.)
+        */
+       CSTATE_START_THROTTLE,
+       CSTATE_THROTTLE,
+
+       /*
+        * CSTATE_START_TX performs start-of-transaction processing.  Establishes
+        * a new connection for the transaction, in --connect mode, and records
+        * the transaction start time.
+        */
+       CSTATE_START_TX,
+
+       /*
+        * We loop through these states, to process each command in the script:
+        *
+        * CSTATE_START_COMMAND starts the execution of a command.  On a SQL
+        * command, the command is sent to the server, and we move to
+        * CSTATE_WAIT_RESULT state.  On a \sleep meta-command, the timer is set,
+        * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
+        * meta-commands are executed immediately.
+        *
+        * CSTATE_WAIT_RESULT waits until we get a result set back from the server
+        * for the current command.
+        *
+        * CSTATE_SLEEP waits until the end of \sleep.
+        *
+        * CSTATE_END_COMMAND records the end-of-command timestamp, increments the
+        * command counter, and loops back to CSTATE_START_COMMAND state.
+        */
+       CSTATE_START_COMMAND,
+       CSTATE_WAIT_RESULT,
+       CSTATE_SLEEP,
+       CSTATE_END_COMMAND,
+
+       /*
+        * CSTATE_END_TX performs end-of-transaction processing.  Calculates
+        * latency, and logs the transaction.  In --connect mode, closes the
+        * current connection.  Chooses the next script to execute and starts over
+        * in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we have no
+        * more work to do.
+        */
+       CSTATE_END_TX,
+
+       /*
+        * Final states.  CSTATE_ABORTED means that the script execution was
+        * aborted because a command failed, CSTATE_FINISHED means success.
+        */
+       CSTATE_ABORTED,
+       CSTATE_FINISHED
+}      ConnectionStateEnum;
+
+/*
+ * Connection state.
  */
 typedef struct
 {
        PGconn     *con;                        /* connection handle to DB */
        int                     id;                             /* client No. */
-       int                     state;                  /* state No. */
-       bool            listen;                 /* whether an async query has been sent */
-       bool            sleeping;               /* whether the client is napping */
-       bool            throttling;             /* whether nap is for throttling */
-       bool            is_throttled;   /* whether transaction throttling is done */
+       ConnectionStateEnum state;      /* state machine's current state. */
+
+       int                     use_file;               /* index in sql_script for this client */
+       int                     command;                /* command number in script */
+
+       /* client variables */
        Variable   *variables;          /* array of variable definitions */
        int                     nvariables;             /* number of variables */
        bool            vars_sorted;    /* are variables sorted by name? */
+
+       /* various times about current transaction */
        int64           txn_scheduled;  /* scheduled start time of transaction (usec) */
        int64           sleep_until;    /* scheduled start time of next cmd (usec) */
        instr_time      txn_begin;              /* used for measuring schedule lag times */
        instr_time      stmt_begin;             /* used for measuring statement latencies */
-       int                     use_file;               /* index in sql_scripts for this client */
+
        bool            prepared[MAX_SCRIPTS];  /* whether client prepared the script */
 
        /* per client collected stats */
@@ -1382,7 +1452,7 @@ evalFunc(TState *thread, CState *st,
                                Assert(nargs == 1);
 
                                fprintf(stderr, "debug(script=%d,command=%d): ",
-                                               st->use_file, st->state + 1);
+                                               st->use_file, st->command + 1);
 
                                if (varg->type == PGBT_INT)
                                        fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
@@ -1733,15 +1803,12 @@ preparedStatementName(char *buffer, int file, int state)
        sprintf(buffer, "P%d_%d", file, state);
 }
 
-static bool
-clientDone(CState *st)
+static void
+commandFailed(CState *st, char *message)
 {
-       if (st->con != NULL)
-       {
-               PQfinish(st->con);
-               st->con = NULL;
-       }
-       return false;                           /* always false */
+       fprintf(stderr,
+                       "client %d aborted in command %d of script %d; %s\n",
+                       st->id, st->command, st->use_file, message);
 }
 
 /* return a script number with a weighted choice. */
@@ -1763,425 +1830,595 @@ chooseScript(TState *thread)
        return i - 1;
 }
 
-/* return false iff client should be disconnected */
+/* Send a SQL command, using the chosen querymode */
 static bool
-doCustom(TState *thread, CState *st, StatsData *agg)
+sendCommand(CState *st, Command *command)
 {
-       PGresult   *res;
-       Command   **commands;
-       bool            trans_needs_throttle = false;
-       instr_time      now;
+       int                     r;
 
-       /*
-        * gettimeofday() isn't free, so we get the current timestamp lazily the
-        * first time it's needed, and reuse the same value throughout this
-        * function after that. This also ensures that e.g. the calculated latency
-        * reported in the log file and in the totals are the same. Zero means
-        * "not set yet". Reset "now" when we step to the next command with "goto
-        * top", though.
-        */
-top:
-       INSTR_TIME_SET_ZERO(now);
+       if (querymode == QUERY_SIMPLE)
+       {
+               char       *sql;
 
-       commands = sql_script[st->use_file].commands;
+               sql = pg_strdup(command->argv[0]);
+               sql = assignVariables(st, sql);
 
-       /*
-        * Handle throttling once per transaction by sleeping.  It is simpler to
-        * do this here rather than at the end, because so much complicated logic
-        * happens below when statements finish.
-        */
-       if (throttle_delay && !st->is_throttled)
+               if (debug)
+                       fprintf(stderr, "client %d sending %s\n", st->id, sql);
+               r = PQsendQuery(st->con, sql);
+               free(sql);
+       }
+       else if (querymode == QUERY_EXTENDED)
        {
-               /*
-                * Generate a delay such that the series of delays will approximate a
-                * Poisson distribution centered on the throttle_delay time.
-                *
-                * If transactions are too slow or a given wait is shorter than a
-                * transaction, the next transaction will start right away.
-                */
-               int64           wait = getPoissonRand(thread, throttle_delay);
+               const char *sql = command->argv[0];
+               const char *params[MAX_ARGS];
 
-               thread->throttle_trigger += wait;
-               st->txn_scheduled = thread->throttle_trigger;
+               getQueryParams(st, command, params);
 
-               /* stop client if next transaction is beyond pgbench end of execution */
-               if (duration > 0 && st->txn_scheduled > end_time)
-                       return clientDone(st);
+               if (debug)
+                       fprintf(stderr, "client %d sending %s\n", st->id, sql);
+               r = PQsendQueryParams(st->con, sql, command->argc - 1,
+                                                         NULL, params, NULL, NULL, 0);
+       }
+       else if (querymode == QUERY_PREPARED)
+       {
+               char            name[MAX_PREPARE_NAME];
+               const char *params[MAX_ARGS];
 
-               /*
-                * If this --latency-limit is used, and this slot is already late so
-                * that the transaction will miss the latency limit even if it
-                * completed immediately, we skip this time slot and iterate till the
-                * next slot that isn't late yet.
-                */
-               if (latency_limit)
+               if (!st->prepared[st->use_file])
                {
-                       int64           now_us;
+                       int                     j;
+                       Command   **commands = sql_script[st->use_file].commands;
 
-                       if (INSTR_TIME_IS_ZERO(now))
-                               INSTR_TIME_SET_CURRENT(now);
-                       now_us = INSTR_TIME_GET_MICROSEC(now);
-                       while (thread->throttle_trigger < now_us - latency_limit)
+                       for (j = 0; commands[j] != NULL; j++)
                        {
-                               processXactStats(thread, st, &now, true, agg);
-                               /* next rendez-vous */
-                               wait = getPoissonRand(thread, throttle_delay);
-                               thread->throttle_trigger += wait;
-                               st->txn_scheduled = thread->throttle_trigger;
+                               PGresult   *res;
+                               char            name[MAX_PREPARE_NAME];
+
+                               if (commands[j]->type != SQL_COMMAND)
+                                       continue;
+                               preparedStatementName(name, st->use_file, j);
+                               res = PQprepare(st->con, name,
+                                                 commands[j]->argv[0], commands[j]->argc - 1, NULL);
+                               if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                                       fprintf(stderr, "%s", PQerrorMessage(st->con));
+                               PQclear(res);
                        }
+                       st->prepared[st->use_file] = true;
                }
 
-               st->sleep_until = st->txn_scheduled;
-               st->sleeping = true;
-               st->throttling = true;
-               st->is_throttled = true;
+               getQueryParams(st, command, params);
+               preparedStatementName(name, st->use_file, st->command);
+
                if (debug)
-                       fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
-                                       st->id, wait);
+                       fprintf(stderr, "client %d sending %s\n", st->id, name);
+               r = PQsendQueryPrepared(st->con, name, command->argc - 1,
+                                                               params, NULL, NULL, 0);
        }
+       else    /* unknown sql mode */
+               r = 0;
 
-       if (st->sleeping)
-       {                                                       /* are we sleeping? */
-               if (INSTR_TIME_IS_ZERO(now))
-                       INSTR_TIME_SET_CURRENT(now);
-               if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
-                       return true;            /* Still sleeping, nothing to do here */
-               /* Else done sleeping, go ahead with next command */
-               st->sleeping = false;
-               st->throttling = false;
+       if (r == 0)
+       {
+               if (debug)
+                       fprintf(stderr, "client %d could not send %s\n",
+                                       st->id, command->argv[0]);
+               st->ecnt++;
+               return false;
        }
+       else
+               return true;
+}
+
+/*
+ * Parse the argument to a \sleep command, and return the requested amount
+ * of delay, in microseconds.  Returns true on success, false on error.
+ */
+static bool
+evaluateSleep(CState *st, int argc, char **argv, int *usecs)
+{
+       char       *var;
+       int                     usec;
 
-       if (st->listen)
-       {                                                       /* are we receiver? */
-               if (commands[st->state]->type == SQL_COMMAND)
+       if (*argv[1] == ':')
+       {
+               if ((var = getVariable(st, argv[1] + 1)) == NULL)
                {
-                       if (debug)
-                               fprintf(stderr, "client %d receiving\n", st->id);
-                       if (!PQconsumeInput(st->con))
-                       {                                       /* there's something wrong */
-                               fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state);
-                               return clientDone(st);
-                       }
-                       if (PQisBusy(st->con))
-                               return true;    /* don't have the whole result yet */
+                       fprintf(stderr, "%s: undefined variable \"%s\"\n",
+                                       argv[0], argv[1]);
+                       return false;
                }
+               usec = atoi(var);
+       }
+       else
+               usec = atoi(argv[1]);
 
-               /*
-                * command finished: accumulate per-command execution times in
-                * thread-local data structure, if per-command latencies are requested
-                */
-               if (is_latencies)
-               {
-                       if (INSTR_TIME_IS_ZERO(now))
-                               INSTR_TIME_SET_CURRENT(now);
+       if (argc > 2)
+       {
+               if (pg_strcasecmp(argv[2], "ms") == 0)
+                       usec *= 1000;
+               else if (pg_strcasecmp(argv[2], "s") == 0)
+                       usec *= 1000000;
+       }
+       else
+               usec *= 1000000;
 
-                       /* XXX could use a mutex here, but we choose not to */
-                       addToSimpleStats(&commands[st->state]->stats,
-                                                        INSTR_TIME_GET_DOUBLE(now) -
-                                                        INSTR_TIME_GET_DOUBLE(st->stmt_begin));
-               }
+       *usecs = usec;
+       return true;
+}
 
-               /* transaction finished: calculate latency and log the transaction */
-               if (commands[st->state + 1] == NULL)
-               {
-                       if (progress || throttle_delay || latency_limit ||
-                               per_script_stats || use_log)
-                               processXactStats(thread, st, &now, false, agg);
-                       else
-                               thread->stats.cnt++;
-               }
+/*
+ * Advance the state machine of a connection, if possible.
+ */
+static void
+doCustom(TState *thread, CState *st, StatsData *agg)
+{
+       PGresult   *res;
+       Command    *command;
+       instr_time      now;
+       bool            end_tx_processed = false;
+       int64           wait;
 
-               if (commands[st->state]->type == SQL_COMMAND)
-               {
-                       /*
-                        * Read and discard the query result; note this is not included in
-                        * the statement latency numbers.
-                        */
-                       res = PQgetResult(st->con);
-                       switch (PQresultStatus(res))
-                       {
-                               case PGRES_COMMAND_OK:
-                               case PGRES_TUPLES_OK:
-                               case PGRES_EMPTY_QUERY:
-                                       break;          /* OK */
-                               default:
-                                       fprintf(stderr, "client %d aborted in state %d: %s",
-                                                       st->id, st->state, PQerrorMessage(st->con));
-                                       PQclear(res);
-                                       return clientDone(st);
-                       }
-                       PQclear(res);
-                       discard_response(st);
-               }
+       /*
+        * gettimeofday() isn't free, so we get the current timestamp lazily the
+        * first time it's needed, and reuse the same value throughout this
+        * function after that.  This also ensures that e.g. the calculated
+        * latency reported in the log file and in the totals are the same. Zero
+        * means "not set yet".  Reset "now" when we execute shell commands or
+        * expressions, which might take a non-negligible amount of time, though.
+        */
+       INSTR_TIME_SET_ZERO(now);
 
-               if (commands[st->state + 1] == NULL)
+       /*
+        * Loop in the state machine, until we have to wait for a result from the
+        * server (or have to sleep, for throttling or for \sleep).
+        *
+        * Note: In the switch-statement below, 'break' will loop back here,
+        * meaning "continue in the state machine".  Return is used to return to
+        * the caller.
+        */
+       for (;;)
+       {
+               switch (st->state)
                {
-                       if (is_connect)
-                       {
-                               PQfinish(st->con);
-                               st->con = NULL;
-                       }
+                               /*
+                                * Select transaction to run.
+                                */
+                       case CSTATE_CHOOSE_SCRIPT:
 
-                       ++st->cnt;
-                       if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
-                               return clientDone(st);  /* exit success */
-               }
+                               st->use_file = chooseScript(thread);
 
-               /* increment state counter */
-               st->state++;
-               if (commands[st->state] == NULL)
-               {
-                       st->state = 0;
-                       st->use_file = chooseScript(thread);
-                       commands = sql_script[st->use_file].commands;
-                       if (debug)
-                               fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
-                                               sql_script[st->use_file].desc);
-                       st->is_throttled = false;
-
-                       /*
-                        * No transaction is underway anymore, which means there is
-                        * nothing to listen to right now.  When throttling rate limits
-                        * are active, a sleep will happen next, as the next transaction
-                        * starts.  And then in any case the next SQL command will set
-                        * listen back to true.
-                        */
-                       st->listen = false;
-                       trans_needs_throttle = (throttle_delay > 0);
-               }
-       }
+                               if (debug)
+                                       fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
+                                                       sql_script[st->use_file].desc);
 
-       if (st->con == NULL)
-       {
-               instr_time      start,
-                                       end;
+                               if (throttle_delay > 0)
+                                       st->state = CSTATE_START_THROTTLE;
+                               else
+                                       st->state = CSTATE_START_TX;
+                               break;
 
-               INSTR_TIME_SET_CURRENT(start);
-               if ((st->con = doConnect()) == NULL)
-               {
-                       fprintf(stderr, "client %d aborted while establishing connection\n",
-                                       st->id);
-                       return clientDone(st);
-               }
-               INSTR_TIME_SET_CURRENT(end);
-               INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
-
-               /* Reset session-local state */
-               st->listen = false;
-               st->sleeping = false;
-               st->throttling = false;
-               st->is_throttled = false;
-               memset(st->prepared, 0, sizeof(st->prepared));
-       }
+                               /*
+                                * Handle throttling once per transaction by sleeping.
+                                */
+                       case CSTATE_START_THROTTLE:
 
-       /*
-        * This ensures that a throttling delay is inserted before proceeding with
-        * sql commands, after the first transaction. The first transaction
-        * throttling is performed when first entering doCustom.
-        */
-       if (trans_needs_throttle)
-       {
-               trans_needs_throttle = false;
-               goto top;
-       }
+                               /*
+                                * Generate a delay such that the series of delays will
+                                * approximate a Poisson distribution centered on the
+                                * throttle_delay time.
+                                *
+                                * If transactions are too slow or a given wait is shorter
+                                * than a transaction, the next transaction will start right
+                                * away.
+                                */
+                               Assert(throttle_delay > 0);
+                               wait = getPoissonRand(thread, throttle_delay);
 
-       /* Record transaction start time under logging, progress or throttling */
-       if ((use_log || progress || throttle_delay || latency_limit ||
-                per_script_stats) && st->state == 0)
-       {
-               INSTR_TIME_SET_CURRENT(st->txn_begin);
+                               thread->throttle_trigger += wait;
+                               st->txn_scheduled = thread->throttle_trigger;
 
-               /*
-                * When not throttling, this is also the transaction's scheduled start
-                * time.
-                */
-               if (!throttle_delay)
-                       st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin);
-       }
+                               /*
+                                * stop client if next transaction is beyond pgbench end of
+                                * execution
+                                */
+                               if (duration > 0 && st->txn_scheduled > end_time)
+                               {
+                                       st->state = CSTATE_FINISHED;
+                                       break;
+                               }
 
-       /* Record statement start time if per-command latencies are requested */
-       if (is_latencies)
-               INSTR_TIME_SET_CURRENT(st->stmt_begin);
+                               /*
+                                * If this --latency-limit is used, and this slot is already
+                                * late so that the transaction will miss the latency limit
+                                * even if it completed immediately, we skip this time slot
+                                * and iterate till the next slot that isn't late yet.
+                                */
+                               if (latency_limit)
+                               {
+                                       int64           now_us;
 
-       if (commands[st->state]->type == SQL_COMMAND)
-       {
-               const Command *command = commands[st->state];
-               int                     r;
+                                       if (INSTR_TIME_IS_ZERO(now))
+                                               INSTR_TIME_SET_CURRENT(now);
+                                       now_us = INSTR_TIME_GET_MICROSEC(now);
+                                       while (thread->throttle_trigger < now_us - latency_limit)
+                                       {
+                                               processXactStats(thread, st, &now, true, agg);
+                                               /* next rendez-vous */
+                                               wait = getPoissonRand(thread, throttle_delay);
+                                               thread->throttle_trigger += wait;
+                                               st->txn_scheduled = thread->throttle_trigger;
+                                       }
+                               }
 
-               if (querymode == QUERY_SIMPLE)
-               {
-                       char       *sql;
+                               st->state = CSTATE_THROTTLE;
+                               if (debug)
+                                       fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
+                                                       st->id, wait);
+                               break;
 
-                       sql = pg_strdup(command->argv[0]);
-                       sql = assignVariables(st, sql);
+                               /*
+                                * Wait until it's time to start next transaction.
+                                */
+                       case CSTATE_THROTTLE:
+                               if (INSTR_TIME_IS_ZERO(now))
+                                       INSTR_TIME_SET_CURRENT(now);
+                               if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
+                                       return;         /* Still sleeping, nothing to do here */
+
+                               /* Else done sleeping, start the transaction */
+                               st->state = CSTATE_START_TX;
+                               break;
 
-                       if (debug)
-                               fprintf(stderr, "client %d sending %s\n", st->id, sql);
-                       r = PQsendQuery(st->con, sql);
-                       free(sql);
-               }
-               else if (querymode == QUERY_EXTENDED)
-               {
-                       const char *sql = command->argv[0];
-                       const char *params[MAX_ARGS];
+                               /* Start new transaction */
+                       case CSTATE_START_TX:
 
-                       getQueryParams(st, command, params);
+                               /*
+                                * Establish connection on first call, or if is_connect is
+                                * true.
+                                */
+                               if (st->con == NULL)
+                               {
+                                       instr_time      start;
 
-                       if (debug)
-                               fprintf(stderr, "client %d sending %s\n", st->id, sql);
-                       r = PQsendQueryParams(st->con, sql, command->argc - 1,
-                                                                 NULL, params, NULL, NULL, 0);
-               }
-               else if (querymode == QUERY_PREPARED)
-               {
-                       char            name[MAX_PREPARE_NAME];
-                       const char *params[MAX_ARGS];
+                                       if (INSTR_TIME_IS_ZERO(now))
+                                               INSTR_TIME_SET_CURRENT(now);
+                                       start = now;
+                                       if ((st->con = doConnect()) == NULL)
+                                       {
+                                               fprintf(stderr, "client %d aborted while establishing connection\n",
+                                                               st->id);
+                                               st->state = CSTATE_ABORTED;
+                                               break;
+                                       }
+                                       INSTR_TIME_SET_CURRENT(now);
+                                       INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
 
-                       if (!st->prepared[st->use_file])
-                       {
-                               int                     j;
+                                       /* Reset session-local state */
+                                       memset(st->prepared, 0, sizeof(st->prepared));
+                               }
 
-                               for (j = 0; commands[j] != NULL; j++)
+                               /*
+                                * Record transaction start time under logging, progress or
+                                * throttling.
+                                */
+                               if (use_log || progress || throttle_delay || latency_limit ||
+                                       per_script_stats)
                                {
-                                       PGresult   *res;
-                                       char            name[MAX_PREPARE_NAME];
+                                       if (INSTR_TIME_IS_ZERO(now))
+                                               INSTR_TIME_SET_CURRENT(now);
+                                       st->txn_begin = now;
+
+                                       /*
+                                        * When not throttling, this is also the transaction's
+                                        * scheduled start time.
+                                        */
+                                       if (!throttle_delay)
+                                               st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
+                               }
 
-                                       if (commands[j]->type != SQL_COMMAND)
-                                               continue;
-                                       preparedStatementName(name, st->use_file, j);
-                                       res = PQprepare(st->con, name,
-                                                 commands[j]->argv[0], commands[j]->argc - 1, NULL);
-                                       if (PQresultStatus(res) != PGRES_COMMAND_OK)
-                                               fprintf(stderr, "%s", PQerrorMessage(st->con));
-                                       PQclear(res);
+                               /* Begin with the first command */
+                               st->command = 0;
+                               st->state = CSTATE_START_COMMAND;
+                               break;
+
+                               /*
+                                * Send a command to server (or execute a meta-command)
+                                */
+                       case CSTATE_START_COMMAND:
+                               command = sql_script[st->use_file].commands[st->command];
+
+                               /*
+                                * If we reached the end of the script, move to end-of-xact
+                                * processing.
+                                */
+                               if (command == NULL)
+                               {
+                                       st->state = CSTATE_END_TX;
+                                       break;
                                }
-                               st->prepared[st->use_file] = true;
-                       }
 
-                       getQueryParams(st, command, params);
-                       preparedStatementName(name, st->use_file, st->state);
+                               /*
+                                * Record statement start time if per-command latencies are
+                                * requested
+                                */
+                               if (is_latencies)
+                               {
+                                       if (INSTR_TIME_IS_ZERO(now))
+                                               INSTR_TIME_SET_CURRENT(now);
+                                       st->stmt_begin = now;
+                               }
 
-                       if (debug)
-                               fprintf(stderr, "client %d sending %s\n", st->id, name);
-                       r = PQsendQueryPrepared(st->con, name, command->argc - 1,
-                                                                       params, NULL, NULL, 0);
-               }
-               else    /* unknown sql mode */
-                       r = 0;
+                               if (command->type == SQL_COMMAND)
+                               {
+                                       if (!sendCommand(st, command))
+                                       {
+                                               /*
+                                                * Failed. Stay in CSTATE_START_COMMAND state, to
+                                                * retry. ??? What the point or retrying? Should
+                                                * rather abort?
+                                                */
+                                               return;
+                                       }
+                                       else
+                                               st->state = CSTATE_WAIT_RESULT;
+                               }
+                               else if (command->type == META_COMMAND)
+                               {
+                                       int                     argc = command->argc,
+                                                               i;
+                                       char      **argv = command->argv;
 
-               if (r == 0)
-               {
-                       if (debug)
-                               fprintf(stderr, "client %d could not send %s\n",
-                                               st->id, command->argv[0]);
-                       st->ecnt++;
-               }
-               else
-                       st->listen = true;      /* flags that should be listened */
-       }
-       else if (commands[st->state]->type == META_COMMAND)
-       {
-               int                     argc = commands[st->state]->argc,
-                                       i;
-               char      **argv = commands[st->state]->argv;
+                                       if (debug)
+                                       {
+                                               fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
+                                               for (i = 1; i < argc; i++)
+                                                       fprintf(stderr, " %s", argv[i]);
+                                               fprintf(stderr, "\n");
+                                       }
 
-               if (debug)
-               {
-                       fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
-                       for (i = 1; i < argc; i++)
-                               fprintf(stderr, " %s", argv[i]);
-                       fprintf(stderr, "\n");
-               }
+                                       if (pg_strcasecmp(argv[0], "sleep") == 0)
+                                       {
+                                               /*
+                                                * A \sleep doesn't execute anything, we just get the
+                                                * delay from the argument, and enter the CSTATE_SLEEP
+                                                * state.  (The per-command latency will be recorded
+                                                * in CSTATE_SLEEP state, not here, after the delay
+                                                * has elapsed.)
+                                                */
+                                               int                     usec;
+
+                                               if (!evaluateSleep(st, argc, argv, &usec))
+                                               {
+                                                       commandFailed(st, "execution of meta-command 'sleep' failed");
+                                                       st->state = CSTATE_ABORTED;
+                                                       break;
+                                               }
 
-               if (pg_strcasecmp(argv[0], "set") == 0)
-               {
-                       PgBenchExpr *expr = commands[st->state]->expr;
-                       PgBenchValue result;
+                                               if (INSTR_TIME_IS_ZERO(now))
+                                                       INSTR_TIME_SET_CURRENT(now);
+                                               st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
+                                               st->state = CSTATE_SLEEP;
+                                               break;
+                                       }
+                                       else
+                                       {
+                                               if (pg_strcasecmp(argv[0], "set") == 0)
+                                               {
+                                                       PgBenchExpr *expr = command->expr;
+                                                       PgBenchValue result;
 
-                       if (!evaluateExpr(thread, st, expr, &result))
-                       {
-                               st->ecnt++;
-                               return true;
-                       }
+                                                       if (!evaluateExpr(thread, st, expr, &result))
+                                                       {
+                                                               commandFailed(st, "evaluation of meta-command 'set' failed");
+                                                               st->state = CSTATE_ABORTED;
+                                                               break;
+                                                       }
 
-                       if (!putVariableNumber(st, argv[0], argv[1], &result))
-                       {
-                               st->ecnt++;
-                               return true;
-                       }
+                                                       if (!putVariableNumber(st, argv[0], argv[1], &result))
+                                                       {
+                                                               commandFailed(st, "assignment of meta-command 'set' failed");
+                                                               st->state = CSTATE_ABORTED;
+                                                               break;
+                                                       }
+                                               }
+                                               else if (pg_strcasecmp(argv[0], "setshell") == 0)
+                                               {
+                                                       bool            ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
 
-                       st->listen = true;
-               }
-               else if (pg_strcasecmp(argv[0], "sleep") == 0)
-               {
-                       char       *var;
-                       int                     usec;
-                       instr_time      now;
+                                                       if (timer_exceeded) /* timeout */
+                                                       {
+                                                               st->state = CSTATE_FINISHED;
+                                                               break;
+                                                       }
+                                                       else if (!ret)          /* on error */
+                                                       {
+                                                               commandFailed(st, "execution of meta-command 'setshell' failed");
+                                                               st->state = CSTATE_ABORTED;
+                                                               break;
+                                                       }
+                                                       else
+                                                       {
+                                                               /* succeeded */
+                                                       }
+                                               }
+                                               else if (pg_strcasecmp(argv[0], "shell") == 0)
+                                               {
+                                                       bool            ret = runShellCommand(st, NULL, argv + 1, argc - 1);
 
-                       if (*argv[1] == ':')
-                       {
-                               if ((var = getVariable(st, argv[1] + 1)) == NULL)
+                                                       if (timer_exceeded) /* timeout */
+                                                       {
+                                                               st->state = CSTATE_FINISHED;
+                                                               break;
+                                                       }
+                                                       else if (!ret)          /* on error */
+                                                       {
+                                                               commandFailed(st, "execution of meta-command 'shell' failed");
+                                                               st->state = CSTATE_ABORTED;
+                                                               break;
+                                                       }
+                                                       else
+                                                       {
+                                                               /* succeeded */
+                                                       }
+                                               }
+
+                                               /*
+                                                * executing the expression or shell command might
+                                                * take a non-negligible amount of time, so reset
+                                                * 'now'
+                                                */
+                                               INSTR_TIME_SET_ZERO(now);
+
+                                               st->state = CSTATE_END_COMMAND;
+                                       }
+                               }
+                               break;
+
+                               /*
+                                * Wait for the current SQL command to complete
+                                */
+                       case CSTATE_WAIT_RESULT:
+                               command = sql_script[st->use_file].commands[st->command];
+                               if (debug)
+                                       fprintf(stderr, "client %d receiving\n", st->id);
+                               if (!PQconsumeInput(st->con))
+                               {                               /* there's something wrong */
+                                       commandFailed(st, "perhaps the backend died while processing");
+                                       st->state = CSTATE_ABORTED;
+                                       break;
+                               }
+                               if (PQisBusy(st->con))
+                                       return;         /* don't have the whole result yet */
+
+                               /*
+                                * Read and discard the query result;
+                                */
+                               res = PQgetResult(st->con);
+                               switch (PQresultStatus(res))
                                {
-                                       fprintf(stderr, "%s: undefined variable \"%s\"\n",
-                                                       argv[0], argv[1]);
-                                       st->ecnt++;
-                                       return true;
+                                       case PGRES_COMMAND_OK:
+                                       case PGRES_TUPLES_OK:
+                                       case PGRES_EMPTY_QUERY:
+                                               /* OK */
+                                               PQclear(res);
+                                               discard_response(st);
+                                               st->state = CSTATE_END_COMMAND;
+                                               break;
+                                       default:
+                                               commandFailed(st, PQerrorMessage(st->con));
+                                               PQclear(res);
+                                               st->state = CSTATE_ABORTED;
+                                               break;
                                }
-                               usec = atoi(var);
-                       }
-                       else
-                               usec = atoi(argv[1]);
+                               break;
 
-                       if (argc > 2)
-                       {
-                               if (pg_strcasecmp(argv[2], "ms") == 0)
-                                       usec *= 1000;
-                               else if (pg_strcasecmp(argv[2], "s") == 0)
-                                       usec *= 1000000;
-                       }
-                       else
-                               usec *= 1000000;
+                               /*
+                                * Wait until sleep is done. This state is entered after a
+                                * \sleep metacommand. The behavior is similar to
+                                * CSTATE_THROTTLE, but proceeds to CSTATE_START_COMMAND
+                                * instead of CSTATE_START_TX.
+                                */
+                       case CSTATE_SLEEP:
+                               if (INSTR_TIME_IS_ZERO(now))
+                                       INSTR_TIME_SET_CURRENT(now);
+                               if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
+                                       return;         /* Still sleeping, nothing to do here */
+                               /* Else done sleeping. */
+                               st->state = CSTATE_END_COMMAND;
+                               break;
 
-                       INSTR_TIME_SET_CURRENT(now);
-                       st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
-                       st->sleeping = true;
+                               /*
+                                * End of command: record stats and proceed to next command.
+                                */
+                       case CSTATE_END_COMMAND:
 
-                       st->listen = true;
-               }
-               else if (pg_strcasecmp(argv[0], "setshell") == 0)
-               {
-                       bool            ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
+                               /*
+                                * command completed: accumulate per-command execution times
+                                * in thread-local data structure, if per-command latencies
+                                * are requested.
+                                */
+                               if (is_latencies)
+                               {
+                                       if (INSTR_TIME_IS_ZERO(now))
+                                               INSTR_TIME_SET_CURRENT(now);
 
-                       if (timer_exceeded) /* timeout */
-                               return clientDone(st);
-                       else if (!ret)          /* on error */
-                       {
-                               st->ecnt++;
-                               return true;
-                       }
-                       else    /* succeeded */
-                               st->listen = true;
-               }
-               else if (pg_strcasecmp(argv[0], "shell") == 0)
-               {
-                       bool            ret = runShellCommand(st, NULL, argv + 1, argc - 1);
+                                       /* XXX could use a mutex here, but we choose not to */
+                                       command = sql_script[st->use_file].commands[st->command];
+                                       addToSimpleStats(&command->stats,
+                                                                        INSTR_TIME_GET_DOUBLE(now) -
+                                                                        INSTR_TIME_GET_DOUBLE(st->stmt_begin));
+                               }
 
-                       if (timer_exceeded) /* timeout */
-                               return clientDone(st);
-                       else if (!ret)          /* on error */
-                       {
-                               st->ecnt++;
-                               return true;
-                       }
-                       else    /* succeeded */
-                               st->listen = true;
-               }
+                               /* Go ahead with next command */
+                               st->command++;
+                               st->state = CSTATE_START_COMMAND;
+                               break;
 
-               /* after a meta command, immediately proceed with next command */
-               goto top;
-       }
+                               /*
+                                * End of transaction.
+                                */
+                       case CSTATE_END_TX:
 
-       return true;
+                               /*
+                                * transaction finished: calculate latency and log the
+                                * transaction
+                                */
+                               if (progress || throttle_delay || latency_limit ||
+                                       per_script_stats || use_log)
+                                       processXactStats(thread, st, &now, false, agg);
+                               else
+                                       thread->stats.cnt++;
+
+                               if (is_connect)
+                               {
+                                       PQfinish(st->con);
+                                       st->con = NULL;
+                                       INSTR_TIME_SET_ZERO(now);
+                               }
+
+                               ++st->cnt;
+                               if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
+                               {
+                                       /* exit success */
+                                       st->state = CSTATE_FINISHED;
+                                       break;
+                               }
+
+                               /*
+                                * No transaction is underway anymore.
+                                */
+                               st->state = CSTATE_CHOOSE_SCRIPT;
+
+                               /*
+                                * If we paced through all commands in the script in this
+                                * loop, without returning to the caller even once, do it now.
+                                * This gives the thread a chance to process other
+                                * connections, and to do progress reporting.  This can
+                                * currently only happen if the script consists entirely of
+                                * meta-commands.
+                                */
+                               if (end_tx_processed)
+                                       return;
+                               else
+                               {
+                                       end_tx_processed = true;
+                                       break;
+                               }
+
+                               /*
+                                * Final states.  Close the connection if it's still open.
+                                */
+                       case CSTATE_ABORTED:
+                       case CSTATE_FINISHED:
+                               if (st->con != NULL)
+                               {
+                                       PQfinish(st->con);
+                                       st->con = NULL;
+                               }
+                               return;
+               }
+       }
 }
 
 /*
@@ -4183,29 +4420,10 @@ threadRun(void *arg)
        initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
        last = aggs;
 
-       /* send start up queries in async manner */
+       /* initialize explicitely the state machines */
        for (i = 0; i < nstate; i++)
        {
-               CState     *st = &state[i];
-               int                     prev_ecnt = st->ecnt;
-               Command   **commands;
-
-               st->use_file = chooseScript(thread);
-               commands = sql_script[st->use_file].commands;
-               if (debug)
-                       fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
-                                       sql_script[st->use_file].desc);
-               if (!doCustom(thread, st, &aggs))
-                       remains--;                      /* I've aborted */
-
-               if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
-               {
-                       fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
-                                       i, st->state);
-                       remains--;                      /* I've aborted */
-                       PQfinish(st->con);
-                       st->con = NULL;
-               }
+               state[i].state = CSTATE_CHOOSE_SCRIPT;
        }
 
        while (remains > 0)
@@ -4222,59 +4440,60 @@ threadRun(void *arg)
                for (i = 0; i < nstate; i++)
                {
                        CState     *st = &state[i];
-                       Command   **commands = sql_script[st->use_file].commands;
                        int                     sock;
 
-                       if (st->con == NULL)
+                       if (st->state == CSTATE_THROTTLE && timer_exceeded)
                        {
+                               /* interrupt client which has not started a transaction */
+                               st->state = CSTATE_FINISHED;
+                               remains--;
+                               PQfinish(st->con);
+                               st->con = NULL;
                                continue;
                        }
-                       else if (st->sleeping)
+                       else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
                        {
-                               if (st->throttling && timer_exceeded)
-                               {
-                                       /* interrupt client which has not started a transaction */
-                                       remains--;
-                                       st->sleeping = false;
-                                       st->throttling = false;
-                                       PQfinish(st->con);
-                                       st->con = NULL;
-                                       continue;
-                               }
-                               else    /* just a nap from the script */
-                               {
-                                       int                     this_usec;
+                               /* a nap from the script, or under throttling */
+                               int                     this_usec;
 
-                                       if (min_usec == PG_INT64_MAX)
-                                       {
-                                               instr_time      now;
-
-                                               INSTR_TIME_SET_CURRENT(now);
-                                               now_usec = INSTR_TIME_GET_MICROSEC(now);
-                                       }
+                               if (min_usec == PG_INT64_MAX)
+                               {
+                                       instr_time      now;
 
-                                       this_usec = st->txn_scheduled - now_usec;
-                                       if (min_usec > this_usec)
-                                               min_usec = this_usec;
+                                       INSTR_TIME_SET_CURRENT(now);
+                                       now_usec = INSTR_TIME_GET_MICROSEC(now);
                                }
+
+                               this_usec = (st->state == CSTATE_SLEEP ?
+                                                        st->sleep_until : st->txn_scheduled) - now_usec;
+                               if (min_usec > this_usec)
+                                       min_usec = this_usec;
                        }
-                       else if (commands[st->state]->type == META_COMMAND)
+                       else if (st->state == CSTATE_WAIT_RESULT)
                        {
-                               min_usec = 0;   /* the connection is ready to run */
+                               /*
+                                * waiting for result from server - nothing to do unless the
+                                * socket is readable
+                                */
+                               sock = PQsocket(st->con);
+                               if (sock < 0)
+                               {
+                                       fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
+                                       goto done;
+                               }
+
+                               FD_SET(sock, &input_mask);
+
+                               if (maxsock < sock)
+                                       maxsock = sock;
                                break;
                        }
-
-                       sock = PQsocket(st->con);
-                       if (sock < 0)
+                       else if (st->state != CSTATE_ABORTED && st->state != CSTATE_FINISHED)
                        {
-                               fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
-                               goto done;
+                               /* the connection is ready to run */
+                               min_usec = 0;
+                               break;
                        }
-
-                       FD_SET(sock, &input_mask);
-
-                       if (maxsock < sock)
-                               maxsock = sock;
                }
 
                /* also wake up to print the next progress report on time */
@@ -4324,14 +4543,13 @@ threadRun(void *arg)
                        }
                }
 
-               /* ok, backend returns reply */
+               /* ok, advance the state machine of each connection */
                for (i = 0; i < nstate; i++)
                {
                        CState     *st = &state[i];
-                       Command   **commands = sql_script[st->use_file].commands;
-                       int                     prev_ecnt = st->ecnt;
+                       bool            ready;
 
-                       if (st->con)
+                       if (st->state == CSTATE_WAIT_RESULT && st->con)
                        {
                                int                     sock = PQsocket(st->con);
 
@@ -4341,21 +4559,19 @@ threadRun(void *arg)
                                                        PQerrorMessage(st->con));
                                        goto done;
                                }
-                               if (FD_ISSET(sock, &input_mask) ||
-                                       commands[st->state]->type == META_COMMAND)
-                               {
-                                       if (!doCustom(thread, st, &aggs))
-                                               remains--;              /* I've aborted */
-                               }
+
+                               ready = FD_ISSET(sock, &input_mask);
                        }
+                       else if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+                               ready = false;
+                       else
+                               ready = true;
 
-                       if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
+                       if (ready)
                        {
-                               fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
-                                               i, st->state);
-                               remains--;              /* I've aborted */
-                               PQfinish(st->con);
-                               st->con = NULL;
+                               doCustom(thread, st, &aggs);
+                               if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+                                       remains--;
                        }
                }