From d1e027221d0243b7b57eabb0e482923dd7d1c8eb Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Tue, 16 Feb 2010 22:34:57 +0000 Subject: [PATCH] Replace the pg_listener-based LISTEN/NOTIFY mechanism with an in-memory queue. In addition, add support for a "payload" string to be passed along with each notify event. This implementation should be significantly more efficient than the old one, and is also more compatible with Hot Standby usage. There is not yet any facility for HS slaves to receive notifications generated on the master, although such a thing is possible in future. Joachim Wieland, reviewed by Jeff Davis; also hacked on by me. --- doc/src/sgml/catalogs.sgml | 69 +- doc/src/sgml/func.sgml | 18 +- doc/src/sgml/libpq.sgml | 52 +- doc/src/sgml/protocol.sgml | 23 +- doc/src/sgml/ref/listen.sgml | 38 +- doc/src/sgml/ref/notify.sgml | 121 +- doc/src/sgml/ref/unlisten.sgml | 19 +- doc/src/sgml/storage.sgml | 7 +- src/backend/access/transam/slru.c | 23 +- src/backend/access/transam/twophase_rmgr.c | 7 +- src/backend/access/transam/xact.c | 11 +- src/backend/catalog/Makefile | 4 +- src/backend/commands/async.c | 1969 +++++++++++++++----- src/backend/nodes/copyfuncs.c | 3 +- src/backend/nodes/equalfuncs.c | 3 +- src/backend/nodes/outfuncs.c | 3 +- src/backend/nodes/readfuncs.c | 3 +- src/backend/parser/gram.y | 12 +- src/backend/storage/ipc/ipci.c | 5 +- src/backend/storage/lmgr/lwlock.c | 6 +- src/backend/tcop/postgres.c | 6 +- src/backend/tcop/utility.c | 12 +- src/backend/utils/adt/ruleutils.c | 7 +- src/bin/initdb/initdb.c | 3 +- src/bin/psql/common.c | 11 +- src/bin/psql/tab-complete.c | 6 +- src/include/access/slru.h | 27 +- src/include/access/twophase_rmgr.h | 7 +- src/include/catalog/catversion.h | 4 +- src/include/catalog/pg_listener.h | 59 - src/include/catalog/pg_proc.h | 8 +- src/include/commands/async.h | 27 +- src/include/nodes/parsenodes.h | 3 +- src/include/storage/lwlock.h | 4 +- src/test/regress/expected/guc.out | 12 +- 
src/test/regress/expected/sanity_check.out | 3 +- src/test/regress/sql/guc.sql | 4 +- 37 files changed, 1843 insertions(+), 756 deletions(-) delete mode 100644 src/include/catalog/pg_listener.h diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 3503bc852c..e5bef18d7d 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -1,4 +1,4 @@ - + @@ -168,11 +168,6 @@ metadata for large objects - - pg_listener - asynchronous notification support - - pg_namespace schemas @@ -3253,68 +3248,6 @@ - - <structname>pg_listener</structname> - - - pg_listener - - - - The catalog pg_listener supports the - and - - commands. A listener creates an entry in - pg_listener for each notification name - it is listening for. A notifier scans pg_listener - and updates each matching entry to show that a notification has occurred. - The notifier also sends a signal (using the PID recorded in the table) - to awaken the listener from sleep. - - - - <structname>pg_listener</> Columns - - - - - Name - Type - Description - - - - - - relname - name - - Notify condition name. (The name need not match any actual - relation in the database; the name relname is historical.) - - - - - listenerpid - int4 - PID of the server process that created this entry - - - - notification - int4 - - Zero if no event is pending for this listener. If an event is - pending, the PID of the server process that sent the notification - - - - -
- -
- - <structname>pg_namespace</structname> diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 71952ee1fc..5436971115 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -1,4 +1,4 @@ - + Functions and Operators @@ -11529,6 +11529,12 @@ postgres=# select * from unnest2(array[[1,2],[3,4]]);
+ + pg_listening_channels() + setof text + channel names that the session is currently listening on + + inet_client_addr() inet @@ -11674,6 +11680,16 @@ SET search_path TO schema , schema, .. + + pg_listening_channels + + + + pg_listening_channels returns a set of names of + channels that the current session is listening to. See for more information. + + inet_client_addr diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 4972a8c259..0bdb6401c3 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -1,4 +1,4 @@ - + <application>libpq</application> - C Library @@ -307,28 +307,28 @@ Description - + - + disable only try a non-SSL connection - + allow first try a non-SSL connection; if that fails, try an SSL connection - + prefer (default) first try an SSL connection; if that fails, try a non-SSL connection - + require only try an SSL connection @@ -481,7 +481,7 @@ - If expand_dbname is non-zero and + If expand_dbname is non-zero and dbname contains an = sign, it is taken as a conninfo string in exactly the same way as if it had been passed to PQconnectdb(see below). Previously @@ -4111,50 +4111,48 @@ typedef struct { PostgreSQL offers asynchronous notification via the LISTEN and NOTIFY commands. A client session registers its interest in a particular - notification condition with the LISTEN command (and + notification channel with the LISTEN command (and can stop listening with the UNLISTEN command). All - sessions listening on a particular condition will be notified + sessions listening on a particular channel will be notified asynchronously when a NOTIFY command with that - condition name is executed by any session. No additional information - is passed from the notifier to the listener. Thus, typically, any - actual data that needs to be communicated is transferred through a - database table. Commonly, the condition name is the same as the - associated table, but it is not necessary for there to be any associated - table. 
+ channel name is executed by any session. A payload string can + be passed to communicate additional data to the listeners. libpq applications submit - LISTEN and UNLISTEN commands as + LISTEN, UNLISTEN, + and NOTIFY commands as ordinary SQL commands. The arrival of NOTIFY messages can subsequently be detected by calling PQnotifies.PQnotifies - The function PQnotifies - returns the next notification from a list of unhandled - notification messages received from the server. It returns a null pointer if - there are no pending notifications. Once a notification is - returned from PQnotifies, it is considered handled and will be - removed from the list of notifications. + The function PQnotifies returns the next notification + from a list of unhandled notification messages received from the server. + It returns a null pointer if there are no pending notifications. Once a + notification is returned from PQnotifies, it is considered + handled and will be removed from the list of notifications. + PGnotify *PQnotifies(PGconn *conn); typedef struct pgNotify { - char *relname; /* notification condition name */ + char *relname; /* notification channel name */ int be_pid; /* process ID of notifying server process */ - char *extra; /* notification parameter */ + char *extra; /* notification payload string */ } PGnotify; + After processing a PGnotify object returned by PQnotifies, be sure to free it with PQfreemem. It is sufficient to free the PGnotify pointer; the relname and extra - fields do not represent separate allocations. (At present, the - extra field is unused and will always point - to an empty string.) + fields do not represent separate allocations. (The names of these fields + are historical; in particular, channel names need not have anything to + do with relation names.) 
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index e4364ec305..f0a2aeba03 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1,4 +1,4 @@ - + Frontend/Backend Protocol @@ -354,7 +354,7 @@ This message contains the response data from the previous step of GSSAPI or SSPI negotiation (AuthenticationGSS, AuthenticationSSPI - or a previous AuthenticationGSSContinue). If the GSSAPI + or a previous AuthenticationGSSContinue). If the GSSAPI or SSPI data in this message indicates more data is needed to complete the authentication, the frontend must send that data as another PasswordMessage. If @@ -992,7 +992,7 @@ In the event of a backend-detected error during copy-in mode (including - receipt of a CopyFail message), the backend will issue an ErrorResponse + receipt of a CopyFail message), the backend will issue an ErrorResponse message. If the COPY command was issued via an extended-query message, the backend will now discard frontend messages until a Sync message is received, then it will issue ReadyForQuery and return to normal @@ -1117,7 +1117,7 @@ backend will send a NotificationResponse message (not to be confused with NoticeResponse!) whenever a NOTIFY command is executed for the same - notification name. + channel name. @@ -1340,7 +1340,7 @@ This section describes the base data types used in messages. value that will appear, otherwise the value is variable. Eg. String, String("user"). - + There is no predefined limit on the length of a string @@ -1951,7 +1951,7 @@ Bind (F) (denoted R below). This can be zero to indicate that there are no result columns or that the result columns should all use the default format - (text); + (text); or one, in which case the specified format code is applied to all result columns (if any); or it can equal the actual number of result columns of the query. @@ -2500,7 +2500,7 @@ CopyOutResponse (B) separated by separator characters, etc). 
1 indicates the overall copy format is binary (similar to DataRow format). See for more information. + endterm="sql-copy-title"> for more information. @@ -3187,7 +3187,7 @@ NotificationResponse (B) - The name of the condition that the notify has been raised on. + The name of the channel that the notify has been raised on. @@ -3197,9 +3197,7 @@ NotificationResponse (B) - Additional information passed from the notifying process. - (Currently, this feature is unimplemented so the field - is always an empty string.) + The payload string passed from the notifying process. @@ -4353,7 +4351,7 @@ the backend. The NotificationResponse ('A') message has an additional string -field, which is presently empty but might someday carry additional data passed +field, which can carry a payload string passed from the NOTIFY event sender. @@ -4364,5 +4362,4 @@ string parameter; this has been removed. - diff --git a/doc/src/sgml/ref/listen.sgml b/doc/src/sgml/ref/listen.sgml index 93ca74c572..57577c1f6a 100644 --- a/doc/src/sgml/ref/listen.sgml +++ b/doc/src/sgml/ref/listen.sgml @@ -1,5 +1,5 @@ @@ -21,7 +21,7 @@ PostgreSQL documentation -LISTEN name +LISTEN channel @@ -30,24 +30,23 @@ LISTEN name LISTEN registers the current session as a - listener on the notification condition name. + listener on the notification channel named channel. If the current session is already registered as a listener for - this notification condition, nothing is done. + this notification channel, nothing is done. Whenever the command NOTIFY name is invoked, either + class="PARAMETER">channel is invoked, either by this session or another one connected to the same database, all - the sessions currently listening on that notification condition are + the sessions currently listening on that notification channel are notified, and each will in turn notify its connected client - application. See the discussion of NOTIFY for - more information. + application. 
- A session can be unregistered for a given notify condition with the + A session can be unregistered for a given notification channel with the UNLISTEN command. A session's listen registrations are automatically cleared when the session ends. @@ -78,16 +77,31 @@ LISTEN name - name + channel - Name of a notify condition (any identifier). + Name of a notification channel (any identifier). + + Notes + + + LISTEN takes effect at transaction commit. + If LISTEN or UNLISTEN is executed + within a transaction that later rolls back, the set of notification + channels being listened to is unchanged. + + + A transaction that has executed LISTEN cannot be + prepared for two-phase commit. + + + Examples diff --git a/doc/src/sgml/ref/notify.sgml b/doc/src/sgml/ref/notify.sgml index 563fbbe963..b612bb4cb2 100644 --- a/doc/src/sgml/ref/notify.sgml +++ b/doc/src/sgml/ref/notify.sgml @@ -1,5 +1,5 @@ @@ -21,7 +21,7 @@ PostgreSQL documentation -NOTIFY name +NOTIFY channel [ , payload ] @@ -29,35 +29,39 @@ NOTIFY name Description - The NOTIFY command sends a notification event to each - client application that has previously executed - LISTEN name - for the specified notification name in the current database. + The NOTIFY command sends a notification event together + with an optional payload string to each client application that + has previously executed + LISTEN channel + for the specified channel name in the current database. - NOTIFY provides a simple form of signal or + NOTIFY provides a simple interprocess communication mechanism for a collection of processes accessing the same PostgreSQL database. - Higher-level mechanisms can be built by using tables in the database to - pass additional data (beyond a mere notification name) from notifier to - listener(s). + A payload string can be sent along with the notification, and + higher-level mechanisms for passing structured data can be built by using + tables in the database to pass additional data from notifier to listener(s). 
- The information passed to the client for a notification event includes the notification - name and the notifying session's server process PID. It is up to the - database designer to define the notification names that will be used in a given - database and what each one means. + The information passed to the client for a notification event includes the + notification channel + name, the notifying session's server process PID, and the + payload string, which is an empty string if it has not been specified. - Commonly, the notification name is the same as the name of some table in + It is up to the database designer to define the channel names that will + be used in a given database and what each one means. + Commonly, the channel name is the same as the name of some table in the database, and the notify event essentially means, I changed this table, take a look at it to see what's new. But no such association is enforced by the NOTIFY and LISTEN commands. For - example, a database designer could use several different notification names - to signal different sorts of changes to a single table. + example, a database designer could use several different channel names + to signal different sorts of changes to a single table. Alternatively, + the payload string could be used to differentiate various cases. @@ -89,19 +93,22 @@ NOTIFY name - NOTIFY behaves like Unix signals in one important - respect: if the same notification name is signaled multiple times in quick - succession, recipients might get only one notification event for several executions - of NOTIFY. So it is a bad idea to depend on the number - of notifications received. Instead, use NOTIFY to wake up - applications that need to pay attention to something, and use a database - object (such as a sequence) to keep track of what happened or how many times - it happened. 
+ If the same channel name is signaled multiple times from the same + transaction with identical payload strings, the + database server can decide to deliver a single notification only. + On the other hand, notifications with distinct payload strings will + always be delivered as distinct notifications. Similarly, notifications from + different transactions will never get folded into one notification. + Except for dropping later instances of duplicate notifications, + NOTIFY guarantees that notifications from the same + transaction get delivered in the order they were sent. It is also + guaranteed that messages from different transactions are delivered in + the order in which the transactions committed. It is common for a client that executes NOTIFY - to be listening on the same notification name itself. In that case + to be listening on the same notification channel itself. In that case it will get back a notification event, just like all the other listening sessions. Depending on the application logic, this could result in useless work, for example, reading a database table to @@ -111,12 +118,7 @@ NOTIFY name notification event message) is the same as one's own session's PID (available from libpq). When they are the same, the notification event is one's own work bouncing - back, and can be ignored. (Despite what was said in the preceding - paragraph, this is a safe technique. - PostgreSQL keeps self-notifications - separate from notifications arriving from other sessions, so you - cannot miss an outside notification by ignoring your own - notifications.) + back, and can be ignored. @@ -125,16 +127,61 @@ NOTIFY name - name + channel - Name of the notification to be signaled (any identifier). + Name of the notification channel to be signaled (any identifier). + + + + + payload + + + The payload string to be communicated along with the + notification. This string must be shorter than 8000 bytes, and + is treated as text. 
+ (If binary data or large amounts of information need to be communicated, + it's best to put it in a database table and send the key of the record.) + + Notes + + + pg_notify + + + + To send a notification you can also use the function + pg_notify(text, + text). The function takes the channel name as the + first argument and the payload as the second. The function is much easier + to use than the NOTIFY command if you need to work with + non-constant channel names and payloads. + + + There is a queue that holds notifications that have been sent but not + yet processed by all listening sessions. If this queue becomes full, + transactions calling NOTIFY will fail at commit. + The queue is quite large (8GB in a standard installation) and should be + sufficiently sized for almost every use case. However, no cleanup can take + place if a session executes LISTEN and then enters a + transaction for a very long time. Once the queue is half full you will see + warnings in the log file pointing you to the session that is preventing + cleanup. In this case you should make sure that this session ends its + current transaction so that cleanup can proceed. + + + A transaction that has executed NOTIFY cannot be + prepared for two-phase commit. + + + Examples @@ -146,6 +193,12 @@ NOTIFY name LISTEN virtual; NOTIFY virtual; Asynchronous notification "virtual" received from server process with PID 8448. +NOTIFY virtual, 'This is the payload'; +Asynchronous notification "virtual" with payload "This is the payload" received from server process with PID 8448. + +LISTEN foo; +SELECT pg_notify('fo' || 'o', 'pay' || 'load'); +Asynchronous notification "foo" with payload "payload" received from server process with PID 14728. 
diff --git a/doc/src/sgml/ref/unlisten.sgml b/doc/src/sgml/ref/unlisten.sgml index 7cdd7394cb..9e22ea4edc 100644 --- a/doc/src/sgml/ref/unlisten.sgml +++ b/doc/src/sgml/ref/unlisten.sgml @@ -1,5 +1,5 @@ @@ -21,7 +21,7 @@ PostgreSQL documentation -UNLISTEN { name | * } +UNLISTEN { channel | * } @@ -33,8 +33,8 @@ UNLISTEN { name | * } registration for NOTIFY events. UNLISTEN cancels any existing registration of the current PostgreSQL session as a - listener on the notification name. The special wildcard + listener on the notification channel named channel. The special wildcard * cancels all listener registrations for the current session. @@ -52,10 +52,10 @@ UNLISTEN { name | * } - name + channel - Name of a notification (any identifier). + Name of a notification channel (any identifier). @@ -83,6 +83,11 @@ UNLISTEN { name | * } At the end of each session, UNLISTEN * is automatically executed. + + + A transaction that has executed UNLISTEN cannot be + prepared for two-phase commit. + @@ -100,7 +105,7 @@ Asynchronous notification "virtual" received from server process with PID 8448. 
Once UNLISTEN has been executed, further NOTIFY - commands will be ignored: + messages will be ignored: UNLISTEN virtual; diff --git a/doc/src/sgml/storage.sgml b/doc/src/sgml/storage.sgml index e0cef7dd7b..c4b38ddb6c 100644 --- a/doc/src/sgml/storage.sgml +++ b/doc/src/sgml/storage.sgml @@ -1,4 +1,4 @@ - + @@ -77,6 +77,11 @@ Item (used for shared row locks) + + pg_notify + Subdirectory containing LISTEN/NOTIFY status data + + pg_stat_tmp Subdirectory containing temporary files for the statistics diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c index d5f999f494..6226acc928 100644 --- a/src/backend/access/transam/slru.c +++ b/src/backend/access/transam/slru.c @@ -41,7 +41,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/access/transam/slru.c,v 1.48 2010/01/02 16:57:35 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/slru.c,v 1.49 2010/02/16 22:34:43 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -59,25 +59,6 @@ #include "miscadmin.h" -/* - * Define segment size. A page is the same BLCKSZ as is used everywhere - * else in Postgres. The segment size can be chosen somewhat arbitrarily; - * we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG - * or 64K transactions for SUBTRANS. - * - * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF, - * page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where - * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at - * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need - * take no explicit notice of that fact in this module, except when comparing - * segment and page numbers in SimpleLruTruncate (see PagePrecedes()). - * - * Note: this file currently assumes that segment file names will be four - * hex digits. 
This sets a lower bound on the segment size (64K transactions - * for 32-bit TransactionIds). - */ -#define SLRU_PAGES_PER_SEGMENT 32 - #define SlruFileName(ctl, path, seg) \ snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir, seg) @@ -183,6 +164,8 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, shared = (SlruShared) ShmemInitStruct(name, SimpleLruShmemSize(nslots, nlsns), &found); + if (!shared) + elog(ERROR, "out of shared memory"); if (!IsUnderPostmaster) { diff --git a/src/backend/access/transam/twophase_rmgr.c b/src/backend/access/transam/twophase_rmgr.c index 0ea8e42ca5..86a1d12f93 100644 --- a/src/backend/access/transam/twophase_rmgr.c +++ b/src/backend/access/transam/twophase_rmgr.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/transam/twophase_rmgr.c,v 1.12 2010/01/02 16:57:35 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/twophase_rmgr.c,v 1.13 2010/02/16 22:34:43 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -16,7 +16,6 @@ #include "access/multixact.h" #include "access/twophase_rmgr.h" -#include "commands/async.h" #include "pgstat.h" #include "storage/lock.h" @@ -25,7 +24,6 @@ const TwoPhaseCallback twophase_recover_callbacks[TWOPHASE_RM_MAX_ID + 1] = { NULL, /* END ID */ lock_twophase_recover, /* Lock */ - NULL, /* notify/listen */ NULL, /* pgstat */ multixact_twophase_recover /* MultiXact */ }; @@ -34,7 +32,6 @@ const TwoPhaseCallback twophase_postcommit_callbacks[TWOPHASE_RM_MAX_ID + 1] = { NULL, /* END ID */ lock_twophase_postcommit, /* Lock */ - notify_twophase_postcommit, /* notify/listen */ pgstat_twophase_postcommit, /* pgstat */ multixact_twophase_postcommit /* MultiXact */ }; @@ -43,7 +40,6 @@ const TwoPhaseCallback twophase_postabort_callbacks[TWOPHASE_RM_MAX_ID + 1] = { NULL, /* END ID */ lock_twophase_postabort, /* Lock */ - NULL, /* notify/listen */ pgstat_twophase_postabort, /* pgstat */ multixact_twophase_postabort /* 
MultiXact */ }; @@ -52,7 +48,6 @@ const TwoPhaseCallback twophase_standby_recover_callbacks[TWOPHASE_RM_MAX_ID + 1 { NULL, /* END ID */ lock_twophase_standby_recover, /* Lock */ - NULL, /* notify/listen */ NULL, /* pgstat */ NULL /* MultiXact */ }; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index c239c098fd..46a842bb9a 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.285 2010/02/13 16:15:46 sriggs Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.286 2010/02/16 22:34:43 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -1736,8 +1736,12 @@ CommitTransaction(void) /* close large objects before lower-level cleanup */ AtEOXact_LargeObject(true); - /* NOTIFY commit must come before lower-level cleanup */ - AtCommit_Notify(); + /* + * Insert notifications sent by NOTIFY commands into the queue. This + * should be late in the pre-commit sequence to minimize time spent + * holding the notify-insertion lock. 
+ */ + PreCommit_Notify(); /* Prevent cancel/die interrupt while cleaning up */ HOLD_INTERRUPTS(); @@ -1825,6 +1829,7 @@ CommitTransaction(void) /* Check we've released all catcache entries */ AtEOXact_CatCache(true); + AtCommit_Notify(); AtEOXact_GUC(true, 1); AtEOXact_SPI(true); AtEOXact_on_commit_actions(true); diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index 837865efbe..a73971d162 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -2,7 +2,7 @@ # # Makefile for backend/catalog # -# $PostgreSQL: pgsql/src/backend/catalog/Makefile,v 1.76 2010/01/06 19:56:29 tgl Exp $ +# $PostgreSQL: pgsql/src/backend/catalog/Makefile,v 1.77 2010/02/16 22:34:43 tgl Exp $ # #------------------------------------------------------------------------- @@ -30,7 +30,7 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\ pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \ pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \ pg_language.h pg_largeobject_metadata.h pg_largeobject.h pg_aggregate.h \ - pg_statistic.h pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h \ + pg_statistic.h pg_rewrite.h pg_trigger.h pg_description.h \ pg_cast.h pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \ pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \ pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \ diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index afc5e5fabc..f5863480f5 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -7,109 +7,280 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.150 2010/01/02 16:57:36 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.151 2010/02/16 22:34:43 tgl Exp $ * *------------------------------------------------------------------------- */ 
/*------------------------------------------------------------------------- - * New Async Notification Model: - * 1. Multiple backends on same machine. Multiple backends listening on - * one relation. (Note: "listening on a relation" is not really the - * right way to think about it, since the notify names need not have - * anything to do with the names of relations actually in the database. - * But this terminology is all over the code and docs, and I don't feel - * like trying to replace it.) - * - * 2. There is a tuple in relation "pg_listener" for each active LISTEN, - * ie, each relname/listenerPID pair. The "notification" field of the - * tuple is zero when no NOTIFY is pending for that listener, or the PID - * of the originating backend when a cross-backend NOTIFY is pending. - * (We skip writing to pg_listener when doing a self-NOTIFY, so the - * notification field should never be equal to the listenerPID field.) - * - * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target - * relname to a list of outstanding NOTIFY requests. Actual processing - * happens if and only if we reach transaction commit. At that time (in - * routine AtCommit_Notify) we scan pg_listener for matching relnames. - * If the listenerPID in a matching tuple is ours, we just send a notify - * message to our own front end. If it is not ours, and "notification" - * is not already nonzero, we set notification to our own PID and send a - * PROCSIG_NOTIFY_INTERRUPT signal to the receiving process (indicated by - * listenerPID). - * BTW: if the signal operation fails, we presume that the listener backend - * crashed without removing this tuple, and remove the tuple for it. - * - * 4. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler + * Async Notification Model as of 9.0: + * + * 1. Multiple backends on same machine. Multiple backends listening on + * several channels. (Channels are also called "conditions" in other + * parts of the code.) + * + * 2. 
There is one central queue in disk-based storage (directory pg_notify/), + * with actively-used pages mapped into shared memory by the slru.c module. + * All notification messages are placed in the queue and later read out + * by listening backends. + * + * There is no central knowledge of which backend listens on which channel; + * every backend has its own list of interesting channels. + * + * Although there is only one queue, notifications are treated as being + * database-local; this is done by including the sender's database OID + * in each notification message. Listening backends ignore messages + * that don't match their database OID. This is important because it + * ensures senders and receivers have the same database encoding and won't + * misinterpret non-ASCII text in the channel name or payload string. + * + * Since notifications are not expected to survive database crashes, + * we can simply clean out the pg_notify data at any reboot, and there + * is no need for WAL support or fsync'ing. + * + * 3. Every backend that is listening on at least one channel registers by + * entering its PID into the array in AsyncQueueControl. It then scans all + * incoming notifications in the central queue and first compares the + * database OID of the notification with its own database OID and then + * compares the notified channel with the list of channels that it listens + * to. In case there is a match it delivers the notification event to its + * frontend. Non-matching events are simply skipped. + * + * 4. The NOTIFY statement (routine Async_Notify) stores the notification in + * a backend-local list which will not be processed until transaction end. + * + * Duplicate notifications from the same transaction are sent out as one + * notification only. This is done to save work when for example a trigger + * on a 2 million row table fires a notification for each row that has been + * changed. 
If the application needs to receive every single notification + * that has been sent, it can easily add some unique string into the extra + * payload parameter. + * + * When the transaction is ready to commit, PreCommit_Notify() adds the + * pending notifications to the head of the queue. The head pointer of the + * queue always points to the next free position; a position is just a + * page number and an offset within that page. The insertion is done before + * marking the transaction as committed in clog. If we run into problems writing the + * notifications, we can still call elog(ERROR, ...) and the transaction + * will roll back. + * + * Once we have put all of the notifications into the queue, we return to + * CommitTransaction() which will then do the actual transaction commit. + * + * After commit we are called another time (AtCommit_Notify()). Here we + * make the actual updates to the effective listen state (listenChannels). + * + * Finally, after we are out of the transaction altogether, we check if + * we need to signal listening backends. In SignalBackends() we scan the + * list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal + * to every listening backend (we don't know which backend is listening on + * which channel so we must signal them all). We can exclude backends that + * are already up to date, though. We don't bother with a self-signal + * either, but just process the queue directly. + * + * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler * can call inbound-notify processing immediately if this backend is idle * (ie, it is waiting for a frontend command and is not within a transaction * block). Otherwise the handler may only set a flag, which will cause the * processing to occur just before we next go idle. * - * 5. Inbound-notify processing consists of scanning pg_listener for tuples - * matching our own listenerPID and having nonzero notification fields.
- * For each such tuple, we send a message to our frontend and clear the - * notification field. BTW: this routine has to start/commit its own - * transaction, since by assumption it is only called from outside any - * transaction. - * - * Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list - * of pending actions. If we reach transaction commit, the changes are - * applied to pg_listener just before executing any pending NOTIFYs. This - * method is necessary because to avoid race conditions, we must hold lock - * on pg_listener from when we insert a new listener tuple until we commit. - * To do that and not create undue hazard of deadlock, we don't want to - * touch pg_listener until we are otherwise done with the transaction; - * in particular it'd be uncool to still be taking user-commanded locks - * while holding the pg_listener lock. - * - * Although we grab ExclusiveLock on pg_listener for any operation, - * the lock is never held very long, so it shouldn't cause too much of - * a performance problem. (Previously we used AccessExclusiveLock, but - * there's no real reason to forbid concurrent reads.) - * - * An application that listens on the same relname it notifies will get + * Inbound-notify processing consists of reading all of the notifications + * that have arrived since scanning last time. We read every notification + * until we reach either a notification from an uncommitted transaction or + * the head pointer's position. Then we check if we were the laziest + * backend: if our pointer is set to the same position as the global tail + * pointer is set, then we move the global tail pointer ahead to where the + * second-laziest backend is (in general, we take the MIN of the current + * head position and all active backends' new tail pointers). Whenever we + * move the global tail pointer we also truncate now-unused pages (i.e., + * delete files in pg_notify/ that are no longer used). 
+ * + * An application that listens on the same channel it notifies will get * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful, * by comparing be_pid in the NOTIFY message to the application's own backend's * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the * frontend during startup.) The above design guarantees that notifies from - * other backends will never be missed by ignoring self-notifies. Note, - * however, that we do *not* guarantee that a separate frontend message will - * be sent for every outside NOTIFY. Since there is only room for one - * originating PID in pg_listener, outside notifies occurring at about the - * same time may be collapsed into a single message bearing the PID of the - * first outside backend to perform the NOTIFY. + * other backends will never be missed by ignoring self-notifies. + * + * The amount of shared memory used for notify management (NUM_ASYNC_BUFFERS) + * can be varied without affecting anything but performance. The maximum + * amount of notification data that can be queued at one time is determined + * by slru.c's wraparound limit; see QUEUE_MAX_PAGE below. *------------------------------------------------------------------------- */ #include "postgres.h" +#include <limits.h> #include <unistd.h> #include <signal.h> -#include "access/heapam.h" -#include "access/twophase_rmgr.h" +#include "access/slru.h" +#include "access/transam.h" #include "access/xact.h" -#include "catalog/pg_listener.h" +#include "catalog/pg_database.h" #include "commands/async.h" +#include "funcapi.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "miscadmin.h" #include "storage/ipc.h" +#include "storage/lmgr.h" #include "storage/procsignal.h" #include "storage/sinval.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" -#include "utils/fmgroids.h" #include "utils/memutils.h" #include "utils/ps_status.h" -#include "utils/tqual.h" +/* + * Maximum size of a NOTIFY payload, including terminating NULL.
This + * must be kept small enough so that a notification message fits on one + * SLRU page. + */ +#define NOTIFY_PAYLOAD_MAX_LENGTH 8000 + +/* + * Struct representing an entry in the global notify queue + * + * This struct declaration has the maximal length, but in a real queue entry + * the data area is only big enough for the actual channel and payload strings + * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible + * entry size, if both channel and payload strings are empty (but note it + * doesn't include alignment padding). + * + * The "length" field should always be rounded up to the next QUEUEALIGN + * multiple so that all fields are properly aligned. + */ +typedef struct AsyncQueueEntry +{ + int length; /* total allocated length of entry */ + Oid dboid; /* sender's database OID */ + TransactionId xid; /* sender's XID */ + int32 srcPid; /* sender's PID */ + char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH]; +} AsyncQueueEntry; + +/* Currently, no field of AsyncQueueEntry requires more than int alignment */ +#define QUEUEALIGN(len) INTALIGN(len) + +#define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2) + +/* + * Struct describing a queue position, and assorted macros for working with it + */ +typedef struct QueuePosition +{ + int page; /* SLRU page number */ + int offset; /* byte offset within page */ +} QueuePosition; + +#define QUEUE_POS_PAGE(x) ((x).page) +#define QUEUE_POS_OFFSET(x) ((x).offset) + +#define SET_QUEUE_POS(x,y,z) \ + do { \ + (x).page = (y); \ + (x).offset = (z); \ + } while (0) + +#define QUEUE_POS_EQUAL(x,y) \ + ((x).page == (y).page && (x).offset == (y).offset) + +/* choose logically smaller QueuePosition */ +#define QUEUE_POS_MIN(x,y) \ + (asyncQueuePagePrecedesLogically((x).page, (y).page) ? (x) : \ + (x).page != (y).page ? (y) : \ + (x).offset < (y).offset ? 
(x) : (y)) + +/* + * Struct describing a listening backend's status + */ +typedef struct QueueBackendStatus +{ + int32 pid; /* either a PID or InvalidPid */ + QueuePosition pos; /* backend has read queue up to here */ +} QueueBackendStatus; + +#define InvalidPid (-1) + +/* + * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff) + * + * The AsyncQueueControl structure is protected by the AsyncQueueLock. + * + * When holding the lock in SHARED mode, backends may only inspect their own + * entries as well as the head and tail pointers. Consequently we can allow a + * backend to update its own record while holding only SHARED lock (since no + * other backend will inspect it). + * + * When holding the lock in EXCLUSIVE mode, backends can inspect the entries + * of other backends and also change the head and tail pointers. + * + * In order to avoid deadlocks, whenever we need both locks, we always first + * get AsyncQueueLock and then AsyncCtlLock. + * + * Each backend uses the backend[] array entry with index equal to its + * BackendId (which can range from 1 to MaxBackends). We rely on this to make + * SendProcSignal fast. 
+ */ +typedef struct AsyncQueueControl +{ + QueuePosition head; /* head points to the next free location */ + QueuePosition tail; /* the global tail is equivalent to the + tail of the "slowest" backend */ + TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ + QueueBackendStatus backend[1]; /* actually of length MaxBackends+1 */ + /* DO NOT ADD FURTHER STRUCT MEMBERS HERE */ +} AsyncQueueControl; + +static AsyncQueueControl *asyncQueueControl; + +#define QUEUE_HEAD (asyncQueueControl->head) +#define QUEUE_TAIL (asyncQueueControl->tail) +#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid) +#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos) + +/* + * The SLRU buffer area through which we access the notification queue + */ +static SlruCtlData AsyncCtlData; + +#define AsyncCtl (&AsyncCtlData) +#define QUEUE_PAGESIZE BLCKSZ +#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */ + +/* + * slru.c currently assumes that all filenames are four characters of hex + * digits. That means that we can use segments 0000 through FFFF. + * Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us + * the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1. + * + * It's of course possible to enhance slru.c, but this gives us so much + * space already that it doesn't seem worth the trouble. + * + * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2 + * pages, because more than that would confuse slru.c into thinking there + * was a wraparound condition. With the default BLCKSZ this means there + * can be up to 8GB of queued-and-not-read data. + * + * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of + * SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour. + */ +#define QUEUE_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1) + +/* + * listenChannels identifies the channels we are actually listening to + * (ie, have committed a LISTEN on). 
It is a simple list of channel names, + * allocated in TopMemoryContext. + */ +static List *listenChannels = NIL; /* list of C strings */ + /* * State for pending LISTEN/UNLISTEN actions consists of an ordered list of * all actions requested in the current transaction. As explained above, - * we don't actually modify pg_listener until we reach transaction commit. + * we don't actually change listenChannels until we reach transaction commit. * * The list is kept in CurTransactionContext. In subtransactions, each * subtransaction has its own list in its own CurTransactionContext, but @@ -126,7 +297,7 @@ typedef enum typedef struct { ListenActionKind action; - char condname[1]; /* actually, as long as needed */ + char channel[1]; /* actually, as long as needed */ } ListenAction; static List *pendingActions = NIL; /* list of ListenAction */ @@ -134,9 +305,9 @@ static List *pendingActions = NIL; /* list of ListenAction */ static List *upperPendingActions = NIL; /* list of upper-xact lists */ /* - * State for outbound notifies consists of a list of all relnames NOTIFYed - * in the current transaction. We do not actually perform a NOTIFY until - * and unless the transaction commits. pendingNotifies is NIL if no + * State for outbound notifies consists of a list of all channels+payloads + * NOTIFYed in the current transaction. We do not actually perform a NOTIFY + * until and unless the transaction commits. pendingNotifies is NIL if no * NOTIFYs have been done in the current transaction. * * The list is kept in CurTransactionContext. In subtransactions, each @@ -149,12 +320,18 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */ * condition name, it will get a self-notify at commit. This is a bit odd * but is consistent with our historical behavior. 
*/ -static List *pendingNotifies = NIL; /* list of C strings */ +typedef struct Notification +{ + char *channel; /* channel name */ + char *payload; /* payload string (can be empty) */ +} Notification; + +static List *pendingNotifies = NIL; /* list of Notifications */ static List *upperPendingNotifies = NIL; /* list of upper-xact lists */ /* - * State for inbound notifies consists of two flags: one saying whether + * State for inbound notifications consists of two flags: one saying whether * the signal handler is currently allowed to call ProcessIncomingNotify * directly, and one saying whether the signal has occurred but the handler * was not allowed to call ProcessIncomingNotify at the time. @@ -168,57 +345,259 @@ static volatile sig_atomic_t notifyInterruptOccurred = 0; /* True if we've registered an on_shmem_exit cleanup */ static bool unlistenExitRegistered = false; +/* has this backend sent notifications in the current transaction? */ +static bool backendHasSentNotifications = false; +/* has this backend executed its first LISTEN in the current transaction? 
*/ +static bool backendHasExecutedInitialListen = false; +/* GUC parameter */ bool Trace_notify = false; - -static void queue_listen(ListenActionKind action, const char *condname); +/* local function prototypes */ +static bool asyncQueuePagePrecedesPhysically(int p, int q); +static bool asyncQueuePagePrecedesLogically(int p, int q); +static void queue_listen(ListenActionKind action, const char *channel); static void Async_UnlistenOnExit(int code, Datum arg); -static void Exec_Listen(Relation lRel, const char *relname); -static void Exec_Unlisten(Relation lRel, const char *relname); -static void Exec_UnlistenAll(Relation lRel); -static void Send_Notify(Relation lRel); +static void Exec_ListenPreCommit(void); +static void Exec_ListenCommit(const char *channel); +static void Exec_UnlistenCommit(const char *channel); +static void Exec_UnlistenAllCommit(void); +static bool IsListeningOn(const char *channel); +static void asyncQueueUnregister(void); +static bool asyncQueueIsFull(void); +static bool asyncQueueAdvance(QueuePosition *position, int entryLength); +static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe); +static ListCell *asyncQueueAddEntries(ListCell *nextNotify); +static void asyncQueueFillWarning(void); +static bool SignalBackends(void); +static void asyncQueueReadAllNotifications(void); +static bool asyncQueueProcessPageEntries(QueuePosition *current, + QueuePosition stop, + char *page_buffer); +static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(void); -static void NotifyMyFrontEnd(char *relname, int32 listenerPID); -static bool AsyncExistsPendingNotify(const char *relname); +static void NotifyMyFrontEnd(const char *channel, + const char *payload, + int32 srcPid); +static bool AsyncExistsPendingNotify(const char *channel, const char *payload); static void ClearPendingActionsAndNotifies(void); +/* + * We will work on the page range of 0..QUEUE_MAX_PAGE. 
+ * + * asyncQueuePagePrecedesPhysically just checks numerically without any magic + * if one page precedes another one. This is wrong for normal operation but + * is helpful when clearing pg_notify/ during startup. + * + * asyncQueuePagePrecedesLogically compares using wraparound logic, as is + * required by slru.c. + */ +static bool +asyncQueuePagePrecedesPhysically(int p, int q) +{ + return p < q; +} + +static bool +asyncQueuePagePrecedesLogically(int p, int q) +{ + int diff; + + /* + * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should + * be in the range 0..QUEUE_MAX_PAGE. + */ + Assert(p >= 0 && p <= QUEUE_MAX_PAGE); + Assert(q >= 0 && q <= QUEUE_MAX_PAGE); + + diff = p - q; + if (diff >= ((QUEUE_MAX_PAGE+1)/2)) + diff -= QUEUE_MAX_PAGE+1; + else if (diff < -((QUEUE_MAX_PAGE+1)/2)) + diff += QUEUE_MAX_PAGE+1; + return diff < 0; +} + +/* + * Report space needed for our shared memory area + */ +Size +AsyncShmemSize(void) +{ + Size size; + + /* This had better match AsyncShmemInit */ + size = mul_size(MaxBackends, sizeof(QueueBackendStatus)); + size = add_size(size, sizeof(AsyncQueueControl)); + + size = add_size(size, SimpleLruShmemSize(NUM_ASYNC_BUFFERS, 0)); + + return size; +} + +/* + * Initialize our shared memory area + */ +void +AsyncShmemInit(void) +{ + bool found; + int slotno; + Size size; + + /* + * Create or attach to the AsyncQueueControl structure. + * + * The used entries in the backend[] array run from 1 to MaxBackends. + * sizeof(AsyncQueueControl) already includes space for the unused zero'th + * entry, but we need to add on space for the used entries. 
+ */ + size = mul_size(MaxBackends, sizeof(QueueBackendStatus)); + size = add_size(size, sizeof(AsyncQueueControl)); + + asyncQueueControl = (AsyncQueueControl *) + ShmemInitStruct("Async Queue Control", size, &found); + + if (!asyncQueueControl) + elog(ERROR, "out of shared memory"); + + if (!found) + { + /* First time through, so initialize it */ + int i; + + SET_QUEUE_POS(QUEUE_HEAD, 0, 0); + SET_QUEUE_POS(QUEUE_TAIL, 0, 0); + asyncQueueControl->lastQueueFillWarn = 0; + /* zero'th entry won't be used, but let's initialize it anyway */ + for (i = 0; i <= MaxBackends; i++) + { + QUEUE_BACKEND_PID(i) = InvalidPid; + SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); + } + } + + /* + * Set up SLRU management of the pg_notify data. + */ + AsyncCtl->PagePrecedes = asyncQueuePagePrecedesLogically; + SimpleLruInit(AsyncCtl, "Async Ctl", NUM_ASYNC_BUFFERS, 0, + AsyncCtlLock, "pg_notify"); + /* Override default assumption that writes should be fsync'd */ + AsyncCtl->do_fsync = false; + + if (!found) + { + /* + * During start or reboot, clean out the pg_notify directory. + * + * Since we want to remove every file, we temporarily use + * asyncQueuePagePrecedesPhysically() and pass INT_MAX as the + * comparison value; every file in the directory should therefore + * appear to be less than that. 
+ */ + AsyncCtl->PagePrecedes = asyncQueuePagePrecedesPhysically; + (void) SlruScanDirectory(AsyncCtl, INT_MAX, true); + AsyncCtl->PagePrecedes = asyncQueuePagePrecedesLogically; + + /* Now initialize page zero to empty */ + LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE); + slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD)); + /* This write is just to verify that pg_notify/ is writable */ + SimpleLruWritePage(AsyncCtl, slotno, NULL); + LWLockRelease(AsyncCtlLock); + } +} + + +/* + * pg_notify - + * SQL function to send a notification event + */ +Datum +pg_notify(PG_FUNCTION_ARGS) +{ + const char *channel; + const char *payload; + + if (PG_ARGISNULL(0)) + channel = ""; + else + channel = text_to_cstring(PG_GETARG_TEXT_PP(0)); + + if (PG_ARGISNULL(1)) + payload = ""; + else + payload = text_to_cstring(PG_GETARG_TEXT_PP(1)); + + Async_Notify(channel, payload); + + PG_RETURN_VOID(); +} + + /* * Async_Notify * * This is executed by the SQL notify command. * - * Adds the relation to the list of pending notifies. + * Adds the message to the list of pending notifies. * Actual notification happens during transaction commit. * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ */ void -Async_Notify(const char *relname) +Async_Notify(const char *channel, const char *payload) { + Notification *n; + MemoryContext oldcontext; + if (Trace_notify) - elog(DEBUG1, "Async_Notify(%s)", relname); + elog(DEBUG1, "Async_Notify(%s)", channel); - /* no point in making duplicate entries in the list ... 
*/ - if (!AsyncExistsPendingNotify(relname)) + /* a channel name must be specified */ + if (!channel || !strlen(channel)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("channel name cannot be empty"))); + + if (strlen(channel) >= NAMEDATALEN) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("channel name too long"))); + + if (payload) { - /* - * The name list needs to live until end of transaction, so store it - * in the transaction context. - */ - MemoryContext oldcontext; + if (strlen(payload) >= NOTIFY_PAYLOAD_MAX_LENGTH) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("payload string too long"))); + } - oldcontext = MemoryContextSwitchTo(CurTransactionContext); + /* no point in making duplicate entries in the list ... */ + if (AsyncExistsPendingNotify(channel, payload)) + return; - /* - * Ordering of the list isn't important. We choose to put new entries - * on the front, as this might make duplicate-elimination a tad faster - * when the same condition is signaled many times in a row. - */ - pendingNotifies = lcons(pstrdup(relname), pendingNotifies); + /* + * The notification list needs to live until end of transaction, so store + * it in the transaction context. + */ + oldcontext = MemoryContextSwitchTo(CurTransactionContext); - MemoryContextSwitchTo(oldcontext); - } + n = (Notification *) palloc(sizeof(Notification)); + n->channel = pstrdup(channel); + if (payload) + n->payload = pstrdup(payload); + else + n->payload = ""; + + /* + * We want to preserve the order so we need to append every + * notification. See comments at AsyncExistsPendingNotify(). + */ + pendingNotifies = lappend(pendingNotifies, n); + + MemoryContextSwitchTo(oldcontext); } /* @@ -226,11 +605,11 @@ Async_Notify(const char *relname) * Common code for listen, unlisten, unlisten all commands. * * Adds the request to the list of pending actions. - * Actual update of pg_listener happens during transaction commit. 
- * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + * Actual update of the listenChannels list happens during transaction + * commit. */ static void -queue_listen(ListenActionKind action, const char *condname) +queue_listen(ListenActionKind action, const char *channel) { MemoryContext oldcontext; ListenAction *actrec; @@ -244,9 +623,9 @@ queue_listen(ListenActionKind action, const char *condname) oldcontext = MemoryContextSwitchTo(CurTransactionContext); /* space for terminating null is included in sizeof(ListenAction) */ - actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(condname)); + actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(channel)); actrec->action = action; - strcpy(actrec->condname, condname); + strcpy(actrec->channel, channel); pendingActions = lappend(pendingActions, actrec); @@ -259,12 +638,12 @@ queue_listen(ListenActionKind action, const char *condname) * This is executed by the SQL listen command. */ void -Async_Listen(const char *relname) +Async_Listen(const char *channel) { if (Trace_notify) - elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid); + elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid); - queue_listen(LISTEN_LISTEN, relname); + queue_listen(LISTEN_LISTEN, channel); } /* @@ -273,16 +652,16 @@ Async_Listen(const char *relname) * This is executed by the SQL unlisten command. */ void -Async_Unlisten(const char *relname) +Async_Unlisten(const char *channel) { if (Trace_notify) - elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid); + elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid); /* If we couldn't possibly be listening, no need to queue anything */ if (pendingActions == NIL && !unlistenExitRegistered) return; - queue_listen(LISTEN_UNLISTEN, relname); + queue_listen(LISTEN_UNLISTEN, channel); } /* @@ -304,28 +683,63 @@ Async_UnlistenAll(void) } /* - * Async_UnlistenOnExit + * SQL function: return a set of the channel names this backend is actively + * listening to. 
* - * Clean up the pg_listener table at backend exit. + * Note: this coding relies on the fact that the listenChannels list cannot + * change within a transaction. + */ +Datum +pg_listening_channels(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + ListCell **lcp; + + /* stuff done only on the first call of the function */ + if (SRF_IS_FIRSTCALL()) + { + MemoryContext oldcontext; + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* switch to memory context appropriate for multiple function calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + /* allocate memory for user context */ + lcp = (ListCell **) palloc(sizeof(ListCell *)); + *lcp = list_head(listenChannels); + funcctx->user_fctx = (void *) lcp; + + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + lcp = (ListCell **) funcctx->user_fctx; + + while (*lcp != NULL) + { + char *channel = (char *) lfirst(*lcp); + + *lcp = lnext(*lcp); + SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel)); + } + + SRF_RETURN_DONE(funcctx); +} + +/* + * Async_UnlistenOnExit * - * This is executed if we have done any LISTENs in this backend. - * It might not be necessary anymore, if the user UNLISTENed everything, - * but we don't try to detect that case. + * This is executed at backend exit if we have done any LISTENs in this + * backend. It might not be necessary anymore, if the user UNLISTENed + * everything, but we don't try to detect that case. */ static void Async_UnlistenOnExit(int code, Datum arg) { - /* - * We need to start/commit a transaction for the unlisten, but if there is - * already an active transaction we had better abort that one first. - * Otherwise we'd end up committing changes that probably ought to be - * discarded. 
- */ - AbortOutOfAnyTransaction(); - /* Now we can do the unlisten */ - StartTransactionCommand(); - Async_UnlistenAll(); - CommitTransactionCommand(); + Exec_UnlistenAllCommit(); } /* @@ -337,71 +751,144 @@ Async_UnlistenOnExit(int code, Datum arg) void AtPrepare_Notify(void) { - ListCell *p; - - /* It's not sensible to have any pending LISTEN/UNLISTEN actions */ - if (pendingActions) + /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */ + if (pendingActions || pendingNotifies) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot PREPARE a transaction that has executed LISTEN or UNLISTEN"))); + errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN or NOTIFY"))); +} - /* We can deal with pending NOTIFY though */ - foreach(p, pendingNotifies) +/* + * PreCommit_Notify + * + * This is called at transaction commit, before actually committing to + * clog. + * + * If there are pending LISTEN actions, make sure we are listed in the + * shared-memory listener array. This must happen before commit to + * ensure we don't miss any notifies from transactions that commit + * just after ours. + * + * If there are outbound notify requests in the pendingNotifies list, + * add them to the global queue. We do that before commit so that + * we can still throw error if we run out of queue space. 
+ */ +void +PreCommit_Notify(void) +{ + ListCell *p; + + if (pendingActions == NIL && pendingNotifies == NIL) + return; /* no relevant statements in this xact */ + + if (Trace_notify) + elog(DEBUG1, "PreCommit_Notify"); + + Assert(backendHasExecutedInitialListen == false); + + /* Preflight for any pending listen/unlisten actions */ + foreach(p, pendingActions) { - const char *relname = (const char *) lfirst(p); + ListenAction *actrec = (ListenAction *) lfirst(p); - RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0, - relname, strlen(relname) + 1); + switch (actrec->action) + { + case LISTEN_LISTEN: + Exec_ListenPreCommit(); + break; + case LISTEN_UNLISTEN: + /* there is no Exec_UnlistenPreCommit() */ + break; + case LISTEN_UNLISTEN_ALL: + /* there is no Exec_UnlistenAllPreCommit() */ + break; + } } - /* - * We can clear the state immediately, rather than needing a separate - * PostPrepare call, because if the transaction fails we'd just discard - * the state anyway. - */ - ClearPendingActionsAndNotifies(); + /* Queue any pending notifies */ + if (pendingNotifies) + { + ListCell *nextNotify; + + /* + * Make sure that we have an XID assigned to the current transaction. + * GetCurrentTransactionId is cheap if we already have an XID, but + * not so cheap if we don't, and we'd prefer not to do that work + * while holding AsyncQueueLock. + */ + (void) GetCurrentTransactionId(); + + /* + * Serialize writers by acquiring a special lock that we hold till + * after commit. This ensures that queue entries appear in commit + * order, and in particular that there are never uncommitted queue + * entries ahead of committed ones, so an uncommitted transaction + * can't block delivery of deliverable notifications. + * + * We use a heavyweight lock so that it'll automatically be released + * after either commit or abort. This also allows deadlocks to be + * detected, though really a deadlock shouldn't be possible here. 
+ * + * The lock is on "database 0", which is pretty ugly but it doesn't + * seem worth inventing a special locktag category just for this. + * (Historical note: before PG 9.0, a similar lock on "database 0" was + * used by the flatfiles mechanism.) + */ + LockSharedObject(DatabaseRelationId, InvalidOid, 0, + AccessExclusiveLock); + + /* Now push the notifications into the queue */ + backendHasSentNotifications = true; + + nextNotify = list_head(pendingNotifies); + while (nextNotify != NULL) + { + /* + * Add the pending notifications to the queue. We acquire and + * release AsyncQueueLock once per page, which might be overkill + * but it does allow readers to get in while we're doing this. + * + * A full queue is very uncommon and should really not happen, + * given that we have so much space available in the SLRU pages. + * Nevertheless we need to deal with this possibility. Note that + * when we get here we are in the process of committing our + * transaction, but we have not yet committed to clog, so at this + * point in time we can still roll the transaction back. + */ + LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); + asyncQueueFillWarning(); + if (asyncQueueIsFull()) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("too many notifications in the NOTIFY queue"))); + nextNotify = asyncQueueAddEntries(nextNotify); + LWLockRelease(AsyncQueueLock); + } + } } /* * AtCommit_Notify * - * This is called at transaction commit. - * - * If there are pending LISTEN/UNLISTEN actions, insert or delete - * tuples in pg_listener accordingly. - * - * If there are outbound notify requests in the pendingNotifies list, - * scan pg_listener for matching tuples, and either signal the other - * backend or send a message to our own frontend. + * This is called at transaction commit, after committing to clog. * - * NOTE: we are still inside the current transaction, therefore can - * piggyback on its committing of changes. 
+ * Update listenChannels and clear transaction-local state. */ void AtCommit_Notify(void) { - Relation lRel; ListCell *p; - if (pendingActions == NIL && pendingNotifies == NIL) - return; /* no relevant statements in this xact */ - /* - * NOTIFY is disabled if not normal processing mode. This test used to be - * in xact.c, but it seems cleaner to do it here. + * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to + * return as soon as possible */ - if (!IsNormalProcessingMode()) - { - ClearPendingActionsAndNotifies(); + if (!pendingActions && !pendingNotifies) return; - } if (Trace_notify) elog(DEBUG1, "AtCommit_Notify"); - /* Acquire ExclusiveLock on pg_listener */ - lRel = heap_open(ListenerRelationId, ExclusiveLock); - /* Perform any pending listen/unlisten actions */ foreach(p, pendingActions) { @@ -410,281 +897,640 @@ AtCommit_Notify(void) switch (actrec->action) { case LISTEN_LISTEN: - Exec_Listen(lRel, actrec->condname); + Exec_ListenCommit(actrec->channel); break; case LISTEN_UNLISTEN: - Exec_Unlisten(lRel, actrec->condname); + Exec_UnlistenCommit(actrec->channel); break; case LISTEN_UNLISTEN_ALL: - Exec_UnlistenAll(lRel); + Exec_UnlistenAllCommit(); break; } + } + + /* + * If we did an initial LISTEN, listenChannels now has the entry, so + * we no longer need or want the flag to be set. + */ + backendHasExecutedInitialListen = false; + + /* And clean up */ + ClearPendingActionsAndNotifies(); +} + +/* + * Exec_ListenPreCommit --- subroutine for PreCommit_Notify + * + * This function must make sure we are ready to catch any incoming messages. + */ +static void +Exec_ListenPreCommit(void) +{ + /* + * Nothing to do if we are already listening to something, nor if we + * already ran this routine in this transaction. + */ + if (listenChannels != NIL || backendHasExecutedInitialListen) + return; + + if (Trace_notify) + elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid); + + /* + * We need this variable to detect an aborted initial LISTEN. 
+ * In that case we would set up our pointer but not listen on any channel. + * This flag gets cleared in AtCommit_Notify or AtAbort_Notify(). + */ + backendHasExecutedInitialListen = true; + + /* + * Before registering, make sure we will unlisten before dying. + * (Note: this action does not get undone if we abort later.) + */ + if (!unlistenExitRegistered) + { + on_shmem_exit(Async_UnlistenOnExit, 0); + unlistenExitRegistered = true; + } + + /* + * This is our first LISTEN, so establish our pointer. + * + * We set our pointer to the global tail pointer and then move it forward + * over already-committed notifications. This ensures we cannot miss any + * not-yet-committed notifications. We might get a few more but that + * doesn't hurt. + */ + LWLockAcquire(AsyncQueueLock, LW_SHARED); + QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL; + QUEUE_BACKEND_PID(MyBackendId) = MyProcPid; + LWLockRelease(AsyncQueueLock); + + /* + * Try to move our pointer forward as far as possible. This will skip over + * already-committed notifications. Still, we could get notifications that + * have already committed before we started to LISTEN. + * + * Note that we are not yet listening on anything, so we won't deliver + * any notification to the frontend. + * + * This will also advance the global tail pointer if possible. + */ + asyncQueueReadAllNotifications(); +} + +/* + * Exec_ListenCommit --- subroutine for AtCommit_Notify + * + * Add the channel to the list of channels we are listening on. + */ +static void +Exec_ListenCommit(const char *channel) +{ + MemoryContext oldcontext; + + /* Do nothing if we are already listening on this channel */ + if (IsListeningOn(channel)) + return; + + /* + * Add the new channel name to listenChannels. + * + * XXX It is theoretically possible to get an out-of-memory failure here, + * which would be bad because we already committed. For the moment it + * doesn't seem worth trying to guard against that, but maybe improve this + * later. 
+ */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + listenChannels = lappend(listenChannels, pstrdup(channel)); + MemoryContextSwitchTo(oldcontext); +} + +/* + * Exec_UnlistenCommit --- subroutine for AtCommit_Notify + * + * Remove the specified channel name from listenChannels. + */ +static void +Exec_UnlistenCommit(const char *channel) +{ + ListCell *q; + ListCell *prev; + + if (Trace_notify) + elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid); + + prev = NULL; + foreach(q, listenChannels) + { + char *lchan = (char *) lfirst(q); + + if (strcmp(lchan, channel) == 0) + { + listenChannels = list_delete_cell(listenChannels, q, prev); + pfree(lchan); + break; + } + prev = q; + } - /* We must CCI after each action in case of conflicting actions */ - CommandCounterIncrement(); + /* + * We do not complain about unlistening something not being listened; + * should we? + */ + + /* If no longer listening to anything, get out of listener array */ + if (listenChannels == NIL) + asyncQueueUnregister(); +} + +/* + * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify + * + * Unlisten on all channels for this backend. + */ +static void +Exec_UnlistenAllCommit(void) +{ + if (Trace_notify) + elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid); + + list_free_deep(listenChannels); + listenChannels = NIL; + + asyncQueueUnregister(); +} + +/* + * ProcessCompletedNotifies --- send out signals and self-notifies + * + * This is called from postgres.c just before going idle at the completion + * of a transaction. If we issued any notifications in the just-completed + * transaction, send signals to other backends to process them, and also + * process the queue ourselves to send messages to our own frontend. + * + * The reason that this is not done in AtCommit_Notify is that there is + * a nonzero chance of errors here (for example, encoding conversion errors + * while trying to format messages to our frontend). 
An error during + * AtCommit_Notify would be a PANIC condition. The timing is also arranged + * to ensure that a transaction's self-notifies are delivered to the frontend + * before it gets the terminating ReadyForQuery message. + * + * Note that we send signals and process the queue even if the transaction + * eventually aborted. This is because we need to clean out whatever got + * added to the queue. + * + * NOTE: we are outside of any transaction here. + */ +void +ProcessCompletedNotifies(void) +{ + bool signalled; + + /* Nothing to do if we didn't send any notifications */ + if (!backendHasSentNotifications) + return; + + /* + * We reset the flag immediately; otherwise, if any sort of error + * occurs below, we'd be locked up in an infinite loop, because + * control will come right back here after error cleanup. + */ + backendHasSentNotifications = false; + + if (Trace_notify) + elog(DEBUG1, "ProcessCompletedNotifies"); + + /* + * We must run asyncQueueReadAllNotifications inside a transaction, + * else bad things happen if it gets an error. + */ + StartTransactionCommand(); + + /* Send signals to other backends */ + signalled = SignalBackends(); + + if (listenChannels != NIL) + { + /* Read the queue ourselves, and send relevant stuff to the frontend */ + asyncQueueReadAllNotifications(); + } + else if (!signalled) + { + /* + * If we found no other listening backends, and we aren't listening + * ourselves, then we must execute asyncQueueAdvanceTail to flush + * the queue, because ain't nobody else gonna do it. This prevents + * queue overflow when we're sending useless notifies to nobody. + * (A new listener could have joined since we looked, but if so this + * is harmless.) 
+ */ + asyncQueueAdvanceTail(); } - /* Perform any pending notifies */ - if (pendingNotifies) - Send_Notify(lRel); - - /* - * We do NOT release the lock on pg_listener here; we need to hold it - * until end of transaction (which is about to happen, anyway) to ensure - * that notified backends see our tuple updates when they look. Else they - * might disregard the signal, which would make the application programmer - * very unhappy. Also, this prevents race conditions when we have just - * inserted a listening tuple. - */ - heap_close(lRel, NoLock); - - ClearPendingActionsAndNotifies(); + CommitTransactionCommand(); - if (Trace_notify) - elog(DEBUG1, "AtCommit_Notify: done"); + /* We don't need pq_flush() here since postgres.c will do one shortly */ } /* - * Exec_Listen --- subroutine for AtCommit_Notify + * Test whether we are actively listening on the given channel name. * - * Register the current backend as listening on the specified relation. + * Note: this function is executed for every notification found in the queue. + * Perhaps it is worth further optimization, eg convert the list to a sorted + * array so we can binary-search it. In practice the list is likely to be + * fairly short, though. 
*/ -static void -Exec_Listen(Relation lRel, const char *relname) +static bool +IsListeningOn(const char *channel) { - HeapScanDesc scan; - HeapTuple tuple; - Datum values[Natts_pg_listener]; - bool nulls[Natts_pg_listener]; - NameData condname; - bool alreadyListener = false; - - if (Trace_notify) - elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid); + ListCell *p; - /* Detect whether we are already listening on this relname */ - scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + foreach(p, listenChannels) { - Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple); + char *lchan = (char *) lfirst(p); - if (listener->listenerpid == MyProcPid && - strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0) - { - alreadyListener = true; - /* No need to scan the rest of the table */ - break; - } + if (strcmp(lchan, channel) == 0) + return true; } - heap_endscan(scan); + return false; +} - if (alreadyListener) - return; +/* + * Remove our entry from the listeners array when we are no longer listening + * on any channel. NB: must not fail if we're already not listening. + */ +static void +asyncQueueUnregister(void) +{ + bool advanceTail; - /* - * OK to insert a new tuple - */ - memset(nulls, false, sizeof(nulls)); + Assert(listenChannels == NIL); /* else caller error */ - namestrcpy(&condname, relname); - values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname); - values[Anum_pg_listener_listenerpid - 1] = Int32GetDatum(MyProcPid); - values[Anum_pg_listener_notification - 1] = Int32GetDatum(0); /* no notifies pending */ + LWLockAcquire(AsyncQueueLock, LW_SHARED); + /* check if entry is valid and oldest ... */ + advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) && + QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL); + /* ... 
then mark it invalid */ + QUEUE_BACKEND_PID(MyBackendId) = InvalidPid; + LWLockRelease(AsyncQueueLock); + + /* If we were the laziest backend, try to advance the tail pointer */ + if (advanceTail) + asyncQueueAdvanceTail(); +} - tuple = heap_form_tuple(RelationGetDescr(lRel), values, nulls); +/* + * Test whether there is room to insert more notification messages. + * + * Caller must hold at least shared AsyncQueueLock. + */ +static bool +asyncQueueIsFull(void) +{ + int nexthead; + int boundary; - simple_heap_insert(lRel, tuple); + /* + * The queue is full if creating a new head page would create a page that + * logically precedes the current global tail pointer, ie, the head + * pointer would wrap around compared to the tail. We cannot create such + * a head page for fear of confusing slru.c. For safety we round the tail + * pointer back to a segment boundary (compare the truncation logic in + * asyncQueueAdvanceTail). + * + * Note that this test is *not* dependent on how much space there is on + * the current head page. This is necessary because asyncQueueAddEntries + * might try to create the next head page in any case. + */ + nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1; + if (nexthead > QUEUE_MAX_PAGE) + nexthead = 0; /* wrap around */ + boundary = QUEUE_POS_PAGE(QUEUE_TAIL); + boundary -= boundary % SLRU_PAGES_PER_SEGMENT; + return asyncQueuePagePrecedesLogically(nexthead, boundary); +} -#ifdef NOT_USED /* currently there are no indexes */ - CatalogUpdateIndexes(lRel, tuple); -#endif +/* + * Advance the QueuePosition to the next entry, assuming that the current + * entry is of length entryLength. If we jump to a new page the function + * returns true, else false. 
+ */ +static bool +asyncQueueAdvance(QueuePosition *position, int entryLength) +{ + int pageno = QUEUE_POS_PAGE(*position); + int offset = QUEUE_POS_OFFSET(*position); + bool pageJump = false; - heap_freetuple(tuple); + /* + * Move to the next writing position: First jump over what we have just + * written or read. + */ + offset += entryLength; + Assert(offset <= QUEUE_PAGESIZE); /* - * now that we are listening, make sure we will unlisten before dying. + * In a second step check if another entry can possibly be written to the + * page. If so, stay here, we have reached the next position. If not, then + * we need to move on to the next page. */ - if (!unlistenExitRegistered) + if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE) { - on_shmem_exit(Async_UnlistenOnExit, 0); - unlistenExitRegistered = true; + pageno++; + if (pageno > QUEUE_MAX_PAGE) + pageno = 0; /* wrap around */ + offset = 0; + pageJump = true; } + + SET_QUEUE_POS(*position, pageno, offset); + return pageJump; } /* - * Exec_Unlisten --- subroutine for AtCommit_Notify - * - * Remove the current backend from the list of listening backends - * for the specified relation. + * Fill the AsyncQueueEntry at *qe with an outbound notification message. 
*/ static void -Exec_Unlisten(Relation lRel, const char *relname) +asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) { - HeapScanDesc scan; - HeapTuple tuple; + size_t channellen = strlen(n->channel); + size_t payloadlen = strlen(n->payload); + int entryLength; + + Assert(channellen < NAMEDATALEN); + Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH); + + /* The terminators are already included in AsyncQueueEntryEmptySize */ + entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen; + entryLength = QUEUEALIGN(entryLength); + qe->length = entryLength; + qe->dboid = MyDatabaseId; + qe->xid = GetCurrentTransactionId(); + qe->srcPid = MyProcPid; + memcpy(qe->data, n->channel, channellen + 1); + memcpy(qe->data + channellen + 1, n->payload, payloadlen + 1); +} - if (Trace_notify) - elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid); +/* + * Add pending notifications to the queue. + * + * We go page by page here, i.e. we stop once we have to go to a new page but + * we will be called again and then fill that next page. If an entry does not + * fit into the current page, we write a dummy entry with an InvalidOid as the + * database OID in order to fill the page. So every page is always used up to + * the last byte which simplifies reading the page later. + * + * We are passed the list cell containing the next notification to write + * and return the first still-unwritten cell back. Eventually we will return + * NULL indicating all is done. + * + * We are holding AsyncQueueLock already from the caller and grab AsyncCtlLock + * locally in this function. 
+ */ +static ListCell * +asyncQueueAddEntries(ListCell *nextNotify) +{ + AsyncQueueEntry qe; + int pageno; + int offset; + int slotno; - scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + /* We hold both AsyncQueueLock and AsyncCtlLock during this operation */ + LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE); + + /* Fetch the current page */ + pageno = QUEUE_POS_PAGE(QUEUE_HEAD); + slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId); + /* Note we mark the page dirty before writing in it */ + AsyncCtl->shared->page_dirty[slotno] = true; + + while (nextNotify != NULL) { - Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple); + Notification *n = (Notification *) lfirst(nextNotify); + + /* Construct a valid queue entry in local variable qe */ + asyncQueueNotificationToEntry(n, &qe); + + offset = QUEUE_POS_OFFSET(QUEUE_HEAD); - if (listener->listenerpid == MyProcPid && - strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0) + /* Check whether the entry really fits on the current page */ + if (offset + qe.length <= QUEUE_PAGESIZE) { - /* Found the matching tuple, delete it */ - simple_heap_delete(lRel, &tuple->t_self); + /* OK, so advance nextNotify past this item */ + nextNotify = lnext(nextNotify); + } + else + { + /* + * Write a dummy entry to fill up the page. Actually readers will + * only check dboid and since it won't match any reader's database + * OID, they will ignore this entry and move on. 
+ */ + qe.length = QUEUE_PAGESIZE - offset; + qe.dboid = InvalidOid; + qe.data[0] = '\0'; /* empty channel */ + qe.data[1] = '\0'; /* empty payload */ + } + /* Now copy qe into the shared buffer page */ + memcpy(AsyncCtl->shared->page_buffer[slotno] + offset, + &qe, + qe.length); + + /* Advance QUEUE_HEAD appropriately, and note if page is full */ + if (asyncQueueAdvance(&(QUEUE_HEAD), qe.length)) + { /* - * We assume there can be only one match, so no need to scan the - * rest of the table + * Page is full, so we're done here, but first fill the next + * page with zeroes. The reason to do this is to ensure that + * slru.c's idea of the head page is always the same as ours, + * which avoids boundary problems in SimpleLruTruncate. The + * test in asyncQueueIsFull() ensured that there is room to + * create this page without overrunning the queue. */ + slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD)); + /* And exit the loop */ break; } } - heap_endscan(scan); - /* - * We do not complain about unlistening something not being listened; - * should we? - */ + LWLockRelease(AsyncCtlLock); + + return nextNotify; } /* - * Exec_UnlistenAll --- subroutine for AtCommit_Notify + * Check whether the queue is at least half full, and emit a warning if so. + * + * This is unlikely given the size of the queue, but possible. + * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL. * - * Update pg_listener to unlisten all relations for this backend. + * Caller must hold exclusive AsyncQueueLock. 
*/ static void -Exec_UnlistenAll(Relation lRel) +asyncQueueFillWarning(void) { - HeapScanDesc scan; - HeapTuple lTuple; - ScanKeyData key[1]; + int headPage = QUEUE_POS_PAGE(QUEUE_HEAD); + int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL); + int occupied; + double fillDegree; + TimestampTz t; + + occupied = headPage - tailPage; + + if (occupied == 0) + return; /* fast exit for common case */ + + if (occupied < 0) + { + /* head has wrapped around, tail not yet */ + occupied += QUEUE_MAX_PAGE+1; + } - if (Trace_notify) - elog(DEBUG1, "Exec_UnlistenAll"); + fillDegree = (double) occupied / (double) ((QUEUE_MAX_PAGE+1)/2); + + if (fillDegree < 0.5) + return; + + t = GetCurrentTimestamp(); - /* Find and delete all entries with my listenerPID */ - ScanKeyInit(&key[0], - Anum_pg_listener_listenerpid, - BTEqualStrategyNumber, F_INT4EQ, - Int32GetDatum(MyProcPid)); - scan = heap_beginscan(lRel, SnapshotNow, 1, key); + if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn, + t, QUEUE_FULL_WARN_INTERVAL)) + { + QueuePosition min = QUEUE_HEAD; + int32 minPid = InvalidPid; + int i; + + for (i = 1; i <= MaxBackends; i++) + { + if (QUEUE_BACKEND_PID(i) != InvalidPid) + { + min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); + if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i))) + minPid = QUEUE_BACKEND_PID(i); + } + } - while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) - simple_heap_delete(lRel, &lTuple->t_self); + ereport(WARNING, + (errmsg("pg_notify queue is %.0f%% full", fillDegree * 100), + (minPid != InvalidPid ? + errdetail("PID %d is among the slowest backends.", minPid) + : 0), + (minPid != InvalidPid ? + errhint("Cleanup can only proceed if this backend ends its current transaction.") + : 0))); - heap_endscan(scan); + asyncQueueControl->lastQueueFillWarn = t; + } } /* - * Send_Notify --- subroutine for AtCommit_Notify + * Send signals to all listening backends (except our own). + * + * Returns true if we sent at least one signal. 
* - * Scan pg_listener for tuples matching our pending notifies, and - * either signal the other backend or send a message to our own frontend. + * Since we need EXCLUSIVE lock anyway we also check the position of the other + * backends and in case one is already up-to-date we don't signal it. + * This can happen if concurrent notifying transactions have sent a signal and + * the signaled backend has read the other notifications and ours in the same + * step. + * + * Since we know the BackendId and the Pid the signalling is quite cheap. */ -static void -Send_Notify(Relation lRel) +static bool +SignalBackends(void) { - TupleDesc tdesc = RelationGetDescr(lRel); - HeapScanDesc scan; - HeapTuple lTuple, - rTuple; - Datum value[Natts_pg_listener]; - bool repl[Natts_pg_listener], - nulls[Natts_pg_listener]; - - /* preset data to update notify column to MyProcPid */ - memset(nulls, false, sizeof(nulls)); - memset(repl, false, sizeof(repl)); - repl[Anum_pg_listener_notification - 1] = true; - memset(value, 0, sizeof(value)); - value[Anum_pg_listener_notification - 1] = Int32GetDatum(MyProcPid); - - scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); - - while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) - { - Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple); - char *relname = NameStr(listener->relname); - int32 listenerPID = listener->listenerpid; + bool signalled = false; + int32 *pids; + BackendId *ids; + int count; + int i; + int32 pid; - if (!AsyncExistsPendingNotify(relname)) - continue; - - if (listenerPID == MyProcPid) - { - /* - * Self-notify: no need to bother with table update. Indeed, we - * *must not* clear the notification field in this path, or we - * could lose an outside notify, which'd be bad for applications - * that ignore self-notify messages. - */ - if (Trace_notify) - elog(DEBUG1, "AtCommit_Notify: notifying self"); + /* + * Identify all backends that are listening and not already up-to-date. 
+ * We don't want to send signals while holding the AsyncQueueLock, so + * we just build a list of target PIDs. + * + * XXX in principle these pallocs could fail, which would be bad. + * Maybe preallocate the arrays? But in practice this is only run + * in trivial transactions, so there should surely be space available. + */ + pids = (int32 *) palloc(MaxBackends * sizeof(int32)); + ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId)); + count = 0; - NotifyMyFrontEnd(relname, listenerPID); - } - else + LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); + for (i = 1; i <= MaxBackends; i++) + { + pid = QUEUE_BACKEND_PID(i); + if (pid != InvalidPid && pid != MyProcPid) { - if (Trace_notify) - elog(DEBUG1, "AtCommit_Notify: notifying pid %d", - listenerPID); + QueuePosition pos = QUEUE_BACKEND_POS(i); - /* - * If someone has already notified this listener, we don't bother - * modifying the table, but we do still send a NOTIFY_INTERRUPT - * signal, just in case that backend missed the earlier signal for - * some reason. It's OK to send the signal first, because the - * other guy can't read pg_listener until we unlock it. - * - * Note: we don't have the other guy's BackendId available, so - * this will incur a search of the ProcSignal table. That's - * probably not worth worrying about. - */ - if (SendProcSignal(listenerPID, PROCSIG_NOTIFY_INTERRUPT, - InvalidBackendId) < 0) - { - /* - * Get rid of pg_listener entry if it refers to a PID that no - * longer exists. Presumably, that backend crashed without - * deleting its pg_listener entries. This code used to only - * delete the entry if errno==ESRCH, but as far as I can see - * we should just do it for any failure (certainly at least - * for EPERM too...) 
- */ - simple_heap_delete(lRel, &lTuple->t_self); - } - else if (listener->notification == 0) + if (!QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) { - /* Rewrite the tuple with my PID in notification column */ - rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl); - simple_heap_update(lRel, &lTuple->t_self, rTuple); - -#ifdef NOT_USED /* currently there are no indexes */ - CatalogUpdateIndexes(lRel, rTuple); -#endif + pids[count] = pid; + ids[count] = i; + count++; } } } + LWLockRelease(AsyncQueueLock); - heap_endscan(scan); + /* Now send signals */ + for (i = 0; i < count; i++) + { + pid = pids[i]; + + /* + * Note: assuming things aren't broken, a signal failure here could + * only occur if the target backend exited since we released + * AsyncQueueLock; which is unlikely but certainly possible. + * So we just log a low-level debug message if it happens. + */ + if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0) + elog(DEBUG3, "could not signal backend with PID %d: %m", pid); + else + signalled = true; + } + + pfree(pids); + pfree(ids); + + return signalled; } /* * AtAbort_Notify * - * This is called at transaction abort. + * This is called at transaction abort. * - * Gets rid of pending actions and outbound notifies that we would have - * executed if the transaction got committed. + * Gets rid of pending actions and outbound notifies that we would have + * executed if the transaction got committed. */ void AtAbort_Notify(void) { + /* + * If we LISTEN but then roll back the transaction we have set our pointer + * but have not made any entry in listenChannels. In that case, remove + * our pointer again. + */ + if (backendHasExecutedInitialListen) + { + /* + * Checking listenChannels should be redundant but it can't hurt doing + * it for safety reasons. 
+ */ + if (listenChannels == NIL) + asyncQueueUnregister(); + + backendHasExecutedInitialListen = false; + } + + /* And clean up */ ClearPendingActionsAndNotifies(); } @@ -939,31 +1785,299 @@ DisableNotifyInterrupt(void) return result; } +/* + * Read all pending notifications from the queue, and deliver appropriate + * ones to my frontend. Stop when we reach queue head or an uncommitted + * notification. + */ +static void +asyncQueueReadAllNotifications(void) +{ + QueuePosition pos; + QueuePosition oldpos; + QueuePosition head; + bool advanceTail; + /* page_buffer must be adequately aligned, so use a union */ + union { + char buf[QUEUE_PAGESIZE]; + AsyncQueueEntry align; + } page_buffer; + + /* Fetch current state */ + LWLockAcquire(AsyncQueueLock, LW_SHARED); + /* Assert checks that we have a valid state entry */ + Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId)); + pos = oldpos = QUEUE_BACKEND_POS(MyBackendId); + head = QUEUE_HEAD; + LWLockRelease(AsyncQueueLock); + + if (QUEUE_POS_EQUAL(pos, head)) + { + /* Nothing to do, we have read all notifications already. */ + return; + } + + /*---------- + * Note that we deliver everything that we see in the queue and that + * matches our _current_ listening state. + * Especially we do not take into account different commit times. + * Consider the following example: + * + * Backend 1: Backend 2: + * + * transaction starts + * NOTIFY foo; + * commit starts + * transaction starts + * LISTEN foo; + * commit starts + * commit to clog + * commit to clog + * + * It could happen that backend 2 sees the notification from backend 1 in + * the queue. Even though the notifying transaction committed before + * the listening transaction, we still deliver the notification. + * + * The idea is that an additional notification does not do any harm, we + * just need to make sure that we do not miss a notification. 
+ * + * It is possible that we fail while trying to send a message to our + * frontend (for example, because of encoding conversion failure). + * If that happens it is critical that we not try to send the same + * message over and over again. Therefore, we place a PG_TRY block + * here that will forcibly advance our backend position before we lose + * control to an error. (We could alternatively retake AsyncQueueLock + * and move the position before handling each individual message, but + * that seems like too much lock traffic.) + *---------- + */ + PG_TRY(); + { + bool reachedStop; + + do + { + int curpage = QUEUE_POS_PAGE(pos); + int curoffset = QUEUE_POS_OFFSET(pos); + int slotno; + int copysize; + + /* + * We copy the data from SLRU into a local buffer, so as to avoid + * holding the AsyncCtlLock while we are examining the entries and + * possibly transmitting them to our frontend. Copy only the part + * of the page we will actually inspect. + */ + slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, curpage, + InvalidTransactionId); + if (curpage == QUEUE_POS_PAGE(head)) + { + /* we only want to read as far as head */ + copysize = QUEUE_POS_OFFSET(head) - curoffset; + if (copysize < 0) + copysize = 0; /* just for safety */ + } + else + { + /* fetch all the rest of the page */ + copysize = QUEUE_PAGESIZE - curoffset; + } + memcpy(page_buffer.buf + curoffset, + AsyncCtl->shared->page_buffer[slotno] + curoffset, + copysize); + /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ + LWLockRelease(AsyncCtlLock); + + /* + * Process messages up to the stop position, end of page, or an + * uncommitted message. + * + * Our stop position is what we found to be the head's position + * when we entered this function. It might have changed + * already. But if it has, we will receive (or have already + * received and queued) another signal and come here again. + * + * We are not holding AsyncQueueLock here! 
The queue can only + * extend beyond the head pointer (see above) and we leave our + * backend's pointer where it is so nobody will truncate or + * rewrite pages under us. Especially we don't want to hold a lock + * while sending the notifications to the frontend. + */ + reachedStop = asyncQueueProcessPageEntries(&pos, head, + page_buffer.buf); + } while (!reachedStop); + } + PG_CATCH(); + { + /* Update shared state */ + LWLockAcquire(AsyncQueueLock, LW_SHARED); + QUEUE_BACKEND_POS(MyBackendId) = pos; + advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL); + LWLockRelease(AsyncQueueLock); + + /* If we were the laziest backend, try to advance the tail pointer */ + if (advanceTail) + asyncQueueAdvanceTail(); + + PG_RE_THROW(); + } + PG_END_TRY(); + + /* Update shared state */ + LWLockAcquire(AsyncQueueLock, LW_SHARED); + QUEUE_BACKEND_POS(MyBackendId) = pos; + advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL); + LWLockRelease(AsyncQueueLock); + + /* If we were the laziest backend, try to advance the tail pointer */ + if (advanceTail) + asyncQueueAdvanceTail(); +} + +/* + * Fetch notifications from the shared queue, beginning at position current, + * and deliver relevant ones to my frontend. + * + * The current page must have been fetched into page_buffer from shared + * memory. (We could access the page right in shared memory, but that + * would imply holding the AsyncCtlLock throughout this routine.) + * + * We stop if we reach the "stop" position, or reach a notification from an + * uncommitted transaction, or reach the end of the page. + * + * The function returns true once we have reached the stop position or an + * uncommitted notification, and false if we have finished with the page. + * In other words: once it returns true there is no need to look further. 
+ */ +static bool +asyncQueueProcessPageEntries(QueuePosition *current, + QueuePosition stop, + char *page_buffer) +{ + bool reachedStop = false; + bool reachedEndOfPage; + AsyncQueueEntry *qe; + + do + { + if (QUEUE_POS_EQUAL(*current, stop)) + break; + + qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(*current)); + + /* + * Advance *current over this message, possibly to the next page. + * As noted in the comments for asyncQueueReadAllNotifications, we + * must do this before possibly failing while processing the message. + */ + reachedEndOfPage = asyncQueueAdvance(current, qe->length); + + /* Ignore messages destined for other databases */ + if (qe->dboid == MyDatabaseId) + { + if (TransactionIdDidCommit(qe->xid)) + { + /* qe->data is the null-terminated channel name */ + char *channel = qe->data; + + if (IsListeningOn(channel)) + { + /* payload follows channel name */ + char *payload = qe->data + strlen(channel) + 1; + + NotifyMyFrontEnd(channel, payload, qe->srcPid); + } + } + else if (TransactionIdDidAbort(qe->xid)) + { + /* + * If the source transaction aborted, we just ignore its + * notifications. + */ + } + else + { + /* + * The transaction has neither committed nor aborted so far, + * so we can't process its message yet. Break out of the loop. + */ + reachedStop = true; + break; + } + } + + /* Loop back if we're not at end of page */ + } while (!reachedEndOfPage); + + if (QUEUE_POS_EQUAL(*current, stop)) + reachedStop = true; + + return reachedStop; +} + +/* + * Advance the shared queue tail variable to the minimum of all the + * per-backend tail pointers. Truncate pg_notify space if possible. 
+ */ +static void +asyncQueueAdvanceTail(void) +{ + QueuePosition min; + int i; + int oldtailpage; + int newtailpage; + int boundary; + + LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); + min = QUEUE_HEAD; + for (i = 1; i <= MaxBackends; i++) + { + if (QUEUE_BACKEND_PID(i) != InvalidPid) + min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); + } + oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL); + QUEUE_TAIL = min; + LWLockRelease(AsyncQueueLock); + + /* + * We can truncate something if the global tail advanced across an SLRU + * segment boundary. + * + * XXX it might be better to truncate only once every several segments, + * to reduce the number of directory scans. + */ + newtailpage = QUEUE_POS_PAGE(min); + boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT); + if (asyncQueuePagePrecedesLogically(oldtailpage, boundary)) + { + /* + * SimpleLruTruncate() will ask for AsyncCtlLock but will also + * release the lock again. + */ + SimpleLruTruncate(AsyncCtl, newtailpage); + } +} + /* * ProcessIncomingNotify * * Deal with arriving NOTIFYs from other backends. * This is called either directly from the PROCSIG_NOTIFY_INTERRUPT * signal handler, or the next time control reaches the outer idle loop. - * Scan pg_listener for arriving notifies, report them to my front end, - * and clear the notification field in pg_listener until next time. + * Scan the queue for arriving notifications and report them to my front + * end. * * NOTE: since we are outside any transaction, we must create our own. 
*/ static void ProcessIncomingNotify(void) { - Relation lRel; - TupleDesc tdesc; - ScanKeyData key[1]; - HeapScanDesc scan; - HeapTuple lTuple, - rTuple; - Datum value[Natts_pg_listener]; - bool repl[Natts_pg_listener], - nulls[Natts_pg_listener]; bool catchup_enabled; + /* Do nothing if we aren't actively listening */ + if (listenChannels == NIL) + return; + /* Must prevent catchup interrupt while I am running */ catchup_enabled = DisableCatchupInterrupt(); @@ -974,62 +2088,13 @@ ProcessIncomingNotify(void) notifyInterruptOccurred = 0; - StartTransactionCommand(); - - lRel = heap_open(ListenerRelationId, ExclusiveLock); - tdesc = RelationGetDescr(lRel); - - /* Scan only entries with my listenerPID */ - ScanKeyInit(&key[0], - Anum_pg_listener_listenerpid, - BTEqualStrategyNumber, F_INT4EQ, - Int32GetDatum(MyProcPid)); - scan = heap_beginscan(lRel, SnapshotNow, 1, key); - - /* Prepare data for rewriting 0 into notification field */ - memset(nulls, false, sizeof(nulls)); - memset(repl, false, sizeof(repl)); - repl[Anum_pg_listener_notification - 1] = true; - memset(value, 0, sizeof(value)); - value[Anum_pg_listener_notification - 1] = Int32GetDatum(0); - - while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) - { - Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple); - char *relname = NameStr(listener->relname); - int32 sourcePID = listener->notification; - - if (sourcePID != 0) - { - /* Notify the frontend */ - - if (Trace_notify) - elog(DEBUG1, "ProcessIncomingNotify: received %s from %d", - relname, (int) sourcePID); - - NotifyMyFrontEnd(relname, sourcePID); - - /* - * Rewrite the tuple with 0 in notification column. 
- */ - rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl); - simple_heap_update(lRel, &lTuple->t_self, rTuple); - -#ifdef NOT_USED /* currently there are no indexes */ - CatalogUpdateIndexes(lRel, rTuple); -#endif - } - } - heap_endscan(scan); - /* - * We do NOT release the lock on pg_listener here; we need to hold it - * until end of transaction (which is about to happen, anyway) to ensure - * that other backends see our tuple updates when they look. Otherwise, a - * transaction started after this one might mistakenly think it doesn't - * need to send this backend a new NOTIFY. + * We must run asyncQueueReadAllNotifications inside a transaction, + * else bad things happen if it gets an error. */ - heap_close(lRel, NoLock); + StartTransactionCommand(); + + asyncQueueReadAllNotifications(); CommitTransactionCommand(); @@ -1051,20 +2116,17 @@ ProcessIncomingNotify(void) * Send NOTIFY message to my front end. */ static void -NotifyMyFrontEnd(char *relname, int32 listenerPID) +NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) { if (whereToSendOutput == DestRemote) { StringInfoData buf; pq_beginmessage(&buf, 'A'); - pq_sendint(&buf, listenerPID, sizeof(int32)); - pq_sendstring(&buf, relname); + pq_sendint(&buf, srcPid, sizeof(int32)); + pq_sendstring(&buf, channel); if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) - { - /* XXX Add parameter string here later */ - pq_sendstring(&buf, ""); - } + pq_sendstring(&buf, payload); pq_endmessage(&buf); /* @@ -1074,20 +2136,51 @@ NotifyMyFrontEnd(char *relname, int32 listenerPID) */ } else - elog(INFO, "NOTIFY for %s", relname); + elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload); } -/* Does pendingNotifies include the given relname? */ +/* Does pendingNotifies include the given channel/payload? 
*/ static bool -AsyncExistsPendingNotify(const char *relname) +AsyncExistsPendingNotify(const char *channel, const char *payload) { ListCell *p; + Notification *n; + + if (pendingNotifies == NIL) + return false; + + if (payload == NULL) + payload = ""; + + /*---------- + * We need to append new elements to the end of the list in order to keep + * the order. However, on the other hand we'd like to check the list + * backwards in order to make duplicate-elimination a tad faster when the + * same condition is signaled many times in a row. So as a compromise we + * check the tail element first which we can access directly. If this + * doesn't match, we check the whole list. + * + * As we are not checking our parents' lists, we can still get duplicates + * in combination with subtransactions, like in: + * + * begin; + * notify foo '1'; + * savepoint foo; + * notify foo '1'; + * commit; + *---------- + */ + n = (Notification *) llast(pendingNotifies); + if (strcmp(n->channel, channel) == 0 && + strcmp(n->payload, payload) == 0) + return true; foreach(p, pendingNotifies) { - const char *prelname = (const char *) lfirst(p); + n = (Notification *) lfirst(p); - if (strcmp(prelname, relname) == 0) + if (strcmp(n->channel, channel) == 0 && + strcmp(n->payload, payload) == 0) return true; } @@ -1108,21 +2201,3 @@ ClearPendingActionsAndNotifies(void) pendingActions = NIL; pendingNotifies = NIL; } - -/* - * 2PC processing routine for COMMIT PREPARED case. - * - * (We don't have to do anything for ROLLBACK PREPARED.) - */ -void -notify_twophase_postcommit(TransactionId xid, uint16 info, - void *recdata, uint32 len) -{ - /* - * Set up to issue the NOTIFY at the end of my own current transaction. - * (XXX this has some issues if my own transaction later rolls back, or if - * there is any significant delay before I commit. OK for now because we - * disallow COMMIT PREPARED inside a transaction block.) 
- */ - Async_Notify((char *) recdata); -} diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 271a7a2129..371d1f245e 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -15,7 +15,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.461 2010/02/12 17:33:20 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.462 2010/02/16 22:34:43 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -2777,6 +2777,7 @@ _copyNotifyStmt(NotifyStmt *from) NotifyStmt *newnode = makeNode(NotifyStmt); COPY_STRING_FIELD(conditionname); + COPY_STRING_FIELD(payload); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 001c2096b4..7dfc1969f5 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -22,7 +22,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/nodes/equalfuncs.c,v 1.382 2010/02/12 17:33:20 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/nodes/equalfuncs.c,v 1.383 2010/02/16 22:34:43 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -1325,6 +1325,7 @@ static bool _equalNotifyStmt(NotifyStmt *a, NotifyStmt *b) { COMPARE_STRING_FIELD(conditionname); + COMPARE_STRING_FIELD(payload); return true; } diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 593fe397d3..878e4b3b94 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/nodes/outfuncs.c,v 1.382 2010/02/12 17:33:20 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/nodes/outfuncs.c,v 1.383 2010/02/16 22:34:43 tgl Exp $ * * NOTES * Every node type that can appear in stored rules' parsetrees *must* @@ -1820,6 +1820,7 
@@ _outNotifyStmt(StringInfo str, NotifyStmt *node) WRITE_NODE_TYPE("NOTIFY"); WRITE_STRING_FIELD(conditionname); + WRITE_STRING_FIELD(payload); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 35ba22203f..f28191d2d0 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/nodes/readfuncs.c,v 1.231 2010/02/12 17:33:20 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/nodes/readfuncs.c,v 1.232 2010/02/16 22:34:43 tgl Exp $ * * NOTES * Path and Plan nodes do not have any readfuncs support, because we @@ -231,6 +231,7 @@ _readNotifyStmt(void) READ_LOCALS(NotifyStmt); READ_STRING_FIELD(conditionname); + READ_STRING_FIELD(payload); READ_DONE(); } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index da70ee089c..235a7001ad 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -11,7 +11,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.708 2010/02/12 17:33:20 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.709 2010/02/16 22:34:49 tgl Exp $ * * HISTORY * AUTHOR DATE MAJOR EVENT @@ -400,7 +400,7 @@ static TypeName *TableFuncTypeName(List *columns); %type Iconst SignedIconst %type Iconst_list -%type Sconst comment_text +%type Sconst comment_text notify_payload %type RoleId opt_granted_by opt_boolean ColId_or_Sconst %type var_list %type ColId ColLabel var_name type_function_name param_name @@ -6123,14 +6123,20 @@ DropRuleStmt: * *****************************************************************************/ -NotifyStmt: NOTIFY ColId +NotifyStmt: NOTIFY ColId notify_payload { NotifyStmt *n = makeNode(NotifyStmt); n->conditionname = $2; + n->payload = $3; $$ = (Node *)n; } ; +notify_payload: + ',' Sconst { $$ = $2; } + | /*EMPTY*/ { $$ = NULL; } + ; + ListenStmt: LISTEN ColId { ListenStmt *n = makeNode(ListenStmt); diff --git a/src/backend/storage/ipc/ipci.c 
b/src/backend/storage/ipc/ipci.c index 01a9fabc8c..b4923d278a 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/ipc/ipci.c,v 1.103 2010/01/15 09:19:03 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/storage/ipc/ipci.c,v 1.104 2010/02/16 22:34:50 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -20,6 +20,7 @@ #include "access/nbtree.h" #include "access/subtrans.h" #include "access/twophase.h" +#include "commands/async.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -122,6 +123,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, WalRcvShmemSize()); size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); + size = add_size(size, AsyncShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -225,6 +227,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) */ BTreeShmemInit(); SyncScanShmemInit(); + AsyncShmemInit(); #ifdef EXEC_BACKEND diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 4e635d1e8a..7a6cab968b 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -15,7 +15,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/lmgr/lwlock.c,v 1.55 2010/01/02 16:57:52 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/storage/lmgr/lwlock.c,v 1.56 2010/02/16 22:34:50 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -24,6 +24,7 @@ #include "access/clog.h" #include "access/multixact.h" #include "access/subtrans.h" +#include "commands/async.h" #include "miscadmin.h" #include "pg_trace.h" #include "storage/ipc.h" @@ -174,6 +175,9 @@ NumLWLocks(void) /* multixact.c needs two SLRU areas */ 
numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS; + /* async.c needs one per Async buffer */ + numLocks += NUM_ASYNC_BUFFERS; + /* * Add any requested by loadable modules; for backwards-compatibility * reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 15d0808ad2..2ae15d5ce0 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/tcop/postgres.c,v 1.589 2010/02/16 20:15:14 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/tcop/postgres.c,v 1.590 2010/02/16 22:34:50 tgl Exp $ * * NOTES * this is the "main" module of the postgres backend and @@ -3779,7 +3779,8 @@ PostgresMain(int argc, char *argv[], const char *username) * collector, and to update the PS stats display. We avoid doing * those every time through the message loop because it'd slow down * processing of batched messages, and because we don't want to report - * uncommitted updates (that confuses autovacuum). + * uncommitted updates (that confuses autovacuum). The notification + * processor wants a call too, if we are not in a transaction block. 
*/ if (send_ready_for_query) { @@ -3795,6 +3796,7 @@ PostgresMain(int argc, char *argv[], const char *username) } else { + ProcessCompletedNotifies(); pgstat_report_stat(false); set_ps_display("idle", false); diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 4879970632..07f4d0c57a 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/tcop/utility.c,v 1.332 2010/02/14 18:42:15 rhaas Exp $ + * $PostgreSQL: pgsql/src/backend/tcop/utility.c,v 1.333 2010/02/16 22:34:50 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -926,17 +926,17 @@ standard_ProcessUtility(Node *parsetree, case T_NotifyStmt: { NotifyStmt *stmt = (NotifyStmt *) parsetree; - PreventCommandDuringRecovery(); - Async_Notify(stmt->conditionname); + PreventCommandDuringRecovery(); + Async_Notify(stmt->conditionname, stmt->payload); } break; case T_ListenStmt: { ListenStmt *stmt = (ListenStmt *) parsetree; - PreventCommandDuringRecovery(); + PreventCommandDuringRecovery(); CheckRestrictedOperation("LISTEN"); Async_Listen(stmt->conditionname); } @@ -945,8 +945,8 @@ standard_ProcessUtility(Node *parsetree, case T_UnlistenStmt: { UnlistenStmt *stmt = (UnlistenStmt *) parsetree; - PreventCommandDuringRecovery(); + PreventCommandDuringRecovery(); CheckRestrictedOperation("UNLISTEN"); if (stmt->conditionname) Async_Unlisten(stmt->conditionname); @@ -1105,8 +1105,8 @@ standard_ProcessUtility(Node *parsetree, case T_ReindexStmt: { ReindexStmt *stmt = (ReindexStmt *) parsetree; - PreventCommandDuringRecovery(); + PreventCommandDuringRecovery(); switch (stmt->kind) { case OBJECT_INDEX: diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index b2127a3129..294c7c904b 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $PostgreSQL: 
pgsql/src/backend/utils/adt/ruleutils.c,v 1.322 2010/02/14 18:42:16 rhaas Exp $ + * $PostgreSQL: pgsql/src/backend/utils/adt/ruleutils.c,v 1.323 2010/02/16 22:34:50 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -3465,6 +3465,11 @@ get_utility_query_def(Query *query, deparse_context *context) 0, PRETTYINDENT_STD, 1); appendStringInfo(buf, "NOTIFY %s", quote_identifier(stmt->conditionname)); + if (stmt->payload) + { + appendStringInfoString(buf, ", "); + simple_quote_literal(buf, stmt->payload); + } } else { diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index 06df8f6af8..733d8ef74d 100644 --- a/src/bin/initdb/initdb.c +++ b/src/bin/initdb/initdb.c @@ -42,7 +42,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * Portions taken from FreeBSD. * - * $PostgreSQL: pgsql/src/bin/initdb/initdb.c,v 1.184 2010/01/26 16:18:12 tgl Exp $ + * $PostgreSQL: pgsql/src/bin/initdb/initdb.c,v 1.185 2010/02/16 22:34:50 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -2458,6 +2458,7 @@ main(int argc, char *argv[]) "pg_xlog", "pg_xlog/archive_status", "pg_clog", + "pg_notify", "pg_subtrans", "pg_twophase", "pg_multixact/members", diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c index 8422bb76ab..9661f0cd80 100644 --- a/src/bin/psql/common.c +++ b/src/bin/psql/common.c @@ -3,7 +3,7 @@ * * Copyright (c) 2000-2010, PostgreSQL Global Development Group * - * $PostgreSQL: pgsql/src/bin/psql/common.c,v 1.143 2010/01/02 16:57:59 momjian Exp $ + * $PostgreSQL: pgsql/src/bin/psql/common.c,v 1.144 2010/02/16 22:34:50 tgl Exp $ */ #include "postgres_fe.h" #include "common.h" @@ -555,8 +555,13 @@ PrintNotifications(void) while ((notify = PQnotifies(pset.db))) { - fprintf(pset.queryFout, _("Asynchronous notification \"%s\" received from server process with PID %d.\n"), - notify->relname, notify->be_pid); + /* for backward compatibility, only show payload if 
nonempty */ + if (notify->extra[0]) + fprintf(pset.queryFout, _("Asynchronous notification \"%s\" with payload \"%s\" received from server process with PID %d.\n"), + notify->relname, notify->extra, notify->be_pid); + else + fprintf(pset.queryFout, _("Asynchronous notification \"%s\" received from server process with PID %d.\n"), + notify->relname, notify->be_pid); fflush(pset.queryFout); PQfreemem(notify); } diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 19b9b1f3ef..0b10472c12 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -3,7 +3,7 @@ * * Copyright (c) 2000-2010, PostgreSQL Global Development Group * - * $PostgreSQL: pgsql/src/bin/psql/tab-complete.c,v 1.193 2010/02/15 02:55:01 itagaki Exp $ + * $PostgreSQL: pgsql/src/bin/psql/tab-complete.c,v 1.194 2010/02/16 22:34:50 tgl Exp $ */ /*---------------------------------------------------------------------- @@ -1864,7 +1864,7 @@ psql_completion(char *text, int start, int end) /* NOTIFY */ else if (pg_strcasecmp(prev_wd, "NOTIFY") == 0) - COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(relname) FROM pg_catalog.pg_listener WHERE substring(pg_catalog.quote_ident(relname),1,%d)='%s'"); + COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(channel) FROM pg_catalog.pg_listening_channels() AS channel WHERE substring(pg_catalog.quote_ident(channel),1,%d)='%s'"); /* OPTIONS */ else if (pg_strcasecmp(prev_wd, "OPTIONS") == 0) @@ -2105,7 +2105,7 @@ psql_completion(char *text, int start, int end) /* UNLISTEN */ else if (pg_strcasecmp(prev_wd, "UNLISTEN") == 0) - COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(relname) FROM pg_catalog.pg_listener WHERE substring(pg_catalog.quote_ident(relname),1,%d)='%s' UNION SELECT '*'"); + COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(channel) FROM pg_catalog.pg_listening_channels() AS channel WHERE substring(pg_catalog.quote_ident(channel),1,%d)='%s' UNION SELECT '*'"); /* UPDATE */ /* If prev. 
word is UPDATE suggest a list of tables */ diff --git a/src/include/access/slru.h b/src/include/access/slru.h index 8e820ae72d..4cc40ba5f7 100644 --- a/src/include/access/slru.h +++ b/src/include/access/slru.h @@ -6,7 +6,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/slru.h,v 1.25 2010/01/02 16:58:00 momjian Exp $ + * $PostgreSQL: pgsql/src/include/access/slru.h,v 1.26 2010/02/16 22:34:50 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -17,6 +17,25 @@ #include "storage/lwlock.h" +/* + * Define SLRU segment size. A page is the same BLCKSZ as is used everywhere + * else in Postgres. The segment size can be chosen somewhat arbitrarily; + * we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG + * or 64K transactions for SUBTRANS. + * + * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF, + * page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where + * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at + * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need + * take no explicit notice of that fact in slru.c, except when comparing + * segment and page numbers in SimpleLruTruncate (see PagePrecedes()). + * + * Note: slru.c currently assumes that segment file names will be four hex + * digits. This sets a lower bound on the segment size (64K transactions + * for 32-bit TransactionIds). + */ +#define SLRU_PAGES_PER_SEGMENT 32 + /* * Page status codes. Note that these do not include the "dirty" bit. * page_dirty can be TRUE only in the VALID or WRITE_IN_PROGRESS states; @@ -55,8 +74,8 @@ typedef struct SlruSharedData /* * Optional array of WAL flush LSNs associated with entries in the SLRU * pages. 
If not zero/NULL, we must flush WAL before writing pages (true - * for pg_clog, false for multixact and pg_subtrans). group_lsn[] has - * lsn_groups_per_page entries per buffer slot, each containing the + * for pg_clog, false for multixact, pg_subtrans, pg_notify). group_lsn[] + * has lsn_groups_per_page entries per buffer slot, each containing the * highest LSN known for a contiguous group of SLRU entries on that slot's * page. */ @@ -94,7 +113,7 @@ typedef struct SlruCtlData /* * This flag tells whether to fsync writes (true for pg_clog and multixact - * stuff, false for pg_subtrans). + * stuff, false for pg_subtrans and pg_notify). */ bool do_fsync; diff --git a/src/include/access/twophase_rmgr.h b/src/include/access/twophase_rmgr.h index a42d774520..1d4d1cb221 100644 --- a/src/include/access/twophase_rmgr.h +++ b/src/include/access/twophase_rmgr.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/twophase_rmgr.h,v 1.11 2010/01/02 16:58:00 momjian Exp $ + * $PostgreSQL: pgsql/src/include/access/twophase_rmgr.h,v 1.12 2010/02/16 22:34:50 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -23,9 +23,8 @@ typedef uint8 TwoPhaseRmgrId; */ #define TWOPHASE_RM_END_ID 0 #define TWOPHASE_RM_LOCK_ID 1 -#define TWOPHASE_RM_NOTIFY_ID 2 -#define TWOPHASE_RM_PGSTAT_ID 3 -#define TWOPHASE_RM_MULTIXACT_ID 4 +#define TWOPHASE_RM_PGSTAT_ID 2 +#define TWOPHASE_RM_MULTIXACT_ID 3 #define TWOPHASE_RM_MAX_ID TWOPHASE_RM_MULTIXACT_ID extern const TwoPhaseCallback twophase_recover_callbacks[]; diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 9a1d19380c..2d63232b9b 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -37,7 +37,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright 
(c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.584 2010/02/12 17:33:20 tgl Exp $ + * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.585 2010/02/16 22:34:54 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201002121 +#define CATALOG_VERSION_NO 201002161 #endif diff --git a/src/include/catalog/pg_listener.h b/src/include/catalog/pg_listener.h deleted file mode 100644 index 4d1a77d660..0000000000 --- a/src/include/catalog/pg_listener.h +++ /dev/null @@ -1,59 +0,0 @@ -/*------------------------------------------------------------------------- - * - * pg_listener.h - * Asynchronous notification - * - * - * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group - * Portions Copyright (c) 1994, Regents of the University of California - * - * $PostgreSQL: pgsql/src/include/catalog/pg_listener.h,v 1.28 2010/01/05 01:06:56 tgl Exp $ - * - * NOTES - * the genbki.pl script reads this file and generates .bki - * information from the DATA() statements. - * - *------------------------------------------------------------------------- - */ -#ifndef PG_LISTENER_H -#define PG_LISTENER_H - -#include "catalog/genbki.h" - -/* ---------------------------------------------------------------- - * pg_listener definition. - * - * cpp turns this into typedef struct FormData_pg_listener - * ---------------------------------------------------------------- - */ -#define ListenerRelationId 2614 - -CATALOG(pg_listener,2614) BKI_WITHOUT_OIDS -{ - NameData relname; - int4 listenerpid; - int4 notification; -} FormData_pg_listener; - -/* ---------------- - * Form_pg_listener corresponds to a pointer to a tuple with - * the format of pg_listener relation. 
- * ---------------- - */ -typedef FormData_pg_listener *Form_pg_listener; - -/* ---------------- - * compiler constants for pg_listener - * ---------------- - */ -#define Natts_pg_listener 3 -#define Anum_pg_listener_relname 1 -#define Anum_pg_listener_listenerpid 2 -#define Anum_pg_listener_notification 3 - -/* ---------------- - * initial contents of pg_listener are NOTHING. - * ---------------- - */ - -#endif /* PG_LISTENER_H */ diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 727b13e264..1c87a94a0d 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/catalog/pg_proc.h,v 1.568 2010/02/07 20:48:11 tgl Exp $ + * $PostgreSQL: pgsql/src/include/catalog/pg_proc.h,v 1.569 2010/02/16 22:34:56 tgl Exp $ * * NOTES * The script catalog/genbki.pl reads this file and generates .bki @@ -4131,8 +4131,12 @@ DATA(insert OID = 2599 ( pg_timezone_abbrevs PGNSP PGUID 12 1 1000 0 f f f t t DESCR("get the available time zone abbreviations"); DATA(insert OID = 2856 ( pg_timezone_names PGNSP PGUID 12 1 1000 0 f f f t t s 0 0 2249 "" "{25,25,1186,16}" "{o,o,o,o}" "{name,abbrev,utc_offset,is_dst}" _null_ pg_timezone_names _null_ _null_ _null_ )); DESCR("get the available time zone names"); -DATA(insert OID = 2730 ( pg_get_triggerdef PGNSP PGUID 12 1 0 0 f f f t f s 2 0 25 "26 16" _null_ _null_ _null_ _null_ pg_get_triggerdef_ext _null_ _null_ _null_ )); +DATA(insert OID = 2730 ( pg_get_triggerdef PGNSP PGUID 12 1 0 0 f f f t f s 2 0 25 "26 16" _null_ _null_ _null_ _null_ pg_get_triggerdef_ext _null_ _null_ _null_ )); DESCR("trigger description with pretty-print option"); +DATA(insert OID = 3035 ( pg_listening_channels PGNSP PGUID 12 1 10 0 f f f t t s 0 0 25 "" _null_ _null_ _null_ _null_ pg_listening_channels _null_ _null_ _null_ )); 
+DESCR("get the channels that the current backend listens to"); +DATA(insert OID = 3036 ( pg_notify PGNSP PGUID 12 1 0 0 f f f f f v 2 0 2278 "25 25" _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_ )); +DESCR("send a notification event"); /* non-persistent series generator */ DATA(insert OID = 1066 ( generate_series PGNSP PGUID 12 1 1000 0 f f f t t i 3 0 23 "23 23 23" _null_ _null_ _null_ _null_ generate_series_step_int4 _null_ _null_ _null_ )); diff --git a/src/include/commands/async.h b/src/include/commands/async.h index 5c9e8ab890..a9e4d42853 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -6,28 +6,44 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/commands/async.h,v 1.39 2010/01/02 16:58:03 momjian Exp $ + * $PostgreSQL: pgsql/src/include/commands/async.h,v 1.40 2010/02/16 22:34:57 tgl Exp $ * *------------------------------------------------------------------------- */ #ifndef ASYNC_H #define ASYNC_H +#include "fmgr.h" + +/* + * The number of SLRU page buffers we use for the notification queue. 
+ */ +#define NUM_ASYNC_BUFFERS 8 + extern bool Trace_notify; +extern Size AsyncShmemSize(void); +extern void AsyncShmemInit(void); + /* notify-related SQL statements */ -extern void Async_Notify(const char *relname); -extern void Async_Listen(const char *relname); -extern void Async_Unlisten(const char *relname); +extern void Async_Notify(const char *channel, const char *payload); +extern void Async_Listen(const char *channel); +extern void Async_Unlisten(const char *channel); extern void Async_UnlistenAll(void); +/* notify-related SQL functions */ +extern Datum pg_listening_channels(PG_FUNCTION_ARGS); +extern Datum pg_notify(PG_FUNCTION_ARGS); + /* perform (or cancel) outbound notify processing at transaction commit */ +extern void PreCommit_Notify(void); extern void AtCommit_Notify(void); extern void AtAbort_Notify(void); extern void AtSubStart_Notify(void); extern void AtSubCommit_Notify(void); extern void AtSubAbort_Notify(void); extern void AtPrepare_Notify(void); +extern void ProcessCompletedNotifies(void); /* signal handler for inbound notifies (PROCSIG_NOTIFY_INTERRUPT) */ extern void HandleNotifyInterrupt(void); @@ -40,7 +56,4 @@ extern void HandleNotifyInterrupt(void); extern void EnableNotifyInterrupt(void); extern bool DisableNotifyInterrupt(void); -extern void notify_twophase_postcommit(TransactionId xid, uint16 info, - void *recdata, uint32 len); - #endif /* ASYNC_H */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 0c3aecfa6e..ca229c8e23 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -13,7 +13,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/nodes/parsenodes.h,v 1.429 2010/02/12 17:33:20 tgl Exp $ + * $PostgreSQL: pgsql/src/include/nodes/parsenodes.h,v 1.430 2010/02/16 22:34:57 tgl Exp $ * 
*------------------------------------------------------------------------- */ @@ -2097,6 +2097,7 @@ typedef struct NotifyStmt { NodeTag type; char *conditionname; /* condition name to notify */ + char *payload; /* the payload string, or NULL if none */ } NotifyStmt; /* ---------------------- diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index f0beb20a24..2ace958500 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.44 2010/02/07 20:48:13 tgl Exp $ + * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.45 2010/02/16 22:34:57 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -68,6 +68,8 @@ typedef enum LWLockId AutovacuumScheduleLock, SyncScanLock, RelationMappingLock, + AsyncCtlLock, + AsyncQueueLock, /* Individual lock IDs end here */ FirstBufMappingLock, FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS, diff --git a/src/test/regress/expected/guc.out b/src/test/regress/expected/guc.out index 83cacbdd20..59c120c99a 100644 --- a/src/test/regress/expected/guc.out +++ b/src/test/regress/expected/guc.out @@ -532,9 +532,9 @@ CREATE TEMP TABLE tmp_foo (data text) ON COMMIT DELETE ROWS; CREATE ROLE temp_reset_user; SET SESSION AUTHORIZATION temp_reset_user; -- look changes -SELECT relname FROM pg_listener; - relname ------------ +SELECT pg_listening_channels(); + pg_listening_channels +----------------------- foo_event (1 row) @@ -571,9 +571,9 @@ SELECT current_user = 'temp_reset_user'; -- discard everything DISCARD ALL; -- look again -SELECT relname FROM pg_listener; - relname ---------- +SELECT pg_listening_channels(); + pg_listening_channels +----------------------- (0 rows) SELECT name FROM pg_prepared_statements; diff --git 
a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out index 4dc59d9b5d..1d9e110044 100644 --- a/src/test/regress/expected/sanity_check.out +++ b/src/test/regress/expected/sanity_check.out @@ -107,7 +107,6 @@ SELECT relname, relhasindex pg_language | t pg_largeobject | t pg_largeobject_metadata | t - pg_listener | f pg_namespace | t pg_opclass | t pg_operator | t @@ -154,7 +153,7 @@ SELECT relname, relhasindex timetz_tbl | f tinterval_tbl | f varchar_tbl | f -(143 rows) +(142 rows) -- -- another sanity check: every system catalog that has OIDs should have diff --git a/src/test/regress/sql/guc.sql b/src/test/regress/sql/guc.sql index a7b795af47..21ed86f26b 100644 --- a/src/test/regress/sql/guc.sql +++ b/src/test/regress/sql/guc.sql @@ -165,7 +165,7 @@ CREATE TEMP TABLE tmp_foo (data text) ON COMMIT DELETE ROWS; CREATE ROLE temp_reset_user; SET SESSION AUTHORIZATION temp_reset_user; -- look changes -SELECT relname FROM pg_listener; +SELECT pg_listening_channels(); SELECT name FROM pg_prepared_statements; SELECT name FROM pg_cursors; SHOW vacuum_cost_delay; @@ -174,7 +174,7 @@ SELECT current_user = 'temp_reset_user'; -- discard everything DISCARD ALL; -- look again -SELECT relname FROM pg_listener; +SELECT pg_listening_channels(); SELECT name FROM pg_prepared_statements; SELECT name FROM pg_cursors; SHOW vacuum_cost_delay; -- 2.40.0