]> granicus.if.org Git - postgresql/blob - contrib/test_decoding/test_decoding.c
Adjust blank lines around PG_MODULE_MAGIC defines, for consistency
[postgresql] / contrib / test_decoding / test_decoding.c
1 /*-------------------------------------------------------------------------
2  *
3  * test_decoding.c
4  *                example logical decoding output plugin
5  *
6  * Copyright (c) 2012-2014, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *                contrib/test_decoding/test_decoding.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14
15 #include "access/sysattr.h"
16
17 #include "catalog/pg_class.h"
18 #include "catalog/pg_type.h"
19
20 #include "nodes/parsenodes.h"
21
22 #include "replication/output_plugin.h"
23 #include "replication/logical.h"
24
25 #include "utils/builtins.h"
26 #include "utils/lsyscache.h"
27 #include "utils/memutils.h"
28 #include "utils/rel.h"
29 #include "utils/relcache.h"
30 #include "utils/syscache.h"
31 #include "utils/typcache.h"
32
33 PG_MODULE_MAGIC;
34
35 /* These must be available to pg_dlsym() */
36 extern void _PG_init(void);
37 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
38
39 typedef struct
40 {
41         MemoryContext context;
42         bool            include_xids;
43         bool            include_timestamp;
44 } TestDecodingData;
45
46 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
47                                   bool is_init);
48 static void pg_decode_shutdown(LogicalDecodingContext *ctx);
49 static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
50                                         ReorderBufferTXN *txn);
51 static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
52                                          ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
53 static void pg_decode_change(LogicalDecodingContext *ctx,
54                                  ReorderBufferTXN *txn, Relation rel,
55                                  ReorderBufferChange *change);
56
57 void
58 _PG_init(void)
59 {
60         /* other plugins can perform things here */
61 }
62
63 /* specify output plugin callbacks */
64 void
65 _PG_output_plugin_init(OutputPluginCallbacks *cb)
66 {
67         AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
68
69         cb->startup_cb = pg_decode_startup;
70         cb->begin_cb = pg_decode_begin_txn;
71         cb->change_cb = pg_decode_change;
72         cb->commit_cb = pg_decode_commit_txn;
73         cb->shutdown_cb = pg_decode_shutdown;
74 }
75
76
77 /* initialize this plugin */
78 static void
79 pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
80                                   bool is_init)
81 {
82         ListCell   *option;
83         TestDecodingData *data;
84
85         data = palloc(sizeof(TestDecodingData));
86         data->context = AllocSetContextCreate(ctx->context,
87                                                                                   "text conversion context",
88                                                                                   ALLOCSET_DEFAULT_MINSIZE,
89                                                                                   ALLOCSET_DEFAULT_INITSIZE,
90                                                                                   ALLOCSET_DEFAULT_MAXSIZE);
91         data->include_xids = true;
92         data->include_timestamp = false;
93
94         ctx->output_plugin_private = data;
95
96         opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
97
98         foreach(option, ctx->output_plugin_options)
99         {
100                 DefElem    *elem = lfirst(option);
101
102                 Assert(elem->arg == NULL || IsA(elem->arg, String));
103
104                 if (strcmp(elem->defname, "include-xids") == 0)
105                 {
106                         /* if option does not provide a value, it means its value is true */
107                         if (elem->arg == NULL)
108                                 data->include_xids = true;
109                         else if (!parse_bool(strVal(elem->arg), &data->include_xids))
110                                 ereport(ERROR,
111                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
112                                   errmsg("could not parse value \"%s\" for parameter \"%s\"",
113                                                  strVal(elem->arg), elem->defname)));
114                 }
115                 else if (strcmp(elem->defname, "include-timestamp") == 0)
116                 {
117                         if (elem->arg == NULL)
118                                 data->include_timestamp = true;
119                         else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
120                                 ereport(ERROR,
121                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
122                                   errmsg("could not parse value \"%s\" for parameter \"%s\"",
123                                                  strVal(elem->arg), elem->defname)));
124                 }
125                 else if (strcmp(elem->defname, "force-binary") == 0)
126                 {
127                         bool            force_binary;
128
129                         if (elem->arg == NULL)
130                                 continue;
131                         else if (!parse_bool(strVal(elem->arg), &force_binary))
132                                 ereport(ERROR,
133                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
134                                   errmsg("could not parse value \"%s\" for parameter \"%s\"",
135                                                  strVal(elem->arg), elem->defname)));
136
137                         if (force_binary)
138                                 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
139                 }
140                 else
141                 {
142                         ereport(ERROR,
143                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
144                                          errmsg("option \"%s\" = \"%s\" is unknown",
145                                                         elem->defname,
146                                                         elem->arg ? strVal(elem->arg) : "(null)")));
147                 }
148         }
149 }
150
151 /* cleanup this plugin's resources */
152 static void
153 pg_decode_shutdown(LogicalDecodingContext *ctx)
154 {
155         TestDecodingData *data = ctx->output_plugin_private;
156
157         /* cleanup our own resources via memory context reset */
158         MemoryContextDelete(data->context);
159 }
160
161 /* BEGIN callback */
162 static void
163 pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
164 {
165         TestDecodingData *data = ctx->output_plugin_private;
166
167         OutputPluginPrepareWrite(ctx, true);
168         if (data->include_xids)
169                 appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
170         else
171                 appendStringInfoString(ctx->out, "BEGIN");
172         OutputPluginWrite(ctx, true);
173 }
174
175 /* COMMIT callback */
176 static void
177 pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
178                                          XLogRecPtr commit_lsn)
179 {
180         TestDecodingData *data = ctx->output_plugin_private;
181
182         OutputPluginPrepareWrite(ctx, true);
183         if (data->include_xids)
184                 appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
185         else
186                 appendStringInfoString(ctx->out, "COMMIT");
187
188         if (data->include_timestamp)
189                 appendStringInfo(ctx->out, " (at %s)",
190                                                  timestamptz_to_str(txn->commit_time));
191
192         OutputPluginWrite(ctx, true);
193 }
194
195 /*
196  * Print literal `outputstr' already represented as string of type `typid'
197  * into stringbuf `s'.
198  *
199  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
200  * if standard_conforming_strings were enabled.
201  */
202 static void
203 print_literal(StringInfo s, Oid typid, char *outputstr)
204 {
205         const char *valptr;
206
207         switch (typid)
208         {
209                 case INT2OID:
210                 case INT4OID:
211                 case INT8OID:
212                 case OIDOID:
213                 case FLOAT4OID:
214                 case FLOAT8OID:
215                 case NUMERICOID:
216                         /* NB: We don't care about Inf, NaN et al. */
217                         appendStringInfoString(s, outputstr);
218                         break;
219
220                 case BITOID:
221                 case VARBITOID:
222                         appendStringInfo(s, "B'%s'", outputstr);
223                         break;
224
225                 case BOOLOID:
226                         if (strcmp(outputstr, "t") == 0)
227                                 appendStringInfoString(s, "true");
228                         else
229                                 appendStringInfoString(s, "false");
230                         break;
231
232                 default:
233                         appendStringInfoChar(s, '\'');
234                         for (valptr = outputstr; *valptr; valptr++)
235                         {
236                                 char            ch = *valptr;
237
238                                 if (SQL_STR_DOUBLE(ch, false))
239                                         appendStringInfoChar(s, ch);
240                                 appendStringInfoChar(s, ch);
241                         }
242                         appendStringInfoChar(s, '\'');
243                         break;
244         }
245 }
246
247 /* print the tuple 'tuple' into the StringInfo s */
248 static void
249 tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
250 {
251         int                     natt;
252         Oid                     oid;
253
254         /* print oid of tuple, it's not included in the TupleDesc */
255         if ((oid = HeapTupleHeaderGetOid(tuple->t_data)) != InvalidOid)
256         {
257                 appendStringInfo(s, " oid[oid]:%u", oid);
258         }
259
260         /* print all columns individually */
261         for (natt = 0; natt < tupdesc->natts; natt++)
262         {
263                 Form_pg_attribute attr; /* the attribute itself */
264                 Oid                     typid;          /* type of current attribute */
265                 Oid                     typoutput;      /* output function */
266                 bool            typisvarlena;
267                 Datum           origval;        /* possibly toasted Datum */
268                 bool            isnull;         /* column is null? */
269
270                 attr = tupdesc->attrs[natt];
271
272                 /*
273                  * don't print dropped columns, we can't be sure everything is
274                  * available for them
275                  */
276                 if (attr->attisdropped)
277                         continue;
278
279                 /*
280                  * Don't print system columns, oid will already have been printed if
281                  * present.
282                  */
283                 if (attr->attnum < 0)
284                         continue;
285
286                 typid = attr->atttypid;
287
288                 /* get Datum from tuple */
289                 origval = fastgetattr(tuple, natt + 1, tupdesc, &isnull);
290
291                 if (isnull && skip_nulls)
292                         continue;
293
294                 /* print attribute name */
295                 appendStringInfoChar(s, ' ');
296                 appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
297
298                 /* print attribute type */
299                 appendStringInfoChar(s, '[');
300                 appendStringInfoString(s, format_type_be(typid));
301                 appendStringInfoChar(s, ']');
302
303                 /* query output function */
304                 getTypeOutputInfo(typid,
305                                                   &typoutput, &typisvarlena);
306
307                 /* print separator */
308                 appendStringInfoChar(s, ':');
309
310                 /* print data */
311                 if (isnull)
312                         appendStringInfoString(s, "null");
313                 else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
314                         appendStringInfoString(s, "unchanged-toast-datum");
315                 else if (!typisvarlena)
316                         print_literal(s, typid,
317                                                   OidOutputFunctionCall(typoutput, origval));
318                 else
319                 {
320                         Datum           val;    /* definitely detoasted Datum */
321
322                         val = PointerGetDatum(PG_DETOAST_DATUM(origval));
323                         print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
324                 }
325         }
326 }
327
328 /*
329  * callback for individual changed tuples
330  */
331 static void
332 pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
333                                  Relation relation, ReorderBufferChange *change)
334 {
335         TestDecodingData *data;
336         Form_pg_class class_form;
337         TupleDesc       tupdesc;
338         MemoryContext old;
339
340         data = ctx->output_plugin_private;
341         class_form = RelationGetForm(relation);
342         tupdesc = RelationGetDescr(relation);
343
344         /* Avoid leaking memory by using and resetting our own context */
345         old = MemoryContextSwitchTo(data->context);
346
347         OutputPluginPrepareWrite(ctx, true);
348
349         appendStringInfoString(ctx->out, "table ");
350         appendStringInfoString(ctx->out,
351                                                    quote_qualified_identifier(
352                                                                                                           get_namespace_name(
353                                                           get_rel_namespace(RelationGetRelid(relation))),
354                                                                                           NameStr(class_form->relname)));
355         appendStringInfoString(ctx->out, ":");
356
357         switch (change->action)
358         {
359                 case REORDER_BUFFER_CHANGE_INSERT:
360                         appendStringInfoString(ctx->out, " INSERT:");
361                         if (change->data.tp.newtuple == NULL)
362                                 appendStringInfoString(ctx->out, " (no-tuple-data)");
363                         else
364                                 tuple_to_stringinfo(ctx->out, tupdesc,
365                                                                         &change->data.tp.newtuple->tuple,
366                                                                         false);
367                         break;
368                 case REORDER_BUFFER_CHANGE_UPDATE:
369                         appendStringInfoString(ctx->out, " UPDATE:");
370                         if (change->data.tp.oldtuple != NULL)
371                         {
372                                 appendStringInfoString(ctx->out, " old-key:");
373                                 tuple_to_stringinfo(ctx->out, tupdesc,
374                                                                         &change->data.tp.oldtuple->tuple,
375                                                                         true);
376                                 appendStringInfoString(ctx->out, " new-tuple:");
377                         }
378
379                         if (change->data.tp.newtuple == NULL)
380                                 appendStringInfoString(ctx->out, " (no-tuple-data)");
381                         else
382                                 tuple_to_stringinfo(ctx->out, tupdesc,
383                                                                         &change->data.tp.newtuple->tuple,
384                                                                         false);
385                         break;
386                 case REORDER_BUFFER_CHANGE_DELETE:
387                         appendStringInfoString(ctx->out, " DELETE:");
388
389                         /* if there was no PK, we only know that a delete happened */
390                         if (change->data.tp.oldtuple == NULL)
391                                 appendStringInfoString(ctx->out, " (no-tuple-data)");
392                         /* In DELETE, only the replica identity is present; display that */
393                         else
394                                 tuple_to_stringinfo(ctx->out, tupdesc,
395                                                                         &change->data.tp.oldtuple->tuple,
396                                                                         true);
397                         break;
398                 default:
399                         Assert(false);
400         }
401
402         MemoryContextSwitchTo(old);
403         MemoryContextReset(data->context);
404
405         OutputPluginWrite(ctx, true);
406 }