From: Tom Lane Date: Sun, 28 Jul 2019 16:02:27 +0000 (-0400) Subject: Improve test coverage for LISTEN/NOTIFY. X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=b10f40bf0e4516d7832db8ccbe5f76319ad08682;p=postgresql Improve test coverage for LISTEN/NOTIFY. We had no actual end-to-end test of NOTIFY message delivery. In the core async.sql regression test, testing this is problematic because psql traditionally prints the PID of the sending backend, making the output unstable. We also have an isolation test script, but it likewise failed to prove that delivery worked, because isolationtester.c had no provisions for detecting/reporting NOTIFY messages. Hence, add such provisions to isolationtester.c, and extend async-notify.spec to include direct tests of basic NOTIFY functionality. I also added tests showing that NOTIFY de-duplicates messages normally, but not across subtransaction boundaries. (That's the historical behavior since we introduced subtransactions, though perhaps we ought to change it.) Patch by me, with suggestions/review by Andres Freund. Discussion: https://postgr.es/m/31304.1564246011@sss.pgh.pa.us --- diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out index 92d281a7d1..60ba50658d 100644 --- a/src/test/isolation/expected/async-notify.out +++ b/src/test/isolation/expected/async-notify.out @@ -1,17 +1,102 @@ Parsed test spec with 2 sessions -starting permutation: listen begin check notify check -step listen: LISTEN a; -step begin: BEGIN; -step check: SELECT pg_notification_queue_usage() > 0 AS nonzero; +starting permutation: listenc notify1 notify2 notify3 notifyf +step listenc: LISTEN c1; LISTEN c2; +step notify1: NOTIFY c1; +notifier: NOTIFY "c1" with payload "" from notifier +step notify2: NOTIFY c2, 'payload'; +notifier: NOTIFY "c2" with payload "payload" from notifier +step notify3: NOTIFY c3, 'payload3'; +step notifyf: SELECT pg_notify('c2', NULL); +pg_notify + + +notifier: NOTIFY "c2" with payload "" from notifier + +starting permutation: listenc notifyd1 notifyd2 notifys1 +step listenc: LISTEN c1; LISTEN c2; +step notifyd1: NOTIFY c2, 'payload'; NOTIFY c1; NOTIFY "c2", 'payload'; +notifier: NOTIFY "c2" with payload "payload" from notifier +notifier: NOTIFY "c1" with payload "" from notifier +step notifyd2: NOTIFY c1; NOTIFY c1; NOTIFY c1, 'p1'; NOTIFY c1, 'p2'; +notifier: NOTIFY "c1" with payload "" from notifier +notifier: NOTIFY "c1" with payload "p1" from notifier +notifier: NOTIFY "c1" with payload "p2" from notifier +step notifys1: + BEGIN; + NOTIFY c1, 'payload'; NOTIFY "c2", 'payload'; + NOTIFY c1, 'payload'; NOTIFY "c2", 'payload'; + SAVEPOINT s1; + NOTIFY c1, 'payload'; NOTIFY "c2", 'payload'; + NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads'; + NOTIFY c1, 'payload'; NOTIFY "c2", 'payload'; + NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads'; + RELEASE SAVEPOINT s1; + SAVEPOINT s2; + NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload'; + NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads'; + NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload'; + NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads'; + ROLLBACK TO SAVEPOINT s2; + COMMIT; + +notifier: NOTIFY "c1" with payload "payload" from notifier +notifier: NOTIFY "c2" with payload "payload" from notifier +notifier: NOTIFY "c1" with payload "payload" from notifier +notifier: NOTIFY "c2" with payload "payload" from notifier +notifier: NOTIFY "c1" with payload "payloads" from notifier +notifier: NOTIFY "c2" with payload "payloads" from notifier + +starting permutation: llisten notify1 notify2 notify3 notifyf lcheck +step llisten: LISTEN c1; LISTEN c2; +step notify1: NOTIFY c1; +step notify2: NOTIFY c2, 'payload'; +step notify3: NOTIFY c3, 'payload3'; +step notifyf: SELECT pg_notify('c2', NULL); +pg_notify + + +step lcheck: SELECT 1 AS x; +x + +1 +listener: NOTIFY "c1" with payload "" from notifier +listener: NOTIFY "c2" with payload "payload" from notifier +listener: NOTIFY "c2" with payload "" from notifier + +starting permutation: listenc llisten notify1 notify2 notify3 notifyf lcheck +step listenc: LISTEN c1; LISTEN c2; +step llisten: LISTEN c1; LISTEN c2; +step notify1: NOTIFY c1; +notifier: NOTIFY "c1" with payload "" from notifier +step notify2: NOTIFY c2, 'payload'; +notifier: NOTIFY "c2" with payload "payload" from notifier +step notify3: NOTIFY c3, 'payload3'; +step notifyf: SELECT pg_notify('c2', NULL); +pg_notify + + +notifier: NOTIFY "c2" with payload "" from notifier +step lcheck: SELECT 1 AS x; +x + +1 +listener: NOTIFY "c1" with payload "" from notifier +listener: NOTIFY "c2" with payload "payload" from notifier +listener: NOTIFY "c2" with payload "" from notifier + +starting permutation: llisten lbegin usage bignotify usage +step llisten: LISTEN c1; LISTEN c2; +step lbegin: BEGIN; +step usage: SELECT pg_notification_queue_usage() > 0 AS nonzero; nonzero f -step notify: SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s; +step bignotify: SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; count 1000 -step check: SELECT pg_notification_queue_usage() > 0 AS nonzero; +step usage: SELECT pg_notification_queue_usage() > 0 AS nonzero; nonzero t diff --git a/src/test/isolation/isolationtester.c b/src/test/isolation/isolationtester.c index 3ed055106b..f98bb1cf64 100644 --- a/src/test/isolation/isolationtester.c +++ b/src/test/isolation/isolationtester.c @@ -23,10 +23,12 @@ /* * conns[0] is the global setup, teardown, and watchdog connection. Additional - * connections represent spec-defined sessions. + * connections represent spec-defined sessions. We also track the backend + * PID, in numeric and string formats, for each connection. */ static PGconn **conns = NULL; -static const char **backend_pids = NULL; +static int *backend_pids = NULL; +static const char **backend_pid_strs = NULL; static int nconns = 0; /* In dry run only output permutations to be run by the tester. */ @@ -41,7 +43,7 @@ static void run_permutation(TestSpec *testspec, int nsteps, Step **steps); #define STEP_NONBLOCK 0x1 /* return 0 as soon as cmd waits for a lock */ #define STEP_RETRY 0x2 /* this is a retry of a previously-waiting cmd */ -static bool try_complete_step(Step *step, int flags); +static bool try_complete_step(TestSpec *testspec, Step *step, int flags); static int step_qsort_cmp(const void *a, const void *b); static int step_bsearch_cmp(const void *a, const void *b); @@ -159,9 +161,11 @@ main(int argc, char **argv) * extra for lock wait detection and global work. */ nconns = 1 + testspec->nsessions; - conns = calloc(nconns, sizeof(PGconn *)); + conns = (PGconn **) pg_malloc0(nconns * sizeof(PGconn *)); + backend_pids = pg_malloc0(nconns * sizeof(*backend_pids)); + backend_pid_strs = pg_malloc0(nconns * sizeof(*backend_pid_strs)); atexit(disconnect_atexit); - backend_pids = calloc(nconns, sizeof(*backend_pids)); + for (i = 0; i < nconns; i++) { conns[i] = PQconnectdb(conninfo); @@ -187,26 +191,9 @@ main(int argc, char **argv) blackholeNoticeProcessor, NULL); - /* Get the backend pid for lock wait checking. */ - res = PQexec(conns[i], "SELECT pg_catalog.pg_backend_pid()"); - if (PQresultStatus(res) == PGRES_TUPLES_OK) - { - if (PQntuples(res) == 1 && PQnfields(res) == 1) - backend_pids[i] = pg_strdup(PQgetvalue(res, 0, 0)); - else - { - fprintf(stderr, "backend pid query returned %d rows and %d columns, expected 1 row and 1 column", - PQntuples(res), PQnfields(res)); - exit(1); - } - } - else - { - fprintf(stderr, "backend pid query failed: %s", - PQerrorMessage(conns[i])); - exit(1); - } - PQclear(res); + /* Save each connection's backend PID for subsequent use. */ + backend_pids[i] = PQbackendPID(conns[i]); + backend_pid_strs[i] = psprintf("%d", backend_pids[i]); } /* Set the session index fields in steps. */ @@ -231,9 +218,9 @@ main(int argc, char **argv) appendPQExpBufferStr(&wait_query, "SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{"); /* The spec syntax requires at least one session; assume that here. */ - appendPQExpBufferStr(&wait_query, backend_pids[1]); + appendPQExpBufferStr(&wait_query, backend_pid_strs[1]); for (i = 2; i < nconns; i++) - appendPQExpBuffer(&wait_query, ",%s", backend_pids[i]); + appendPQExpBuffer(&wait_query, ",%s", backend_pid_strs[i]); appendPQExpBufferStr(&wait_query, "}')"); res = PQprepare(conns[0], PREP_WAITING, wait_query.data, 0, NULL); @@ -549,7 +536,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) oldstep = waiting[w]; /* Wait for previous step on this connection. */ - try_complete_step(oldstep, STEP_RETRY); + try_complete_step(testspec, oldstep, STEP_RETRY); /* Remove that step from the waiting[] array. */ if (w + 1 < nwaiting) @@ -571,7 +558,8 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) nerrorstep = 0; while (w < nwaiting) { - if (try_complete_step(waiting[w], STEP_NONBLOCK | STEP_RETRY)) + if (try_complete_step(testspec, waiting[w], + STEP_NONBLOCK | STEP_RETRY)) { /* Still blocked on a lock, leave it alone. */ w++; @@ -600,14 +588,15 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) } /* Try to complete this step without blocking. */ - mustwait = try_complete_step(step, STEP_NONBLOCK); + mustwait = try_complete_step(testspec, step, STEP_NONBLOCK); /* Check for completion of any steps that were previously waiting. */ w = 0; nerrorstep = 0; while (w < nwaiting) { - if (try_complete_step(waiting[w], STEP_NONBLOCK | STEP_RETRY)) + if (try_complete_step(testspec, waiting[w], + STEP_NONBLOCK | STEP_RETRY)) w++; else { @@ -630,7 +619,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) /* Wait for any remaining queries. */ for (w = 0; w < nwaiting; ++w) { - try_complete_step(waiting[w], STEP_RETRY); + try_complete_step(testspec, waiting[w], STEP_RETRY); report_error_message(waiting[w]); } @@ -693,7 +682,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) * a lock, returns true. Otherwise, returns false. */ static bool -try_complete_step(Step *step, int flags) +try_complete_step(TestSpec *testspec, Step *step, int flags) { PGconn *conn = conns[1 + step->session]; fd_set read_set; @@ -702,6 +691,7 @@ try_complete_step(Step *step, int flags) int sock = PQsocket(conn); int ret; PGresult *res; + PGnotify *notify; bool canceled = false; if (sock < 0) @@ -738,7 +728,7 @@ try_complete_step(Step *step, int flags) bool waiting; res = PQexecPrepared(conns[0], PREP_WAITING, 1, - &backend_pids[step->session + 1], + &backend_pid_strs[step->session + 1], NULL, NULL, 0); if (PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) != 1) @@ -880,6 +870,35 @@ try_complete_step(Step *step, int flags) PQclear(res); } + /* Report any available NOTIFY messages, too */ + PQconsumeInput(conn); + while ((notify = PQnotifies(conn)) != NULL) + { + /* Try to identify which session it came from */ + const char *sendername = NULL; + char pidstring[32]; + + for (int i = 0; i < testspec->nsessions; i++) + { + if (notify->be_pid == backend_pids[i + 1]) + { + sendername = testspec->sessions[i]->name; + break; + } + } + if (sendername == NULL) + { + /* Doesn't seem to be any test session, so show the hard way */ + snprintf(pidstring, sizeof(pidstring), "PID %d", notify->be_pid); + sendername = pidstring; + } + printf("%s: NOTIFY \"%s\" with payload \"%s\" from %s\n", + testspec->sessions[step->session]->name, + notify->relname, notify->extra, sendername); + PQfreemem(notify); + PQconsumeInput(conn); + } + return false; } diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec index 8adad42c7c..daf7bef903 100644 --- a/src/test/isolation/specs/async-notify.spec +++ b/src/test/isolation/specs/async-notify.spec @@ -1,14 +1,70 @@ -# Verify that pg_notification_queue_usage correctly reports a non-zero result, -# after submitting notifications while another connection is listening for -# those notifications and waiting inside an active transaction. +# Tests for LISTEN/NOTIFY -session "listener" -step "listen" { LISTEN a; } -step "begin" { BEGIN; } -teardown { ROLLBACK; UNLISTEN *; } +# Most of these tests use only the "notifier" session and hence exercise only +# self-notifies, which are convenient because they minimize timing concerns. +# Note we assume that each step is delivered to the backend as a single Query +# message so it will run as one transaction. session "notifier" -step "check" { SELECT pg_notification_queue_usage() > 0 AS nonzero; } -step "notify" { SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s; } +step "listenc" { LISTEN c1; LISTEN c2; } +step "notify1" { NOTIFY c1; } +step "notify2" { NOTIFY c2, 'payload'; } +step "notify3" { NOTIFY c3, 'payload3'; } # not listening to c3 +step "notifyf" { SELECT pg_notify('c2', NULL); } +step "notifyd1" { NOTIFY c2, 'payload'; NOTIFY c1; NOTIFY "c2", 'payload'; } +step "notifyd2" { NOTIFY c1; NOTIFY c1; NOTIFY c1, 'p1'; NOTIFY c1, 'p2'; } +step "notifys1" { + BEGIN; + NOTIFY c1, 'payload'; NOTIFY "c2", 'payload'; + NOTIFY c1, 'payload'; NOTIFY "c2", 'payload'; + SAVEPOINT s1; + NOTIFY c1, 'payload'; NOTIFY "c2", 'payload'; + NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads'; + NOTIFY c1, 'payload'; NOTIFY "c2", 'payload'; + NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads'; + RELEASE SAVEPOINT s1; + SAVEPOINT s2; + NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload'; + NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads'; + NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload'; + NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads'; + ROLLBACK TO SAVEPOINT s2; + COMMIT; +} +step "usage" { SELECT pg_notification_queue_usage() > 0 AS nonzero; } +step "bignotify" { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; } +teardown { UNLISTEN *; } + +# The listener session is used for cross-backend notify checks. + +session "listener" +step "llisten" { LISTEN c1; LISTEN c2; } +step "lcheck" { SELECT 1 AS x; } +step "lbegin" { BEGIN; } +teardown { UNLISTEN *; } + + +# Trivial cases. +permutation "listenc" "notify1" "notify2" "notify3" "notifyf" + +# Check simple and less-simple deduplication. +permutation "listenc" "notifyd1" "notifyd2" "notifys1" + +# Cross-backend notification delivery. We use a "select 1" to force the +# listener session to check for notifies. In principle we could just wait +# for delivery, but that would require extra support in isolationtester +# and might have portability-of-timing issues. +permutation "llisten" "notify1" "notify2" "notify3" "notifyf" "lcheck" + +# Again, with local delivery too. +permutation "listenc" "llisten" "notify1" "notify2" "notify3" "notifyf" "lcheck" + +# Verify that pg_notification_queue_usage correctly reports a non-zero result, +# after submitting notifications while another connection is listening for +# those notifications and waiting inside an active transaction. We have to +# fill a page of the notify SLRU to make this happen, which is a good deal +# of traffic. To not bloat the expected output, we intentionally don't +# commit the listener's transaction, so that it never reports these events. +# Hence, this should be the last test in this script. -permutation "listen" "begin" "check" "notify" "check" +permutation "llisten" "lbegin" "usage" "bignotify" "usage"