#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
+#include "utils/hashutils.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
/*
* State for outbound notifies consists of a list of all channels+payloads
- * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
- * until and unless the transaction commits. pendingNotifies is NIL if no
- * NOTIFYs have been done in the current transaction.
+ * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
+ * until and unless the transaction commits. pendingNotifies is NULL if no
+ * NOTIFYs have been done in the current (sub) transaction.
+ *
+ * We discard duplicate notify events issued in the same transaction.
+ * Hence, in addition to the list proper (which we need to track the order
+ * of the events, since we guarantee to deliver them in order), we build a
+ * hash table which we can probe to detect duplicates. Since building the
+ * hash table is somewhat expensive, we do so only once we have at least
+ * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
+ * before that we just scan the events linearly.
*
* The list is kept in CurTransactionContext. In subtransactions, each
* subtransaction has its own list in its own CurTransactionContext, but
- * successful subtransactions attach their lists to their parent's list.
- * Failed subtransactions simply discard their lists.
+ * successful subtransactions add their entries to their parent's list.
+ * Failed subtransactions simply discard their lists. Since these lists
+ * are independent, there may be notify events in a subtransaction's list
+ * that duplicate events in some ancestor (sub) transaction; we get rid of
+ * the dups when merging the subtransaction's list into its parent's.
*
* Note: the action and notify lists do not interact within a transaction.
* In particular, if a transaction does NOTIFY and then LISTEN on the same
*/
typedef struct Notification
{
- char *channel; /* channel name */
- char *payload; /* payload string (can be empty) */
+ uint16 channel_len; /* length of channel-name string, excluding its
+ * terminating null */
+ uint16 payload_len; /* length of payload string, excluding its
+ * terminating null (0 for an empty payload) */
+ /*
+ * null-terminated channel name, then null-terminated payload follow: the
+ * payload starts at data[channel_len + 1], and the two strings together
+ * occupy channel_len + payload_len + 2 bytes.
+ */
+ char data[FLEXIBLE_ARRAY_MEMBER];
} Notification;
-static List *pendingNotifies = NIL; /* list of Notifications */
+typedef struct NotificationList
+{
+ List *events; /* list of Notification structs, kept in the
+ * order the events were queued */
+ HTAB *hashtab; /* hash of NotificationHash structs, or NULL
+ * while the list is still short enough to
+ * scan linearly */
+} NotificationList;
+
+#define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */
+
+typedef struct NotificationHash
+{
+ Notification *event; /* => the actual Notification struct; hash
+ * keys are pointers of this same type and are
+ * hashed/compared by the pointed-to contents */
+} NotificationHash;
+
+static NotificationList *pendingNotifies = NULL; /* current list, if any */
static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
Snapshot snapshot);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(void);
-static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
+static bool AsyncExistsPendingNotify(Notification *n);
+static void AddEventToPendingNotifies(Notification *n);
+static uint32 notification_hash(const void *key, Size keysize);
+static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);
/*
void
Async_Notify(const char *channel, const char *payload)
{
+ size_t channel_len;
+ size_t payload_len;
Notification *n;
MemoryContext oldcontext;
if (Trace_notify)
elog(DEBUG1, "Async_Notify(%s)", channel);
+ channel_len = channel ? strlen(channel) : 0;
+ payload_len = payload ? strlen(payload) : 0;
+
/* a channel name must be specified */
- if (!channel || !strlen(channel))
+ if (channel_len == 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("channel name cannot be empty")));
- if (strlen(channel) >= NAMEDATALEN)
+ /*
+ * enforce length limits
+ *
+ * NOTE(review): both lengths are stored into uint16 fields below, so
+ * this presumably relies on NAMEDATALEN and NOTIFY_PAYLOAD_MAX_LENGTH
+ * fitting in 16 bits -- confirm against their definitions.
+ */
+ if (channel_len >= NAMEDATALEN)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("channel name too long")));
- if (payload)
- {
- if (strlen(payload) >= NOTIFY_PAYLOAD_MAX_LENGTH)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("payload string too long")));
- }
-
- /* no point in making duplicate entries in the list ... */
- if (AsyncExistsPendingNotify(channel, payload))
- return;
+ if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("payload string too long")));
/*
+ * We must construct the Notification entry, even if we end up not using
+ * it, in order to compare it cheaply to existing list entries.
+ *
* The notification list needs to live until end of transaction, so store
* it in the transaction context.
*/
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
- n = (Notification *) palloc(sizeof(Notification));
- n->channel = pstrdup(channel);
+ n = (Notification *) palloc(offsetof(Notification, data) +
+ channel_len + payload_len + 2);
+ n->channel_len = channel_len;
+ n->payload_len = payload_len;
+ strcpy(n->data, channel);
if (payload)
- n->payload = pstrdup(payload);
+ strcpy(n->data + channel_len + 1, payload);
else
- n->payload = "";
+ n->data[channel_len + 1] = '\0';
- /*
- * We want to preserve the order so we need to append every notification.
- * See comments at AsyncExistsPendingNotify().
- */
- pendingNotifies = lappend(pendingNotifies, n);
+ /*
+ * Now check for duplicates among the events already queued in the
+ * current (sub)xact's list
+ */
+ if (AsyncExistsPendingNotify(n))
+ {
+ /* It's a dup, so forget it: free the scratch entry built above */
+ pfree(n);
+ MemoryContextSwitchTo(oldcontext);
+ return;
+ }
+
+ if (pendingNotifies == NULL)
+ {
+ /* First notify event in current (sub)xact */
+ pendingNotifies = (NotificationList *) palloc(sizeof(NotificationList));
+ pendingNotifies->events = list_make1(n);
+ /* We certainly don't need a hashtable yet */
+ pendingNotifies->hashtab = NULL;
+ }
+ else
+ {
+ /* Append more events to existing list */
+ AddEventToPendingNotifies(n);
+ }
MemoryContextSwitchTo(oldcontext);
}
{
ListCell *p;
- if (pendingActions == NIL && pendingNotifies == NIL)
+ if (!pendingActions && !pendingNotifies)
return; /* no relevant statements in this xact */
if (Trace_notify)
/* Now push the notifications into the queue */
backendHasSentNotifications = true;
- nextNotify = list_head(pendingNotifies);
+ nextNotify = list_head(pendingNotifies->events);
while (nextNotify != NULL)
{
/*
static void
asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
{
- size_t channellen = strlen(n->channel);
- size_t payloadlen = strlen(n->payload);
+ size_t channellen = n->channel_len;
+ size_t payloadlen = n->payload_len;
int entryLength;
Assert(channellen < NAMEDATALEN);
qe->dboid = MyDatabaseId;
qe->xid = GetCurrentTransactionId();
qe->srcPid = MyProcPid;
- memcpy(qe->data, n->channel, channellen + 1);
- memcpy(qe->data + channellen + 1, n->payload, payloadlen + 1);
+ memcpy(qe->data, n->data, channellen + payloadlen + 2);
}
/*
* database OID in order to fill the page. So every page is always used up to
* the last byte which simplifies reading the page later.
*
- * We are passed the list cell (in pendingNotifies) containing the next
+ * We are passed the list cell (in pendingNotifies->events) containing the next
* notification to write and return the first still-unwritten cell back.
* Eventually we will return NULL indicating all is done.
*
if (offset + qe.length <= QUEUE_PAGESIZE)
{
/* OK, so advance nextNotify past this item */
- nextNotify = lnext(pendingNotifies, nextNotify);
+ nextNotify = lnext(pendingNotifies->events, nextNotify);
}
else
{
Assert(list_length(upperPendingNotifies) ==
GetCurrentTransactionNestLevel() - 1);
- pendingNotifies = NIL;
+ pendingNotifies = NULL;
MemoryContextSwitchTo(old_cxt);
}
AtSubCommit_Notify(void)
{
List *parentPendingActions;
- List *parentPendingNotifies;
+ NotificationList *parentPendingNotifies;
parentPendingActions = linitial_node(List, upperPendingActions);
upperPendingActions = list_delete_first(upperPendingActions);
*/
pendingActions = list_concat(parentPendingActions, pendingActions);
- parentPendingNotifies = linitial_node(List, upperPendingNotifies);
+ parentPendingNotifies = (NotificationList *) linitial(upperPendingNotifies);
upperPendingNotifies = list_delete_first(upperPendingNotifies);
Assert(list_length(upperPendingNotifies) ==
GetCurrentTransactionNestLevel() - 2);
- /*
- * We could try to eliminate duplicates here, but it seems not worthwhile.
- */
- pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
+ if (pendingNotifies == NULL)
+ {
+ /* easy, no notify events happened in current subxact */
+ pendingNotifies = parentPendingNotifies;
+ }
+ else if (parentPendingNotifies == NULL)
+ {
+ /* easy, subxact's list becomes parent's */
+ }
+ else
+ {
+ /*
+ * Formerly, we didn't bother to eliminate duplicates here, but now we
+ * must, else we fall foul of "Assert(!found)", either here or during
+ * a later attempt to build the parent-level hashtable.
+ */
+ NotificationList *childPendingNotifies = pendingNotifies;
+ ListCell *l;
+
+ pendingNotifies = parentPendingNotifies;
+ /* Insert all the subxact's events into parent, except for dups */
+ foreach(l, childPendingNotifies->events)
+ {
+ Notification *childn = (Notification *) lfirst(l);
+
+ if (!AsyncExistsPendingNotify(childn))
+ AddEventToPendingNotifies(childn);
+ }
+ }
}
/*
while (list_length(upperPendingNotifies) > my_level - 2)
{
- pendingNotifies = linitial_node(List, upperPendingNotifies);
+ pendingNotifies = (NotificationList *) linitial(upperPendingNotifies);
upperPendingNotifies = list_delete_first(upperPendingNotifies);
}
}
elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
}
-/* Does pendingNotifies include the given channel/payload? */
+/*
+ * Does pendingNotifies include a match for the given event?
+ *
+ * Two events match when their channel and payload strings are identical.
+ * Only the current pendingNotifies list is examined; the hash table is
+ * probed if one has been built, otherwise the event list is scanned.
+ */
static bool
-AsyncExistsPendingNotify(const char *channel, const char *payload)
+AsyncExistsPendingNotify(Notification *n)
{
- ListCell *p;
- Notification *n;
-
- if (pendingNotifies == NIL)
+ if (pendingNotifies == NULL)
return false;
- if (payload == NULL)
- payload = "";
+ if (pendingNotifies->hashtab != NULL)
+ {
+ /* Use the hash table to probe for a match */
+ if (hash_search(pendingNotifies->hashtab,
+ &n,
+ HASH_FIND,
+ NULL))
+ return true;
+ }
+ else
+ {
+ /* Must scan the event list */
+ ListCell *l;
- /*----------
- * We need to append new elements to the end of the list in order to keep
- * the order. However, on the other hand we'd like to check the list
- * backwards in order to make duplicate-elimination a tad faster when the
- * same condition is signaled many times in a row. So as a compromise we
- * check the tail element first which we can access directly. If this
- * doesn't match, we check the whole list.
- *
- * As we are not checking our parents' lists, we can still get duplicates
- * in combination with subtransactions, like in:
- *
- * begin;
- * notify foo '1';
- * savepoint foo;
- * notify foo '1';
- * commit;
- *----------
- */
- n = (Notification *) llast(pendingNotifies);
- if (strcmp(n->channel, channel) == 0 &&
- strcmp(n->payload, payload) == 0)
- return true;
+ foreach(l, pendingNotifies->events)
+ {
+ Notification *oldn = (Notification *) lfirst(l);
+
+ if (n->channel_len == oldn->channel_len &&
+ n->payload_len == oldn->payload_len &&
+ memcmp(n->data, oldn->data,
+ n->channel_len + n->payload_len + 2) == 0)
+ return true;
+ }
+ }
+
+ return false;
+}
+
+/*
+ * Add a notification event to a pre-existing pendingNotifies list.
+ *
+ * Because pendingNotifies->events is already nonempty, this works
+ * correctly no matter what CurrentMemoryContext is.
+ *
+ * The caller must already have checked that n does not duplicate an
+ * existing entry; the hash insertions below assert that. Once the list
+ * holds at least MIN_HASHABLE_NOTIFIES events, a hash table is built over
+ * all of them and is kept up to date on every later call.
+ */
+static void
+AddEventToPendingNotifies(Notification *n)
+{
+ Assert(pendingNotifies->events != NIL);
- foreach(p, pendingNotifies)
+ /* Create the hash table if it's time to */
+ if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
+ pendingNotifies->hashtab == NULL)
{
- n = (Notification *) lfirst(p);
+ HASHCTL hash_ctl;
+ ListCell *l;
+
+ /* Create the hash table */
+ MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+ hash_ctl.keysize = sizeof(Notification *);
+ hash_ctl.entrysize = sizeof(NotificationHash);
+ hash_ctl.hash = notification_hash;
+ hash_ctl.match = notification_match;
+ hash_ctl.hcxt = CurTransactionContext;
+ pendingNotifies->hashtab =
+ hash_create("Pending Notifies",
+ 256L,
+ &hash_ctl,
+ HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
+
+ /* Insert all the already-existing events */
+ foreach(l, pendingNotifies->events)
+ {
+ Notification *oldn = (Notification *) lfirst(l);
+ NotificationHash *hentry;
+ bool found;
+
+ hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
+ &oldn,
+ HASH_ENTER,
+ &found);
+ Assert(!found);
+ hentry->event = oldn;
+ }
+ }
- if (strcmp(n->channel, channel) == 0 &&
- strcmp(n->payload, payload) == 0)
- return true;
+ /* Add new event to the list, in order */
+ pendingNotifies->events = lappend(pendingNotifies->events, n);
+
+ /* Add event to the hash table if needed */
+ if (pendingNotifies->hashtab != NULL)
+ {
+ NotificationHash *hentry;
+ bool found;
+
+ hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
+ &n,
+ HASH_ENTER,
+ &found);
+ Assert(!found);
+ hentry->event = n;
}
+}
- return false;
+/*
+ * notification_hash: hash function for notification hash table
+ *
+ * The hash "keys" are pointers to Notification structs.
+ *
+ * The hash is computed over the pointed-to channel and payload bytes, not
+ * over the pointer value, so distinct structs with equal contents hash
+ * alike.
+ */
+static uint32
+notification_hash(const void *key, Size keysize)
+{
+ const Notification *k = *(const Notification *const *) key;
+
+ Assert(keysize == sizeof(Notification *));
+ /* We don't bother to include the payload's trailing null in the hash */
+ return DatumGetUInt32(hash_any((const unsigned char *) k->data,
+ k->channel_len + k->payload_len + 1));
+}
+
+/*
+ * notification_match: match function to use with notification_hash
+ *
+ * Returns 0 when the two pointed-to Notifications have the same lengths
+ * and identical channel+payload bytes (both trailing nulls included),
+ * and 1 otherwise.
+ */
+static int
+notification_match(const void *key1, const void *key2, Size keysize)
+{
+ const Notification *k1 = *(const Notification *const *) key1;
+ const Notification *k2 = *(const Notification *const *) key2;
+
+ Assert(keysize == sizeof(Notification *));
+ if (k1->channel_len == k2->channel_len &&
+ k1->payload_len == k2->payload_len &&
+ memcmp(k1->data, k2->data,
+ k1->channel_len + k1->payload_len + 2) == 0)
+ return 0; /* equal */
+ return 1; /* not equal */
}
/* Clear the pendingActions and pendingNotifies lists. */
* pointers.
*/
pendingActions = NIL;
- pendingNotifies = NIL;
+ pendingNotifies = NULL;
}