From bb5ae8f6c4161e1742a90f27b697eeb14812e65f Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Thu, 15 Aug 2019 12:22:12 -0400 Subject: [PATCH] Use a hash table to de-duplicate NOTIFY events faster. MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Previously, async.c got rid of duplicate notifications by scanning the list of pending events to compare each one to the proposed new event. This works okay for very small numbers of distinct events, but degrades as O(N^2) for many events. We can improve matters by using a hash table to probe for duplicates. So as not to add a lot of overhead for the simple cases that the code did handle well before, create the hash table only once a (sub)transaction has queued more than 16 distinct notify events. A downside is that we now have to do per-event work to propagate a successful subtransaction's notify events up to its parent. (But this isn't significant unless the subtransaction had many events, in which case the O(N^2) behavior would have been in play already, so we still come out ahead.) We can make some lemonade out of this lemon, though: since we must examine each event anyway, it's now possible to de-duplicate events fully, rather than skipping that for events merged up from subtransactions. Hence, remove the old weasel wording in notify.sgml about whether de-duplication happens or not, and adjust the test case in async-notify.spec that exhibited the old behavior. While at it, rearrange the definition of struct Notification to make it more compact and require just one palloc per event, rather than two or three. This saves space when there are a lot of events, in fact more than enough to buy back the space needed for the hash table. Patch by me, based on discussions around a different patch submitted by Filip Rembiałkowski. Discussion: https://postgr.es/m/17822.1564186806@sss.pgh.pa.us --- doc/src/sgml/ref/notify.sgml | 6 +- src/backend/commands/async.c | 331 ++++++++++++++----- src/test/isolation/expected/async-notify.out | 2 - 3 files changed, 250 insertions(+), 89 deletions(-) diff --git a/doc/src/sgml/ref/notify.sgml b/doc/src/sgml/ref/notify.sgml index e0e125a2a2..d7dcbea02d 100644 --- a/doc/src/sgml/ref/notify.sgml +++ b/doc/src/sgml/ref/notify.sgml @@ -94,9 +94,9 @@ NOTIFY channel [ , - If the same channel name is signaled multiple times from the same - transaction with identical payload strings, the - database server can decide to deliver a single notification only. + If the same channel name is signaled multiple times with identical + payload strings within the same transaction, only one instance of the + notification event is delivered to listeners. On the other hand, notifications with distinct payload strings will always be delivered as distinct notifications. Similarly, notifications from different transactions will never get folded into one notification. diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 6e9c580ec6..6cb2d445f0 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -135,6 +135,7 @@ #include "storage/sinval.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/hashutils.h" #include "utils/memutils.h" #include "utils/ps_status.h" #include "utils/snapmgr.h" @@ -323,14 +324,25 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */ /* * State for outbound notifies consists of a list of all channels+payloads - * NOTIFYed in the current transaction. We do not actually perform a NOTIFY - * until and unless the transaction commits. pendingNotifies is NIL if no - * NOTIFYs have been done in the current transaction. + * NOTIFYed in the current transaction. We do not actually perform a NOTIFY + * until and unless the transaction commits. pendingNotifies is NULL if no + * NOTIFYs have been done in the current (sub) transaction. + * + * We discard duplicate notify events issued in the same transaction. + * Hence, in addition to the list proper (which we need to track the order + * of the events, since we guarantee to deliver them in order), we build a + * hash table which we can probe to detect duplicates. Since building the + * hash table is somewhat expensive, we do so only once we have at least + * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction; + * before that we just scan the events linearly. * * The list is kept in CurTransactionContext. In subtransactions, each * subtransaction has its own list in its own CurTransactionContext, but - * successful subtransactions attach their lists to their parent's list. - * Failed subtransactions simply discard their lists. + * successful subtransactions add their entries to their parent's list. + * Failed subtransactions simply discard their lists. Since these lists + * are independent, there may be notify events in a subtransaction's list + * that duplicate events in some ancestor (sub) transaction; we get rid of + * the dups when merging the subtransaction's list into its parent's. * * Note: the action and notify lists do not interact within a transaction. * In particular, if a transaction does NOTIFY and then LISTEN on the same @@ -339,11 +351,26 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */ */ typedef struct Notification { - char *channel; /* channel name */ - char *payload; /* payload string (can be empty) */ + uint16 channel_len; /* length of channel-name string */ + uint16 payload_len; /* length of payload string */ + /* null-terminated channel name, then null-terminated payload follow */ + char data[FLEXIBLE_ARRAY_MEMBER]; } Notification; -static List *pendingNotifies = NIL; /* list of Notifications */ +typedef struct NotificationList +{ + List *events; /* list of Notification structs */ + HTAB *hashtab; /* hash of NotificationHash structs, or NULL */ +} NotificationList; + +#define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */ + +typedef struct NotificationHash +{ + Notification *event; /* => the actual Notification struct */ +} NotificationHash; + +static NotificationList *pendingNotifies = NULL; /* current list, if any */ static List *upperPendingNotifies = NIL; /* list of upper-xact lists */ @@ -392,7 +419,10 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, Snapshot snapshot); static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(void); -static bool AsyncExistsPendingNotify(const char *channel, const char *payload); +static bool AsyncExistsPendingNotify(Notification *n); +static void AddEventToPendingNotifies(Notification *n); +static uint32 notification_hash(const void *key, Size keysize); +static int notification_match(const void *key1, const void *key2, Size keysize); static void ClearPendingActionsAndNotifies(void); /* @@ -541,6 +571,8 @@ pg_notify(PG_FUNCTION_ARGS) void Async_Notify(const char *channel, const char *payload) { + size_t channel_len; + size_t payload_len; Notification *n; MemoryContext oldcontext; @@ -550,47 +582,67 @@ Async_Notify(const char *channel, const char *payload) if (Trace_notify) elog(DEBUG1, "Async_Notify(%s)", channel); + channel_len = channel ? strlen(channel) : 0; + payload_len = payload ? strlen(payload) : 0; + /* a channel name must be specified */ - if (!channel || !strlen(channel)) + if (channel_len == 0) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("channel name cannot be empty"))); - if (strlen(channel) >= NAMEDATALEN) + /* enforce length limits */ + if (channel_len >= NAMEDATALEN) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("channel name too long"))); - if (payload) - { - if (strlen(payload) >= NOTIFY_PAYLOAD_MAX_LENGTH) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("payload string too long"))); - } - - /* no point in making duplicate entries in the list ... */ - if (AsyncExistsPendingNotify(channel, payload)) - return; + if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("payload string too long"))); /* + * We must construct the Notification entry, even if we end up not using + * it, in order to compare it cheaply to existing list entries. + * * The notification list needs to live until end of transaction, so store * it in the transaction context. */ oldcontext = MemoryContextSwitchTo(CurTransactionContext); - n = (Notification *) palloc(sizeof(Notification)); - n->channel = pstrdup(channel); + n = (Notification *) palloc(offsetof(Notification, data) + + channel_len + payload_len + 2); + n->channel_len = channel_len; + n->payload_len = payload_len; + strcpy(n->data, channel); if (payload) - n->payload = pstrdup(payload); + strcpy(n->data + channel_len + 1, payload); else - n->payload = ""; + n->data[channel_len + 1] = '\0'; - /* - * We want to preserve the order so we need to append every notification. - * See comments at AsyncExistsPendingNotify(). - */ - pendingNotifies = lappend(pendingNotifies, n); + /* Now check for duplicates */ + if (AsyncExistsPendingNotify(n)) + { + /* It's a dup, so forget it */ + pfree(n); + MemoryContextSwitchTo(oldcontext); + return; + } + + if (pendingNotifies == NULL) + { + /* First notify event in current (sub)xact */ + pendingNotifies = (NotificationList *) palloc(sizeof(NotificationList)); + pendingNotifies->events = list_make1(n); + /* We certainly don't need a hashtable yet */ + pendingNotifies->hashtab = NULL; + } + else + { + /* Append more events to existing list */ + AddEventToPendingNotifies(n); + } MemoryContextSwitchTo(oldcontext); } @@ -761,7 +813,7 @@ PreCommit_Notify(void) { ListCell *p; - if (pendingActions == NIL && pendingNotifies == NIL) + if (!pendingActions && !pendingNotifies) return; /* no relevant statements in this xact */ if (Trace_notify) @@ -821,7 +873,7 @@ PreCommit_Notify(void) /* Now push the notifications into the queue */ backendHasSentNotifications = true; - nextNotify = list_head(pendingNotifies); + nextNotify = list_head(pendingNotifies->events); while (nextNotify != NULL) { /* @@ -1267,8 +1319,8 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength) static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) { - size_t channellen = strlen(n->channel); - size_t payloadlen = strlen(n->payload); + size_t channellen = n->channel_len; + size_t payloadlen = n->payload_len; int entryLength; Assert(channellen < NAMEDATALEN); @@ -1281,8 +1333,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) qe->dboid = MyDatabaseId; qe->xid = GetCurrentTransactionId(); qe->srcPid = MyProcPid; - memcpy(qe->data, n->channel, channellen + 1); - memcpy(qe->data + channellen + 1, n->payload, payloadlen + 1); + memcpy(qe->data, n->data, channellen + payloadlen + 2); } /* @@ -1294,7 +1345,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) * database OID in order to fill the page. So every page is always used up to * the last byte which simplifies reading the page later. * - * We are passed the list cell (in pendingNotifies) containing the next + * We are passed the list cell (in pendingNotifies->events) containing the next * notification to write and return the first still-unwritten cell back. * Eventually we will return NULL indicating all is done. * @@ -1345,7 +1396,7 @@ asyncQueueAddEntries(ListCell *nextNotify) if (offset + qe.length <= QUEUE_PAGESIZE) { /* OK, so advance nextNotify past this item */ - nextNotify = lnext(pendingNotifies, nextNotify); + nextNotify = lnext(pendingNotifies->events, nextNotify); } else { @@ -1607,7 +1658,7 @@ AtSubStart_Notify(void) Assert(list_length(upperPendingNotifies) == GetCurrentTransactionNestLevel() - 1); - pendingNotifies = NIL; + pendingNotifies = NULL; MemoryContextSwitchTo(old_cxt); } @@ -1621,7 +1672,7 @@ void AtSubCommit_Notify(void) { List *parentPendingActions; - List *parentPendingNotifies; + NotificationList *parentPendingNotifies; parentPendingActions = linitial_node(List, upperPendingActions); upperPendingActions = list_delete_first(upperPendingActions); @@ -1634,16 +1685,41 @@ AtSubCommit_Notify(void) */ pendingActions = list_concat(parentPendingActions, pendingActions); - parentPendingNotifies = linitial_node(List, upperPendingNotifies); + parentPendingNotifies = (NotificationList *) linitial(upperPendingNotifies); upperPendingNotifies = list_delete_first(upperPendingNotifies); Assert(list_length(upperPendingNotifies) == GetCurrentTransactionNestLevel() - 2); - /* - * We could try to eliminate duplicates here, but it seems not worthwhile. - */ - pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies); + if (pendingNotifies == NULL) + { + /* easy, no notify events happened in current subxact */ + pendingNotifies = parentPendingNotifies; + } + else if (parentPendingNotifies == NULL) + { + /* easy, subxact's list becomes parent's */ + } + else + { + /* + * Formerly, we didn't bother to eliminate duplicates here, but now we + * must, else we fall foul of "Assert(!found)", either here or during + * a later attempt to build the parent-level hashtable. + */ + NotificationList *childPendingNotifies = pendingNotifies; + ListCell *l; + + pendingNotifies = parentPendingNotifies; + /* Insert all the subxact's events into parent, except for dups */ + foreach(l, childPendingNotifies->events) + { + Notification *childn = (Notification *) lfirst(l); + + if (!AsyncExistsPendingNotify(childn)) + AddEventToPendingNotifies(childn); + } + } } /* @@ -1672,7 +1748,7 @@ AtSubAbort_Notify(void) while (list_length(upperPendingNotifies) > my_level - 2) { - pendingNotifies = linitial_node(List, upperPendingNotifies); + pendingNotifies = (NotificationList *) linitial(upperPendingNotifies); upperPendingNotifies = list_delete_first(upperPendingNotifies); } } @@ -2098,52 +2174,139 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload); } -/* Does pendingNotifies include the given channel/payload? */ +/* Does pendingNotifies include a match for the given event? */ static bool -AsyncExistsPendingNotify(const char *channel, const char *payload) +AsyncExistsPendingNotify(Notification *n) { - ListCell *p; - Notification *n; - - if (pendingNotifies == NIL) + if (pendingNotifies == NULL) return false; - if (payload == NULL) - payload = ""; + if (pendingNotifies->hashtab != NULL) + { + /* Use the hash table to probe for a match */ + if (hash_search(pendingNotifies->hashtab, + &n, + HASH_FIND, + NULL)) + return true; + } + else + { + /* Must scan the event list */ + ListCell *l; - /*---------- - * We need to append new elements to the end of the list in order to keep - * the order. However, on the other hand we'd like to check the list - * backwards in order to make duplicate-elimination a tad faster when the - * same condition is signaled many times in a row. So as a compromise we - * check the tail element first which we can access directly. If this - * doesn't match, we check the whole list. - * - * As we are not checking our parents' lists, we can still get duplicates - * in combination with subtransactions, like in: - * - * begin; - * notify foo '1'; - * savepoint foo; - * notify foo '1'; - * commit; - *---------- - */ - n = (Notification *) llast(pendingNotifies); - if (strcmp(n->channel, channel) == 0 && - strcmp(n->payload, payload) == 0) - return true; + foreach(l, pendingNotifies->events) + { + Notification *oldn = (Notification *) lfirst(l); + + if (n->channel_len == oldn->channel_len && + n->payload_len == oldn->payload_len && + memcmp(n->data, oldn->data, + n->channel_len + n->payload_len + 2) == 0) + return true; + } + } + + return false; +} + +/* + * Add a notification event to a pre-existing pendingNotifies list. + * + * Because pendingNotifies->events is already nonempty, this works + * correctly no matter what CurrentMemoryContext is. + */ +static void +AddEventToPendingNotifies(Notification *n) +{ + Assert(pendingNotifies->events != NIL); - foreach(p, pendingNotifies) + /* Create the hash table if it's time to */ + if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES && + pendingNotifies->hashtab == NULL) { - n = (Notification *) lfirst(p); + HASHCTL hash_ctl; + ListCell *l; + + /* Create the hash table */ + MemSet(&hash_ctl, 0, sizeof(hash_ctl)); + hash_ctl.keysize = sizeof(Notification *); + hash_ctl.entrysize = sizeof(NotificationHash); + hash_ctl.hash = notification_hash; + hash_ctl.match = notification_match; + hash_ctl.hcxt = CurTransactionContext; + pendingNotifies->hashtab = + hash_create("Pending Notifies", + 256L, + &hash_ctl, + HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT); + + /* Insert all the already-existing events */ + foreach(l, pendingNotifies->events) + { + Notification *oldn = (Notification *) lfirst(l); + NotificationHash *hentry; + bool found; + + hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab, + &oldn, + HASH_ENTER, + &found); + Assert(!found); + hentry->event = oldn; + } + } - if (strcmp(n->channel, channel) == 0 && - strcmp(n->payload, payload) == 0) - return true; + /* Add new event to the list, in order */ + pendingNotifies->events = lappend(pendingNotifies->events, n); + + /* Add event to the hash table if needed */ + if (pendingNotifies->hashtab != NULL) + { + NotificationHash *hentry; + bool found; + + hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab, + &n, + HASH_ENTER, + &found); + Assert(!found); + hentry->event = n; } +} - return false; +/* + * notification_hash: hash function for notification hash table + * + * The hash "keys" are pointers to Notification structs. + */ +static uint32 +notification_hash(const void *key, Size keysize) +{ + const Notification *k = *(const Notification *const *) key; + + Assert(keysize == sizeof(Notification *)); + /* We don't bother to include the payload's trailing null in the hash */ + return DatumGetUInt32(hash_any((const unsigned char *) k->data, + k->channel_len + k->payload_len + 1)); +} + +/* + * notification_match: match function to use with notification_hash + */ +static int +notification_match(const void *key1, const void *key2, Size keysize) +{ + const Notification *k1 = *(const Notification *const *) key1; + const Notification *k2 = *(const Notification *const *) key2; + + Assert(keysize == sizeof(Notification *)); + if (k1->channel_len == k2->channel_len && + k1->payload_len == k2->payload_len && + memcmp(k1->data, k2->data, + k1->channel_len + k1->payload_len + 2) == 0) + return 0; /* equal */ + return 1; /* not equal */ } /* Clear the pendingActions and pendingNotifies lists. */ @@ -2158,5 +2321,5 @@ ClearPendingActionsAndNotifies(void) * pointers. */ pendingActions = NIL; - pendingNotifies = NIL; + pendingNotifies = NULL; } diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out index 60ba50658d..7ad26b7d8f 100644 --- a/src/test/isolation/expected/async-notify.out +++ b/src/test/isolation/expected/async-notify.out @@ -40,8 +40,6 @@ step notifys1: ROLLBACK TO SAVEPOINT s2; COMMIT; -notifier: NOTIFY "c1" with payload "payload" from notifier -notifier: NOTIFY "c2" with payload "payload" from notifier notifier: NOTIFY "c1" with payload "payload" from notifier notifier: NOTIFY "c2" with payload "payload" from notifier notifier: NOTIFY "c1" with payload "payloads" from notifier -- 2.40.0