From: Robert Haas Date: Fri, 20 Jan 2012 04:15:15 +0000 (-0500) Subject: Triggered change notifications. X-Git-Tag: REL9_2_BETA1~564 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=6e3323d41dc45e93700a3420fd27ca05db6a64a7;p=postgresql Triggered change notifications. Kevin Grittner, reviewed (in earlier versions) by Álvaro Herrera --- diff --git a/contrib/Makefile b/contrib/Makefile index 0c238aae16..ac0a80a014 100644 --- a/contrib/Makefile +++ b/contrib/Makefile @@ -45,6 +45,7 @@ SUBDIRS = \ seg \ spi \ tablefunc \ + tcn \ test_parser \ tsearch2 \ unaccent \ diff --git a/contrib/tcn/Makefile b/contrib/tcn/Makefile new file mode 100644 index 0000000000..7bac5e359c --- /dev/null +++ b/contrib/tcn/Makefile @@ -0,0 +1,17 @@ +# contrib/tcn/Makefile + +MODULES = tcn + +EXTENSION = tcn +DATA = tcn--1.0.sql + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/tcn +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/tcn/tcn--1.0.sql b/contrib/tcn/tcn--1.0.sql new file mode 100644 index 0000000000..027a4effbb --- /dev/null +++ b/contrib/tcn/tcn--1.0.sql @@ -0,0 +1,9 @@ +/* contrib/tcn/tcn--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION tcn" to load this file. \quit + +CREATE FUNCTION triggered_change_notification() +RETURNS pg_catalog.trigger +AS 'MODULE_PATHNAME' +LANGUAGE C; diff --git a/contrib/tcn/tcn.c b/contrib/tcn/tcn.c new file mode 100644 index 0000000000..314632dd89 --- /dev/null +++ b/contrib/tcn/tcn.c @@ -0,0 +1,184 @@ +/*------------------------------------------------------------------------- + * + * tcn.c + * triggered change notification support for PostgreSQL + * + * Portions Copyright (c) 2011-2012, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * contrib/tcn/tcn.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "executor/spi.h" +#include "commands/async.h" +#include "commands/trigger.h" +#include "lib/stringinfo.h" +#include "utils/rel.h" +#include "utils/syscache.h" + + +PG_MODULE_MAGIC; + + +/* forward declarations */ +Datum triggered_change_notification(PG_FUNCTION_ARGS); + + +/* + * Copy from s (for source) to r (for result), wrapping with q (quote) + * characters and doubling any quote characters found. + */ +static void +strcpy_quoted(StringInfo r, const char *s, const char q) +{ + appendStringInfoCharMacro(r, q); + while (*s) + { + if (*s == q) + appendStringInfoCharMacro(r, q); + appendStringInfoCharMacro(r, *s); + s++; + } + appendStringInfoCharMacro(r, q); +} + +/* + * triggered_change_notification + * + * This trigger function will send a notification of data modification with + * primary key values. The channel will be "tcn" unless the trigger is + * created with a parameter, in which case that parameter will be used. + */ +PG_FUNCTION_INFO_V1(triggered_change_notification); + +Datum +triggered_change_notification(PG_FUNCTION_ARGS) +{ + TriggerData *trigdata = (TriggerData *) fcinfo->context; + Trigger *trigger; + int nargs; + HeapTuple trigtuple; + Relation rel; + TupleDesc tupdesc; + char *channel; + char operation; + StringInfo payload = makeStringInfo(); + bool foundPK; + + List *indexoidlist; + ListCell *indexoidscan; + + /* make sure it's called as a trigger */ + if (!CALLED_AS_TRIGGER(fcinfo)) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called as trigger"))); + + /* and that it's called after the change */ + if (!TRIGGER_FIRED_AFTER(trigdata->tg_event)) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called after the change"))); + + /* and that it's called for each row */ + if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event)) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called for each row"))); + + if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) + operation = 'I'; + else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) + operation = 'U'; + else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event)) + operation = 'D'; + else + { + elog(ERROR, "triggered_change_notification: trigger fired by unrecognized operation"); + operation = 'X'; /* silence compiler warning */ + } + + trigger = trigdata->tg_trigger; + nargs = trigger->tgnargs; + if (nargs > 1) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must not be called with more than one parameter"))); + + if (nargs == 0) + channel = "tcn"; + else + channel = trigger->tgargs[0]; + + /* get tuple data */ + trigtuple = trigdata->tg_trigtuple; + rel = trigdata->tg_relation; + tupdesc = rel->rd_att; + + foundPK = false; + + /* + * Get the list of index OIDs for the table from the relcache, and look up + * each one in the pg_index syscache until we find one marked primary key + * (hopefully there isn't more than one such). + */ + indexoidlist = RelationGetIndexList(rel); + + foreach(indexoidscan, indexoidlist) + { + Oid indexoid = lfirst_oid(indexoidscan); + HeapTuple indexTuple; + Form_pg_index index; + + indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid)); + if (!HeapTupleIsValid(indexTuple)) /* should not happen */ + elog(ERROR, "cache lookup failed for index %u", indexoid); + index = (Form_pg_index) GETSTRUCT(indexTuple); + /* we're only interested if it is the primary key */ + if (index->indisprimary) + { + int numatts = index->indnatts; + + if (numatts > 0) + { + int i; + + foundPK = true; + + strcpy_quoted(payload, RelationGetRelationName(rel), '"'); + appendStringInfoCharMacro(payload, ','); + appendStringInfoCharMacro(payload, operation); + + for (i = 0; i < numatts; i++) + { + int colno = index->indkey.values[i]; + + appendStringInfoCharMacro(payload, ','); + strcpy_quoted(payload, NameStr((tupdesc->attrs[colno - 1])->attname), '"'); + appendStringInfoCharMacro(payload, '='); + strcpy_quoted(payload, SPI_getvalue(trigtuple, tupdesc, colno), '\''); + } + + Async_Notify(channel, payload->data); + } + ReleaseSysCache(indexTuple); + break; + } + ReleaseSysCache(indexTuple); + } + + list_free(indexoidlist); + + if (!foundPK) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called on a table with a primary key"))); + + return PointerGetDatum(NULL); /* after trigger; value doesn't matter */ +} diff --git a/contrib/tcn/tcn.control b/contrib/tcn/tcn.control new file mode 100644 index 0000000000..8abfd19dc7 --- /dev/null +++ b/contrib/tcn/tcn.control @@ -0,0 +1,5 @@ +# tcn extension +comment = 'Triggered change notifications' +default_version = '1.0' +module_pathname = '$libdir/tcn' +relocatable = true diff --git a/doc/src/sgml/contrib.sgml b/doc/src/sgml/contrib.sgml index adf09ca872..d4da4eec87 100644 --- a/doc/src/sgml/contrib.sgml +++ b/doc/src/sgml/contrib.sgml @@ -128,6 +128,7 @@ CREATE EXTENSION module_name FROM unpackaged; &contrib-spi; &sslinfo; &tablefunc; + &tcn; &test-parser; &tsearch2; &unaccent; diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml index b96dd656ad..b5d3c6d4fc 100644 --- a/doc/src/sgml/filelist.sgml +++ b/doc/src/sgml/filelist.sgml @@ -136,6 +136,7 @@ + diff --git a/doc/src/sgml/tcn.sgml b/doc/src/sgml/tcn.sgml new file mode 100644 index 0000000000..af830dfffe --- /dev/null +++ b/doc/src/sgml/tcn.sgml @@ -0,0 +1,72 @@ + + + + tcn + + + tcn + + + + triggered_change_notification + + + + The tcn module provides a trigger function that notifies + listeners of changes to any table on which it is attached. It must be + used as an AFTER trigger FOR EACH ROW. + + + + Only one parameter may be suupplied to the function in a + CREATE TRIGGER statement, and that is optional. If supplied + it will be used for the channel name for the notifications. If omitted + tcn will be used for the channel name. + + + + The payload of the notifications consists of the table name, a letter to + indicate which type of operation was performed, and column name/value pairs + for primary key columns. Each part is separated from the next by a comma. + For ease of parsing using regular expressions, table and column names are + always wrapped in double quotes, and data values are always wrapped in + single quotes. Embeded quotes are doubled. + + + + A brief example of using the extension follows. + + +test=# create table tcndata +test-# ( +test(# a int not null, +test(# b date not null, +test(# c text, +test(# primary key (a, b) +test(# ); +NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "tcndata_pkey" for table "tcndata" +CREATE TABLE +test=# create trigger tcndata_tcn_trigger +test-# after insert or update or delete on tcndata +test-# for each row execute procedure triggered_change_notification(); +CREATE TRIGGER +test=# listen tcn; +LISTEN +test=# insert into tcndata values (1, date '2012-12-22', 'one'), +test-# (1, date '2012-12-23', 'another'), +test-# (2, date '2012-12-23', 'two'); +INSERT 0 3 +Asynchronous notification "tcn" with payload ""tcndata",I,"a"='1',"b"='2012-12-22'" received from server process with PID 22770. +Asynchronous notification "tcn" with payload ""tcndata",I,"a"='1',"b"='2012-12-23'" received from server process with PID 22770. +Asynchronous notification "tcn" with payload ""tcndata",I,"a"='2',"b"='2012-12-23'" received from server process with PID 22770. +test=# update tcndata set c = 'uno' where a = 1; +UPDATE 2 +Asynchronous notification "tcn" with payload ""tcndata",U,"a"='1',"b"='2012-12-22'" received from server process with PID 22770. +Asynchronous notification "tcn" with payload ""tcndata",U,"a"='1',"b"='2012-12-23'" received from server process with PID 22770. +test=# delete from tcndata where a = 1 and b = date '2012-12-22'; +DELETE 1 +Asynchronous notification "tcn" with payload ""tcndata",D,"a"='1',"b"='2012-12-22'" received from server process with PID 22770. + + + +