From 9779bda86c026e645773a3308f9169c7c0791f7c Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Mon, 26 Sep 2016 20:23:50 -0400 Subject: [PATCH] Fix newly-introduced issues in pgbench. The result of FD_ISSET() doesn't necessarily fit in a bool, though assigning it to one might accidentally work depending on platform and which socket FD number is being inquired of. Rewrite to test it with if(), rather than making any specific assumption about the result width, to match the way every other such call in PG is written. Don't break out of the input_mask-filling loop after finding the first client that we're waiting for results from. That mostly breaks parallel query management. Also, if we choose not to call select(), be sure to clear out any bits the mask-filling loop might have set, so that we don't accidentally call doCustom for clients we don't know have input. Doing so would likely be harmless, but it's a waste of cycles and doesn't seem to be intended. Make this_usec wide enough. (Yeah, the value would usually fit in an int, but then why are we using int64 everywhere else?) Minor cosmetic adjustments, mostly comment improvements. Problems introduced by commit 12788ae49. The first issue was discovered by buildfarm testing, the others by code review. --- src/bin/pgbench/pgbench.c | 82 +++++++++++++++++++++++---------------- 1 file changed, 49 insertions(+), 33 deletions(-) diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 1fb4ae46d5..d44cfdab49 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -299,7 +299,7 @@ typedef enum */ CSTATE_ABORTED, CSTATE_FINISHED -} ConnectionStateEnum; +} ConnectionStateEnum; /* * Connection state. @@ -4420,43 +4420,43 @@ threadRun(void *arg) initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time)); last = aggs; - /* initialize explicitely the state machines */ + /* explicitly initialize the state machines */ for (i = 0; i < nstate; i++) { state[i].state = CSTATE_CHOOSE_SCRIPT; } + /* loop till all clients have terminated */ while (remains > 0) { fd_set input_mask; - int maxsock; /* max socket number to be waited */ - int64 now_usec = 0; + int maxsock; /* max socket number to be waited for */ int64 min_usec; + int64 now_usec = 0; /* set this only if needed */ + /* identify which client sockets should be checked for input */ FD_ZERO(&input_mask); - maxsock = -1; min_usec = PG_INT64_MAX; for (i = 0; i < nstate; i++) { CState *st = &state[i]; - int sock; if (st->state == CSTATE_THROTTLE && timer_exceeded) { - /* interrupt client which has not started a transaction */ + /* interrupt client that has not started a transaction */ st->state = CSTATE_FINISHED; - remains--; PQfinish(st->con); st->con = NULL; - continue; + remains--; } else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE) { /* a nap from the script, or under throttling */ - int this_usec; + int64 this_usec; - if (min_usec == PG_INT64_MAX) + /* get current time if needed */ + if (now_usec == 0) { instr_time now; @@ -4464,6 +4464,7 @@ threadRun(void *arg) now_usec = INSTR_TIME_GET_MICROSEC(now); } + /* min_usec should be the minimum delay across all clients */ this_usec = (st->state == CSTATE_SLEEP ? st->sleep_until : st->txn_scheduled) - now_usec; if (min_usec > this_usec) @@ -4475,22 +4476,26 @@ threadRun(void *arg) * waiting for result from server - nothing to do unless the * socket is readable */ - sock = PQsocket(st->con); + int sock = PQsocket(st->con); + if (sock < 0) { - fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con)); + fprintf(stderr, "invalid socket: %s", + PQerrorMessage(st->con)); goto done; } FD_SET(sock, &input_mask); - if (maxsock < sock) maxsock = sock; - break; } - else if (st->state != CSTATE_ABORTED && st->state != CSTATE_FINISHED) + else if (st->state != CSTATE_ABORTED && + st->state != CSTATE_FINISHED) { - /* the connection is ready to run */ + /* + * This client thread is ready to do something, so we don't + * want to wait. No need to examine additional clients. + */ min_usec = 0; break; } @@ -4515,9 +4520,10 @@ threadRun(void *arg) } /* - * Sleep until we receive data from the server, or a nap-time - * specified in the script ends, or it's time to print a progress - * report. + * If no clients are ready to execute actions, sleep until we receive + * data from the server, or a nap-time specified in the script ends, + * or it's time to print a progress report. Update input_mask to show + * which client(s) received data. */ if (min_usec > 0 && maxsock != -1) { @@ -4536,21 +4542,29 @@ threadRun(void *arg) if (nsocks < 0) { if (errno == EINTR) + { + /* On EINTR, go back to top of loop */ continue; + } /* must be something wrong */ fprintf(stderr, "select() failed: %s\n", strerror(errno)); goto done; } } + else + { + /* If we didn't call select(), don't try to read any data */ + FD_ZERO(&input_mask); + } /* ok, advance the state machine of each connection */ for (i = 0; i < nstate; i++) { CState *st = &state[i]; - bool ready; - if (st->state == CSTATE_WAIT_RESULT && st->con) + if (st->state == CSTATE_WAIT_RESULT) { + /* don't call doCustom unless data is available */ int sock = PQsocket(st->con); if (sock < 0) @@ -4560,22 +4574,24 @@ threadRun(void *arg) goto done; } - ready = FD_ISSET(sock, &input_mask); + if (!FD_ISSET(sock, &input_mask)) + continue; } - else if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED) - ready = false; - else - ready = true; - - if (ready) + else if (st->state == CSTATE_FINISHED || + st->state == CSTATE_ABORTED) { - doCustom(thread, st, &aggs); - if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED) - remains--; + /* this client is done, no need to consider it anymore */ + continue; } + + doCustom(thread, st, &aggs); + + /* If doCustom changed client to finished state, reduce remains */ + if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED) + remains--; } - /* progress report by thread 0 for all threads */ + /* progress report is made by thread 0 for all threads */ if (progress && thread->tid == 0) { instr_time now_time; -- 2.40.0