]> granicus.if.org Git - postgresql/blob - contrib/tcn/tcn.c
Split tuple struct defs from htup.h to htup_details.h
[postgresql] / contrib / tcn / tcn.c
1 /*-------------------------------------------------------------------------
2  *
3  * tcn.c
4  *        triggered change notification support for PostgreSQL
5  *
6  * Portions Copyright (c) 2011-2012, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        contrib/tcn/tcn.c
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/htup_details.h"
19 #include "executor/spi.h"
20 #include "commands/async.h"
21 #include "commands/trigger.h"
22 #include "lib/stringinfo.h"
23 #include "utils/rel.h"
24 #include "utils/syscache.h"
25
26
27 PG_MODULE_MAGIC;
28
29
30 /* forward declarations */
31 Datum           triggered_change_notification(PG_FUNCTION_ARGS);
32
33
34 /*
35  * Copy from s (for source) to r (for result), wrapping with q (quote)
36  * characters and doubling any quote characters found.
37  */
38 static void
39 strcpy_quoted(StringInfo r, const char *s, const char q)
40 {
41         appendStringInfoCharMacro(r, q);
42         while (*s)
43         {
44                 if (*s == q)
45                         appendStringInfoCharMacro(r, q);
46                 appendStringInfoCharMacro(r, *s);
47                 s++;
48         }
49         appendStringInfoCharMacro(r, q);
50 }
51
52 /*
53  * triggered_change_notification
54  *
55  * This trigger function will send a notification of data modification with
56  * primary key values.  The channel will be "tcn" unless the trigger is
57  * created with a parameter, in which case that parameter will be used.
58  */
59 PG_FUNCTION_INFO_V1(triggered_change_notification);
60
61 Datum
62 triggered_change_notification(PG_FUNCTION_ARGS)
63 {
64         TriggerData *trigdata = (TriggerData *) fcinfo->context;
65         Trigger    *trigger;
66         int                     nargs;
67         HeapTuple       trigtuple;
68         Relation        rel;
69         TupleDesc       tupdesc;
70         char       *channel;
71         char            operation;
72         StringInfo      payload = makeStringInfo();
73         bool            foundPK;
74
75         List       *indexoidlist;
76         ListCell   *indexoidscan;
77
78         /* make sure it's called as a trigger */
79         if (!CALLED_AS_TRIGGER(fcinfo))
80                 ereport(ERROR,
81                                 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
82                 errmsg("triggered_change_notification: must be called as trigger")));
83
84         /* and that it's called after the change */
85         if (!TRIGGER_FIRED_AFTER(trigdata->tg_event))
86                 ereport(ERROR,
87                                 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
88                                  errmsg("triggered_change_notification: must be called after the change")));
89
90         /* and that it's called for each row */
91         if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
92                 ereport(ERROR,
93                                 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
94                                  errmsg("triggered_change_notification: must be called for each row")));
95
96         if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
97                 operation = 'I';
98         else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
99                 operation = 'U';
100         else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
101                 operation = 'D';
102         else
103         {
104                 elog(ERROR, "triggered_change_notification: trigger fired by unrecognized operation");
105                 operation = 'X';                /* silence compiler warning */
106         }
107
108         trigger = trigdata->tg_trigger;
109         nargs = trigger->tgnargs;
110         if (nargs > 1)
111                 ereport(ERROR,
112                                 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
113                                  errmsg("triggered_change_notification: must not be called with more than one parameter")));
114
115         if (nargs == 0)
116                 channel = "tcn";
117         else
118                 channel = trigger->tgargs[0];
119
120         /* get tuple data */
121         trigtuple = trigdata->tg_trigtuple;
122         rel = trigdata->tg_relation;
123         tupdesc = rel->rd_att;
124
125         foundPK = false;
126
127         /*
128          * Get the list of index OIDs for the table from the relcache, and look up
129          * each one in the pg_index syscache until we find one marked primary key
130          * (hopefully there isn't more than one such).
131          */
132         indexoidlist = RelationGetIndexList(rel);
133
134         foreach(indexoidscan, indexoidlist)
135         {
136                 Oid                     indexoid = lfirst_oid(indexoidscan);
137                 HeapTuple       indexTuple;
138                 Form_pg_index index;
139
140                 indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
141                 if (!HeapTupleIsValid(indexTuple))              /* should not happen */
142                         elog(ERROR, "cache lookup failed for index %u", indexoid);
143                 index = (Form_pg_index) GETSTRUCT(indexTuple);
144                 /* we're only interested if it is the primary key */
145                 if (index->indisprimary)
146                 {
147                         int                     numatts = index->indnatts;
148
149                         if (numatts > 0)
150                         {
151                                 int                     i;
152
153                                 foundPK = true;
154
155                                 strcpy_quoted(payload, RelationGetRelationName(rel), '"');
156                                 appendStringInfoCharMacro(payload, ',');
157                                 appendStringInfoCharMacro(payload, operation);
158
159                                 for (i = 0; i < numatts; i++)
160                                 {
161                                         int                     colno = index->indkey.values[i];
162
163                                         appendStringInfoCharMacro(payload, ',');
164                                         strcpy_quoted(payload, NameStr((tupdesc->attrs[colno - 1])->attname), '"');
165                                         appendStringInfoCharMacro(payload, '=');
166                                         strcpy_quoted(payload, SPI_getvalue(trigtuple, tupdesc, colno), '\'');
167                                 }
168
169                                 Async_Notify(channel, payload->data);
170                         }
171                         ReleaseSysCache(indexTuple);
172                         break;
173                 }
174                 ReleaseSysCache(indexTuple);
175         }
176
177         list_free(indexoidlist);
178
179         if (!foundPK)
180                 ereport(ERROR,
181                                 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
182                                  errmsg("triggered_change_notification: must be called on a table with a primary key")));
183
184         return PointerGetDatum(NULL);           /* after trigger; value doesn't matter */
185 }