]> granicus.if.org Git - postgresql/commitdiff
Improve test coverage for LISTEN/NOTIFY.
authorTom Lane <tgl@sss.pgh.pa.us>
Sun, 28 Jul 2019 16:02:27 +0000 (12:02 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Sun, 28 Jul 2019 16:02:27 +0000 (12:02 -0400)
We had no actual end-to-end test of NOTIFY message delivery.  In the
core async.sql regression test, testing this is problematic because psql
traditionally prints the PID of the sending backend, making the output
unstable.  We also have an isolation test script, but it likewise
failed to prove that delivery worked, because isolationtester.c had
no provisions for detecting/reporting NOTIFY messages.

Hence, add such provisions to isolationtester.c, and extend
async-notify.spec to include direct tests of basic NOTIFY functionality.

I also added tests showing that NOTIFY de-duplicates messages normally,
but not across subtransaction boundaries.  (That's the historical
behavior since we introduced subtransactions, though perhaps we ought
to change it.)

Patch by me, with suggestions/review by Andres Freund.

Discussion: https://postgr.es/m/31304.1564246011@sss.pgh.pa.us

src/test/isolation/expected/async-notify.out
src/test/isolation/isolationtester.c
src/test/isolation/specs/async-notify.spec

index 92d281a7d1f3cd6192120c15dd777af39610b7a1..60ba50658dd277d19cb252acac0e12812373836b 100644 (file)
 Parsed test spec with 2 sessions
 
-starting permutation: listen begin check notify check
-step listen: LISTEN a;
-step begin: BEGIN;
-step check: SELECT pg_notification_queue_usage() > 0 AS nonzero;
+starting permutation: listenc notify1 notify2 notify3 notifyf
+step listenc: LISTEN c1; LISTEN c2;
+step notify1: NOTIFY c1;
+notifier: NOTIFY "c1" with payload "" from notifier
+step notify2: NOTIFY c2, 'payload';
+notifier: NOTIFY "c2" with payload "payload" from notifier
+step notify3: NOTIFY c3, 'payload3';
+step notifyf: SELECT pg_notify('c2', NULL);
+pg_notify      
+
+               
+notifier: NOTIFY "c2" with payload "" from notifier
+
+starting permutation: listenc notifyd1 notifyd2 notifys1
+step listenc: LISTEN c1; LISTEN c2;
+step notifyd1: NOTIFY c2, 'payload'; NOTIFY c1; NOTIFY "c2", 'payload';
+notifier: NOTIFY "c2" with payload "payload" from notifier
+notifier: NOTIFY "c1" with payload "" from notifier
+step notifyd2: NOTIFY c1; NOTIFY c1; NOTIFY c1, 'p1'; NOTIFY c1, 'p2';
+notifier: NOTIFY "c1" with payload "" from notifier
+notifier: NOTIFY "c1" with payload "p1" from notifier
+notifier: NOTIFY "c1" with payload "p2" from notifier
+step notifys1: 
+       BEGIN;
+       NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
+       NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
+       SAVEPOINT s1;
+       NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
+       NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads';
+       NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
+       NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads';
+       RELEASE SAVEPOINT s1;
+       SAVEPOINT s2;
+       NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload';
+       NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads';
+       NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload';
+       NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads';
+       ROLLBACK TO SAVEPOINT s2;
+       COMMIT;
+
+notifier: NOTIFY "c1" with payload "payload" from notifier
+notifier: NOTIFY "c2" with payload "payload" from notifier
+notifier: NOTIFY "c1" with payload "payload" from notifier
+notifier: NOTIFY "c2" with payload "payload" from notifier
+notifier: NOTIFY "c1" with payload "payloads" from notifier
+notifier: NOTIFY "c2" with payload "payloads" from notifier
+
+starting permutation: llisten notify1 notify2 notify3 notifyf lcheck
+step llisten: LISTEN c1; LISTEN c2;
+step notify1: NOTIFY c1;
+step notify2: NOTIFY c2, 'payload';
+step notify3: NOTIFY c3, 'payload3';
+step notifyf: SELECT pg_notify('c2', NULL);
+pg_notify      
+
+               
+step lcheck: SELECT 1 AS x;
+x              
+
+1              
+listener: NOTIFY "c1" with payload "" from notifier
+listener: NOTIFY "c2" with payload "payload" from notifier
+listener: NOTIFY "c2" with payload "" from notifier
+
+starting permutation: listenc llisten notify1 notify2 notify3 notifyf lcheck
+step listenc: LISTEN c1; LISTEN c2;
+step llisten: LISTEN c1; LISTEN c2;
+step notify1: NOTIFY c1;
+notifier: NOTIFY "c1" with payload "" from notifier
+step notify2: NOTIFY c2, 'payload';
+notifier: NOTIFY "c2" with payload "payload" from notifier
+step notify3: NOTIFY c3, 'payload3';
+step notifyf: SELECT pg_notify('c2', NULL);
+pg_notify      
+
+               
+notifier: NOTIFY "c2" with payload "" from notifier
+step lcheck: SELECT 1 AS x;
+x              
+
+1              
+listener: NOTIFY "c1" with payload "" from notifier
+listener: NOTIFY "c2" with payload "payload" from notifier
+listener: NOTIFY "c2" with payload "" from notifier
+
+starting permutation: llisten lbegin usage bignotify usage
+step llisten: LISTEN c1; LISTEN c2;
+step lbegin: BEGIN;
+step usage: SELECT pg_notification_queue_usage() > 0 AS nonzero;
 nonzero        
 
 f              
-step notify: SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s;
+step bignotify: SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s;
 count          
 
 1000           
-step check: SELECT pg_notification_queue_usage() > 0 AS nonzero;
+step usage: SELECT pg_notification_queue_usage() > 0 AS nonzero;
 nonzero        
 
 t              
index 3ed055106b4e4da88df6c4a7d563b2cf733fa2f9..f98bb1cf64b27c4f4a889a4c72924f36b7d73e68 100644 (file)
 
 /*
  * conns[0] is the global setup, teardown, and watchdog connection.  Additional
- * connections represent spec-defined sessions.
+ * connections represent spec-defined sessions.  We also track the backend
+ * PID, in numeric and string formats, for each connection.
  */
 static PGconn **conns = NULL;
-static const char **backend_pids = NULL;
+static int *backend_pids = NULL;
+static const char **backend_pid_strs = NULL;
 static int     nconns = 0;
 
 /* In dry run only output permutations to be run by the tester. */
@@ -41,7 +43,7 @@ static void run_permutation(TestSpec *testspec, int nsteps, Step **steps);
 
 #define STEP_NONBLOCK  0x1             /* return 0 as soon as cmd waits for a lock */
 #define STEP_RETRY             0x2             /* this is a retry of a previously-waiting cmd */
-static bool try_complete_step(Step *step, int flags);
+static bool try_complete_step(TestSpec *testspec, Step *step, int flags);
 
 static int     step_qsort_cmp(const void *a, const void *b);
 static int     step_bsearch_cmp(const void *a, const void *b);
@@ -159,9 +161,11 @@ main(int argc, char **argv)
         * extra for lock wait detection and global work.
         */
        nconns = 1 + testspec->nsessions;
-       conns = calloc(nconns, sizeof(PGconn *));
+       conns = (PGconn **) pg_malloc0(nconns * sizeof(PGconn *));
+       backend_pids = pg_malloc0(nconns * sizeof(*backend_pids));
+       backend_pid_strs = pg_malloc0(nconns * sizeof(*backend_pid_strs));
        atexit(disconnect_atexit);
-       backend_pids = calloc(nconns, sizeof(*backend_pids));
+
        for (i = 0; i < nconns; i++)
        {
                conns[i] = PQconnectdb(conninfo);
@@ -187,26 +191,9 @@ main(int argc, char **argv)
                                                                 blackholeNoticeProcessor,
                                                                 NULL);
 
-               /* Get the backend pid for lock wait checking. */
-               res = PQexec(conns[i], "SELECT pg_catalog.pg_backend_pid()");
-               if (PQresultStatus(res) == PGRES_TUPLES_OK)
-               {
-                       if (PQntuples(res) == 1 && PQnfields(res) == 1)
-                               backend_pids[i] = pg_strdup(PQgetvalue(res, 0, 0));
-                       else
-                       {
-                               fprintf(stderr, "backend pid query returned %d rows and %d columns, expected 1 row and 1 column",
-                                               PQntuples(res), PQnfields(res));
-                               exit(1);
-                       }
-               }
-               else
-               {
-                       fprintf(stderr, "backend pid query failed: %s",
-                                       PQerrorMessage(conns[i]));
-                       exit(1);
-               }
-               PQclear(res);
+               /* Save each connection's backend PID for subsequent use. */
+               backend_pids[i] = PQbackendPID(conns[i]);
+               backend_pid_strs[i] = psprintf("%d", backend_pids[i]);
        }
 
        /* Set the session index fields in steps. */
@@ -231,9 +218,9 @@ main(int argc, char **argv)
        appendPQExpBufferStr(&wait_query,
                                                 "SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{");
        /* The spec syntax requires at least one session; assume that here. */
-       appendPQExpBufferStr(&wait_query, backend_pids[1]);
+       appendPQExpBufferStr(&wait_query, backend_pid_strs[1]);
        for (i = 2; i < nconns; i++)
-               appendPQExpBuffer(&wait_query, ",%s", backend_pids[i]);
+               appendPQExpBuffer(&wait_query, ",%s", backend_pid_strs[i]);
        appendPQExpBufferStr(&wait_query, "}')");
 
        res = PQprepare(conns[0], PREP_WAITING, wait_query.data, 0, NULL);
@@ -549,7 +536,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
                                oldstep = waiting[w];
 
                                /* Wait for previous step on this connection. */
-                               try_complete_step(oldstep, STEP_RETRY);
+                               try_complete_step(testspec, oldstep, STEP_RETRY);
 
                                /* Remove that step from the waiting[] array. */
                                if (w + 1 < nwaiting)
@@ -571,7 +558,8 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
                        nerrorstep = 0;
                        while (w < nwaiting)
                        {
-                               if (try_complete_step(waiting[w], STEP_NONBLOCK | STEP_RETRY))
+                               if (try_complete_step(testspec, waiting[w],
+                                                                         STEP_NONBLOCK | STEP_RETRY))
                                {
                                        /* Still blocked on a lock, leave it alone. */
                                        w++;
@@ -600,14 +588,15 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
                }
 
                /* Try to complete this step without blocking.  */
-               mustwait = try_complete_step(step, STEP_NONBLOCK);
+               mustwait = try_complete_step(testspec, step, STEP_NONBLOCK);
 
                /* Check for completion of any steps that were previously waiting. */
                w = 0;
                nerrorstep = 0;
                while (w < nwaiting)
                {
-                       if (try_complete_step(waiting[w], STEP_NONBLOCK | STEP_RETRY))
+                       if (try_complete_step(testspec, waiting[w],
+                                                                 STEP_NONBLOCK | STEP_RETRY))
                                w++;
                        else
                        {
@@ -630,7 +619,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
        /* Wait for any remaining queries. */
        for (w = 0; w < nwaiting; ++w)
        {
-               try_complete_step(waiting[w], STEP_RETRY);
+               try_complete_step(testspec, waiting[w], STEP_RETRY);
                report_error_message(waiting[w]);
        }
 
@@ -693,7 +682,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
  * a lock, returns true.  Otherwise, returns false.
  */
 static bool
-try_complete_step(Step *step, int flags)
+try_complete_step(TestSpec *testspec, Step *step, int flags)
 {
        PGconn     *conn = conns[1 + step->session];
        fd_set          read_set;
@@ -702,6 +691,7 @@ try_complete_step(Step *step, int flags)
        int                     sock = PQsocket(conn);
        int                     ret;
        PGresult   *res;
+       PGnotify   *notify;
        bool            canceled = false;
 
        if (sock < 0)
@@ -738,7 +728,7 @@ try_complete_step(Step *step, int flags)
                                bool            waiting;
 
                                res = PQexecPrepared(conns[0], PREP_WAITING, 1,
-                                                                        &backend_pids[step->session + 1],
+                                                                        &backend_pid_strs[step->session + 1],
                                                                         NULL, NULL, 0);
                                if (PQresultStatus(res) != PGRES_TUPLES_OK ||
                                        PQntuples(res) != 1)
@@ -880,6 +870,35 @@ try_complete_step(Step *step, int flags)
                PQclear(res);
        }
 
+       /* Report any available NOTIFY messages, too */
+       PQconsumeInput(conn);
+       while ((notify = PQnotifies(conn)) != NULL)
+       {
+               /* Try to identify which session it came from */
+               const char *sendername = NULL;
+               char            pidstring[32];
+
+               for (int i = 0; i < testspec->nsessions; i++)
+               {
+                       if (notify->be_pid == backend_pids[i + 1])
+                       {
+                               sendername = testspec->sessions[i]->name;
+                               break;
+                       }
+               }
+               if (sendername == NULL)
+               {
+                       /* Doesn't seem to be any test session, so show the hard way */
+                       snprintf(pidstring, sizeof(pidstring), "PID %d", notify->be_pid);
+                       sendername = pidstring;
+               }
+               printf("%s: NOTIFY \"%s\" with payload \"%s\" from %s\n",
+                          testspec->sessions[step->session]->name,
+                          notify->relname, notify->extra, sendername);
+               PQfreemem(notify);
+               PQconsumeInput(conn);
+       }
+
        return false;
 }
 
index 8adad42c7c846b704443d5657081df14af30074d..daf7bef903c0df32a29aea05304e9fec61359a54 100644 (file)
@@ -1,14 +1,70 @@
-# Verify that pg_notification_queue_usage correctly reports a non-zero result,
-# after submitting notifications while another connection is listening for
-# those notifications and waiting inside an active transaction.
+# Tests for LISTEN/NOTIFY
 
-session "listener"
-step "listen"  { LISTEN a; }
-step "begin"   { BEGIN; }
-teardown               { ROLLBACK; UNLISTEN *; }
+# Most of these tests use only the "notifier" session and hence exercise only
+# self-notifies, which are convenient because they minimize timing concerns.
+# Note we assume that each step is delivered to the backend as a single Query
+# message so it will run as one transaction.
 
 session "notifier"
-step "check"   { SELECT pg_notification_queue_usage() > 0 AS nonzero; }
-step "notify"  { SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s; }
+step "listenc" { LISTEN c1; LISTEN c2; }
+step "notify1" { NOTIFY c1; }
+step "notify2" { NOTIFY c2, 'payload'; }
+step "notify3" { NOTIFY c3, 'payload3'; }  # not listening to c3
+step "notifyf" { SELECT pg_notify('c2', NULL); }
+step "notifyd1"        { NOTIFY c2, 'payload'; NOTIFY c1; NOTIFY "c2", 'payload'; }
+step "notifyd2"        { NOTIFY c1; NOTIFY c1; NOTIFY c1, 'p1'; NOTIFY c1, 'p2'; }
+step "notifys1"        {
+       BEGIN;
+       NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
+       NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
+       SAVEPOINT s1;
+       NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
+       NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads';
+       NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
+       NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads';
+       RELEASE SAVEPOINT s1;
+       SAVEPOINT s2;
+       NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload';
+       NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads';
+       NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload';
+       NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads';
+       ROLLBACK TO SAVEPOINT s2;
+       COMMIT;
+}
+step "usage"   { SELECT pg_notification_queue_usage() > 0 AS nonzero; }
+step "bignotify"       { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; }
+teardown               { UNLISTEN *; }
+
+# The listener session is used for cross-backend notify checks.
+
+session "listener"
+step "llisten" { LISTEN c1; LISTEN c2; }
+step "lcheck"  { SELECT 1 AS x; }
+step "lbegin"  { BEGIN; }
+teardown               { UNLISTEN *; }
+
+
+# Trivial cases.
+permutation "listenc" "notify1" "notify2" "notify3" "notifyf"
+
+# Check simple and less-simple deduplication.
+permutation "listenc" "notifyd1" "notifyd2" "notifys1"
+
+# Cross-backend notification delivery.  We use a "select 1" to force the
+# listener session to check for notifies.  In principle we could just wait
+# for delivery, but that would require extra support in isolationtester
+# and might have portability-of-timing issues.
+permutation "llisten" "notify1" "notify2" "notify3" "notifyf" "lcheck"
+
+# Again, with local delivery too.
+permutation "listenc" "llisten" "notify1" "notify2" "notify3" "notifyf" "lcheck"
+
+# Verify that pg_notification_queue_usage correctly reports a non-zero result,
+# after submitting notifications while another connection is listening for
+# those notifications and waiting inside an active transaction.  We have to
+# fill a page of the notify SLRU to make this happen, which is a good deal
+# of traffic.  To not bloat the expected output, we intentionally don't
+# commit the listener's transaction, so that it never reports these events.
+# Hence, this should be the last test in this script.
 
-permutation "listen" "begin" "check" "notify" "check"
+permutation "llisten" "lbegin" "usage" "bignotify" "usage"