]> granicus.if.org Git - postgresql/commitdiff
Use a hash table to de-duplicate NOTIFY events faster.
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 15 Aug 2019 16:22:12 +0000 (12:22 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 15 Aug 2019 16:22:12 +0000 (12:22 -0400)
Previously, async.c got rid of duplicate notifications by scanning
the list of pending events to compare each one to the proposed new
event.  This works okay for very small numbers of distinct events,
but degrades as O(N^2) for many events.  We can improve matters by
using a hash table to probe for duplicates.  So as not to add a
lot of overhead for the simple cases that the code did handle well
before, create the hash table only once a (sub)transaction has
queued more than 16 distinct notify events.

A downside is that we now have to do per-event work to propagate
a successful subtransaction's notify events up to its parent.
(But this isn't significant unless the subtransaction had many
events, in which case the O(N^2) behavior would have been in
play already, so we still come out ahead.)

We can make some lemonade out of this lemon, though: since we must
examine each event anyway, it's now possible to de-duplicate events
fully, rather than skipping that for events merged up from
subtransactions.  Hence, remove the old weasel wording in notify.sgml
about whether de-duplication happens or not, and adjust the test
case in async-notify.spec that exhibited the old behavior.

While at it, rearrange the definition of struct Notification to make
it more compact and require just one palloc per event, rather than
two or three.  This saves space when there are a lot of events,
in fact more than enough to buy back the space needed for the hash
table.

Patch by me, based on discussions around a different patch
submitted by Filip RembiaƂkowski.

Discussion: https://postgr.es/m/17822.1564186806@sss.pgh.pa.us

doc/src/sgml/ref/notify.sgml
src/backend/commands/async.c
src/test/isolation/expected/async-notify.out

index e0e125a2a2db11b9cbc9e6e4f7022fb2f5d52f1a..d7dcbea02d45ec467bc9201f8a0a03c6015d7023 100644 (file)
@@ -94,9 +94,9 @@ NOTIFY <replaceable class="parameter">channel</replaceable> [ , <replaceable cla
   </para>
 
   <para>
-   If the same channel name is signaled multiple times from the same
-   transaction with identical payload strings, the
-   database server can decide to deliver a single notification only.
+   If the same channel name is signaled multiple times with identical
+   payload strings within the same transaction, only one instance of the
+   notification event is delivered to listeners.
    On the other hand, notifications with distinct payload strings will
    always be delivered as distinct notifications. Similarly, notifications from
    different transactions will never get folded into one notification.
index 6e9c580ec6d04029ddf2b0e40f2caca3f4b59815..6cb2d445f0dfab8ad8677ac4ec7c4a5ca2a88ddf 100644 (file)
 #include "storage/sinval.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
+#include "utils/hashutils.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/snapmgr.h"
@@ -323,14 +324,25 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */
 
 /*
  * State for outbound notifies consists of a list of all channels+payloads
- * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
- * until and unless the transaction commits.  pendingNotifies is NIL if no
- * NOTIFYs have been done in the current transaction.
+ * NOTIFYed in the current transaction.  We do not actually perform a NOTIFY
+ * until and unless the transaction commits.  pendingNotifies is NULL if no
+ * NOTIFYs have been done in the current (sub) transaction.
+ *
+ * We discard duplicate notify events issued in the same transaction.
+ * Hence, in addition to the list proper (which we need to track the order
+ * of the events, since we guarantee to deliver them in order), we build a
+ * hash table which we can probe to detect duplicates.  Since building the
+ * hash table is somewhat expensive, we do so only once we have at least
+ * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
+ * before that we just scan the events linearly.
  *
  * The list is kept in CurTransactionContext.  In subtransactions, each
  * subtransaction has its own list in its own CurTransactionContext, but
- * successful subtransactions attach their lists to their parent's list.
- * Failed subtransactions simply discard their lists.
+ * successful subtransactions add their entries to their parent's list.
+ * Failed subtransactions simply discard their lists.  Since these lists
+ * are independent, there may be notify events in a subtransaction's list
+ * that duplicate events in some ancestor (sub) transaction; we get rid of
+ * the dups when merging the subtransaction's list into its parent's.
  *
  * Note: the action and notify lists do not interact within a transaction.
  * In particular, if a transaction does NOTIFY and then LISTEN on the same
@@ -339,11 +351,26 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */
  */
 typedef struct Notification
 {
-       char       *channel;            /* channel name */
-       char       *payload;            /* payload string (can be empty) */
+       uint16          channel_len;    /* length of channel-name string */
+       uint16          payload_len;    /* length of payload string */
+       /* null-terminated channel name, then null-terminated payload follow */
+       char            data[FLEXIBLE_ARRAY_MEMBER];
 } Notification;
 
-static List *pendingNotifies = NIL; /* list of Notifications */
+typedef struct NotificationList
+{
+       List       *events;                     /* list of Notification structs */
+       HTAB       *hashtab;            /* hash of NotificationHash structs, or NULL */
+} NotificationList;
+
+#define MIN_HASHABLE_NOTIFIES 16       /* threshold to build hashtab */
+
+typedef struct NotificationHash
+{
+       Notification *event;            /* => the actual Notification struct */
+} NotificationHash;
+
+static NotificationList *pendingNotifies = NULL;       /* current list, if any */
 
 static List *upperPendingNotifies = NIL;       /* list of upper-xact lists */
 
@@ -392,7 +419,10 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                                                                 Snapshot snapshot);
 static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(void);
-static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
+static bool AsyncExistsPendingNotify(Notification *n);
+static void AddEventToPendingNotifies(Notification *n);
+static uint32 notification_hash(const void *key, Size keysize);
+static int     notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
 
 /*
@@ -541,6 +571,8 @@ pg_notify(PG_FUNCTION_ARGS)
 void
 Async_Notify(const char *channel, const char *payload)
 {
+       size_t          channel_len;
+       size_t          payload_len;
        Notification *n;
        MemoryContext oldcontext;
 
@@ -550,47 +582,67 @@ Async_Notify(const char *channel, const char *payload)
        if (Trace_notify)
                elog(DEBUG1, "Async_Notify(%s)", channel);
 
+       channel_len = channel ? strlen(channel) : 0;
+       payload_len = payload ? strlen(payload) : 0;
+
        /* a channel name must be specified */
-       if (!channel || !strlen(channel))
+       if (channel_len == 0)
                ereport(ERROR,
                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                 errmsg("channel name cannot be empty")));
 
-       if (strlen(channel) >= NAMEDATALEN)
+       /* enforce length limits */
+       if (channel_len >= NAMEDATALEN)
                ereport(ERROR,
                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                 errmsg("channel name too long")));
 
-       if (payload)
-       {
-               if (strlen(payload) >= NOTIFY_PAYLOAD_MAX_LENGTH)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                                        errmsg("payload string too long")));
-       }
-
-       /* no point in making duplicate entries in the list ... */
-       if (AsyncExistsPendingNotify(channel, payload))
-               return;
+       if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                errmsg("payload string too long")));
 
        /*
+        * We must construct the Notification entry, even if we end up not using
+        * it, in order to compare it cheaply to existing list entries.
+        *
         * The notification list needs to live until end of transaction, so store
         * it in the transaction context.
         */
        oldcontext = MemoryContextSwitchTo(CurTransactionContext);
 
-       n = (Notification *) palloc(sizeof(Notification));
-       n->channel = pstrdup(channel);
+       n = (Notification *) palloc(offsetof(Notification, data) +
+                                                               channel_len + payload_len + 2);
+       n->channel_len = channel_len;
+       n->payload_len = payload_len;
+       strcpy(n->data, channel);
        if (payload)
-               n->payload = pstrdup(payload);
+               strcpy(n->data + channel_len + 1, payload);
        else
-               n->payload = "";
+               n->data[channel_len + 1] = '\0';
 
-       /*
-        * We want to preserve the order so we need to append every notification.
-        * See comments at AsyncExistsPendingNotify().
-        */
-       pendingNotifies = lappend(pendingNotifies, n);
+       /* Now check for duplicates */
+       if (AsyncExistsPendingNotify(n))
+       {
+               /* It's a dup, so forget it */
+               pfree(n);
+               MemoryContextSwitchTo(oldcontext);
+               return;
+       }
+
+       if (pendingNotifies == NULL)
+       {
+               /* First notify event in current (sub)xact */
+               pendingNotifies = (NotificationList *) palloc(sizeof(NotificationList));
+               pendingNotifies->events = list_make1(n);
+               /* We certainly don't need a hashtable yet */
+               pendingNotifies->hashtab = NULL;
+       }
+       else
+       {
+               /* Append more events to existing list */
+               AddEventToPendingNotifies(n);
+       }
 
        MemoryContextSwitchTo(oldcontext);
 }
@@ -761,7 +813,7 @@ PreCommit_Notify(void)
 {
        ListCell   *p;
 
-       if (pendingActions == NIL && pendingNotifies == NIL)
+       if (!pendingActions && !pendingNotifies)
                return;                                 /* no relevant statements in this xact */
 
        if (Trace_notify)
@@ -821,7 +873,7 @@ PreCommit_Notify(void)
                /* Now push the notifications into the queue */
                backendHasSentNotifications = true;
 
-               nextNotify = list_head(pendingNotifies);
+               nextNotify = list_head(pendingNotifies->events);
                while (nextNotify != NULL)
                {
                        /*
@@ -1267,8 +1319,8 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
 static void
 asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
 {
-       size_t          channellen = strlen(n->channel);
-       size_t          payloadlen = strlen(n->payload);
+       size_t          channellen = n->channel_len;
+       size_t          payloadlen = n->payload_len;
        int                     entryLength;
 
        Assert(channellen < NAMEDATALEN);
@@ -1281,8 +1333,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
        qe->dboid = MyDatabaseId;
        qe->xid = GetCurrentTransactionId();
        qe->srcPid = MyProcPid;
-       memcpy(qe->data, n->channel, channellen + 1);
-       memcpy(qe->data + channellen + 1, n->payload, payloadlen + 1);
+       memcpy(qe->data, n->data, channellen + payloadlen + 2);
 }
 
 /*
@@ -1294,7 +1345,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
  * database OID in order to fill the page. So every page is always used up to
  * the last byte which simplifies reading the page later.
  *
- * We are passed the list cell (in pendingNotifies) containing the next
+ * We are passed the list cell (in pendingNotifies->events) containing the next
  * notification to write and return the first still-unwritten cell back.
  * Eventually we will return NULL indicating all is done.
  *
@@ -1345,7 +1396,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
                if (offset + qe.length <= QUEUE_PAGESIZE)
                {
                        /* OK, so advance nextNotify past this item */
-                       nextNotify = lnext(pendingNotifies, nextNotify);
+                       nextNotify = lnext(pendingNotifies->events, nextNotify);
                }
                else
                {
@@ -1607,7 +1658,7 @@ AtSubStart_Notify(void)
        Assert(list_length(upperPendingNotifies) ==
                   GetCurrentTransactionNestLevel() - 1);
 
-       pendingNotifies = NIL;
+       pendingNotifies = NULL;
 
        MemoryContextSwitchTo(old_cxt);
 }
@@ -1621,7 +1672,7 @@ void
 AtSubCommit_Notify(void)
 {
        List       *parentPendingActions;
-       List       *parentPendingNotifies;
+       NotificationList *parentPendingNotifies;
 
        parentPendingActions = linitial_node(List, upperPendingActions);
        upperPendingActions = list_delete_first(upperPendingActions);
@@ -1634,16 +1685,41 @@ AtSubCommit_Notify(void)
         */
        pendingActions = list_concat(parentPendingActions, pendingActions);
 
-       parentPendingNotifies = linitial_node(List, upperPendingNotifies);
+       parentPendingNotifies = (NotificationList *) linitial(upperPendingNotifies);
        upperPendingNotifies = list_delete_first(upperPendingNotifies);
 
        Assert(list_length(upperPendingNotifies) ==
                   GetCurrentTransactionNestLevel() - 2);
 
-       /*
-        * We could try to eliminate duplicates here, but it seems not worthwhile.
-        */
-       pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
+       if (pendingNotifies == NULL)
+       {
+               /* easy, no notify events happened in current subxact */
+               pendingNotifies = parentPendingNotifies;
+       }
+       else if (parentPendingNotifies == NULL)
+       {
+               /* easy, subxact's list becomes parent's */
+       }
+       else
+       {
+               /*
+                * Formerly, we didn't bother to eliminate duplicates here, but now we
+                * must, else we fall foul of "Assert(!found)", either here or during
+                * a later attempt to build the parent-level hashtable.
+                */
+               NotificationList *childPendingNotifies = pendingNotifies;
+               ListCell   *l;
+
+               pendingNotifies = parentPendingNotifies;
+               /* Insert all the subxact's events into parent, except for dups */
+               foreach(l, childPendingNotifies->events)
+               {
+                       Notification *childn = (Notification *) lfirst(l);
+
+                       if (!AsyncExistsPendingNotify(childn))
+                               AddEventToPendingNotifies(childn);
+               }
+       }
 }
 
 /*
@@ -1672,7 +1748,7 @@ AtSubAbort_Notify(void)
 
        while (list_length(upperPendingNotifies) > my_level - 2)
        {
-               pendingNotifies = linitial_node(List, upperPendingNotifies);
+               pendingNotifies = (NotificationList *) linitial(upperPendingNotifies);
                upperPendingNotifies = list_delete_first(upperPendingNotifies);
        }
 }
@@ -2098,52 +2174,139 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
                elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
 }
 
-/* Does pendingNotifies include the given channel/payload? */
+/* Does pendingNotifies include a match for the given event? */
 static bool
-AsyncExistsPendingNotify(const char *channel, const char *payload)
+AsyncExistsPendingNotify(Notification *n)
 {
-       ListCell   *p;
-       Notification *n;
-
-       if (pendingNotifies == NIL)
+       if (pendingNotifies == NULL)
                return false;
 
-       if (payload == NULL)
-               payload = "";
+       if (pendingNotifies->hashtab != NULL)
+       {
+               /* Use the hash table to probe for a match */
+               if (hash_search(pendingNotifies->hashtab,
+                                               &n,
+                                               HASH_FIND,
+                                               NULL))
+                       return true;
+       }
+       else
+       {
+               /* Must scan the event list */
+               ListCell   *l;
 
-       /*----------
-        * We need to append new elements to the end of the list in order to keep
-        * the order. However, on the other hand we'd like to check the list
-        * backwards in order to make duplicate-elimination a tad faster when the
-        * same condition is signaled many times in a row. So as a compromise we
-        * check the tail element first which we can access directly. If this
-        * doesn't match, we check the whole list.
-        *
-        * As we are not checking our parents' lists, we can still get duplicates
-        * in combination with subtransactions, like in:
-        *
-        * begin;
-        * notify foo '1';
-        * savepoint foo;
-        * notify foo '1';
-        * commit;
-        *----------
-        */
-       n = (Notification *) llast(pendingNotifies);
-       if (strcmp(n->channel, channel) == 0 &&
-               strcmp(n->payload, payload) == 0)
-               return true;
+               foreach(l, pendingNotifies->events)
+               {
+                       Notification *oldn = (Notification *) lfirst(l);
+
+                       if (n->channel_len == oldn->channel_len &&
+                               n->payload_len == oldn->payload_len &&
+                               memcmp(n->data, oldn->data,
+                                          n->channel_len + n->payload_len + 2) == 0)
+                               return true;
+               }
+       }
+
+       return false;
+}
+
+/*
+ * Add a notification event to a pre-existing pendingNotifies list.
+ *
+ * Because pendingNotifies->events is already nonempty, this works
+ * correctly no matter what CurrentMemoryContext is.
+ */
+static void
+AddEventToPendingNotifies(Notification *n)
+{
+       Assert(pendingNotifies->events != NIL);
 
-       foreach(p, pendingNotifies)
+       /* Create the hash table if it's time to */
+       if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
+               pendingNotifies->hashtab == NULL)
        {
-               n = (Notification *) lfirst(p);
+               HASHCTL         hash_ctl;
+               ListCell   *l;
+
+               /* Create the hash table */
+               MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+               hash_ctl.keysize = sizeof(Notification *);
+               hash_ctl.entrysize = sizeof(NotificationHash);
+               hash_ctl.hash = notification_hash;
+               hash_ctl.match = notification_match;
+               hash_ctl.hcxt = CurTransactionContext;
+               pendingNotifies->hashtab =
+                       hash_create("Pending Notifies",
+                                               256L,
+                                               &hash_ctl,
+                                               HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
+
+               /* Insert all the already-existing events */
+               foreach(l, pendingNotifies->events)
+               {
+                       Notification *oldn = (Notification *) lfirst(l);
+                       NotificationHash *hentry;
+                       bool            found;
+
+                       hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
+                                                                                                         &oldn,
+                                                                                                         HASH_ENTER,
+                                                                                                         &found);
+                       Assert(!found);
+                       hentry->event = oldn;
+               }
+       }
 
-               if (strcmp(n->channel, channel) == 0 &&
-                       strcmp(n->payload, payload) == 0)
-                       return true;
+       /* Add new event to the list, in order */
+       pendingNotifies->events = lappend(pendingNotifies->events, n);
+
+       /* Add event to the hash table if needed */
+       if (pendingNotifies->hashtab != NULL)
+       {
+               NotificationHash *hentry;
+               bool            found;
+
+               hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
+                                                                                                 &n,
+                                                                                                 HASH_ENTER,
+                                                                                                 &found);
+               Assert(!found);
+               hentry->event = n;
        }
+}
 
-       return false;
+/*
+ * notification_hash: hash function for notification hash table
+ *
+ * The hash "keys" are pointers to Notification structs.
+ */
+static uint32
+notification_hash(const void *key, Size keysize)
+{
+       const Notification *k = *(const Notification *const *) key;
+
+       Assert(keysize == sizeof(Notification *));
+       /* We don't bother to include the payload's trailing null in the hash */
+       return DatumGetUInt32(hash_any((const unsigned char *) k->data,
+                                                                  k->channel_len + k->payload_len + 1));
+}
+
+/*
+ * notification_match: match function to use with notification_hash
+ */
+static int
+notification_match(const void *key1, const void *key2, Size keysize)
+{
+       const Notification *k1 = *(const Notification *const *) key1;
+       const Notification *k2 = *(const Notification *const *) key2;
+
+       Assert(keysize == sizeof(Notification *));
+       if (k1->channel_len == k2->channel_len &&
+               k1->payload_len == k2->payload_len &&
+               memcmp(k1->data, k2->data,
+                          k1->channel_len + k1->payload_len + 2) == 0)
+               return 0;                               /* equal */
+       return 1;                                       /* not equal */
 }
 
 /* Clear the pendingActions and pendingNotifies lists. */
@@ -2158,5 +2321,5 @@ ClearPendingActionsAndNotifies(void)
         * pointers.
         */
        pendingActions = NIL;
-       pendingNotifies = NIL;
+       pendingNotifies = NULL;
 }
index 60ba50658dd277d19cb252acac0e12812373836b..7ad26b7d8f20e726fdc79904ead04dfeba5cb7d1 100644 (file)
@@ -40,8 +40,6 @@ step notifys1:
        ROLLBACK TO SAVEPOINT s2;
        COMMIT;
 
-notifier: NOTIFY "c1" with payload "payload" from notifier
-notifier: NOTIFY "c2" with payload "payload" from notifier
 notifier: NOTIFY "c1" with payload "payload" from notifier
 notifier: NOTIFY "c2" with payload "payload" from notifier
 notifier: NOTIFY "c1" with payload "payloads" from notifier