1 /*-------------------------------------------------------------------------
4 * example logical decoding output plugin
6 * Copyright (c) 2012-2014, PostgreSQL Global Development Group
9 * contrib/test_decoding/test_decoding.c
11 *-------------------------------------------------------------------------
15 #include "access/sysattr.h"
17 #include "catalog/pg_class.h"
18 #include "catalog/pg_type.h"
20 #include "nodes/parsenodes.h"
22 #include "replication/output_plugin.h"
23 #include "replication/logical.h"
25 #include "utils/builtins.h"
26 #include "utils/lsyscache.h"
27 #include "utils/memutils.h"
28 #include "utils/rel.h"
29 #include "utils/relcache.h"
30 #include "utils/syscache.h"
31 #include "utils/typcache.h"
35 /* These must be available to pg_dlsym() */
/*
 * Both entry points are declared extern (not static) so that their symbols
 * are visible outside this file, as the note above requires.
 * _PG_output_plugin_init receives the OutputPluginCallbacks table that its
 * definition further down fills in.
 */
36 extern void _PG_init(void);
37 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
/*
 * Members of TestDecodingData, the private state that pg_decode_startup
 * allocates and stores in ctx->output_plugin_private.
 *
 * NOTE(review): the struct opener and closer, and the include_xids member
 * that the callbacks read, are not present in this extract - confirm against
 * the full file.
 */
/* scratch memory context; pg_decode_change switches into it and resets it */
41 MemoryContext context;
/* when true, pg_decode_commit_txn appends the commit timestamp */
43 bool include_timestamp;
/*
 * Forward declarations of the file-local callbacks that
 * _PG_output_plugin_init installs into the OutputPluginCallbacks table:
 * startup, shutdown, begin-of-transaction, commit and per-change handlers.
 *
 * NOTE(review): the pg_decode_startup prototype is cut off after its second
 * parameter in this extract - confirm the rest of its parameter list against
 * the full file.
 */
46 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
48 static void pg_decode_shutdown(LogicalDecodingContext *ctx);
49 static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
50 ReorderBufferTXN *txn);
51 static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
52 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
53 static void pg_decode_change(LogicalDecodingContext *ctx,
54 ReorderBufferTXN *txn, Relation rel,
55 ReorderBufferChange *change);
60 /* other plugins can perform things here */
63 /* specify output plugin callbacks */
/*
 * _PG_output_plugin_init
 *   Fills the caller-supplied callback table with the handlers of this
 *   plugin.
 *
 * cb: callback table to populate. The startup, begin, change, commit and
 *     shutdown slots are each assigned the matching pg_decode_* function;
 *     no other slot is touched in the visible code.
 *
 * NOTE(review): the return type line and the braces are not present in this
 * extract - confirm against the full file.
 */
65 _PG_output_plugin_init(OutputPluginCallbacks *cb)
/* presumably a build-time check that this function matches LogicalOutputPluginInit */
67 AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
69 cb->startup_cb = pg_decode_startup;
70 cb->begin_cb = pg_decode_begin_txn;
71 cb->change_cb = pg_decode_change;
72 cb->commit_cb = pg_decode_commit_txn;
73 cb->shutdown_cb = pg_decode_shutdown;
77 /* initialize this plugin */
/*
 * pg_decode_startup
 *   Allocates the plugin private state, applies defaults, then overrides
 *   them from the options found in ctx->output_plugin_options.
 *
 * ctx: decoding context; it supplies the option list and receives the
 *      private state pointer in output_plugin_private.
 * opt: output settings; output_type is set to textual before the options
 *      are examined.
 *
 * Option names compared against: "include-xids", "include-timestamp" and
 * "force-binary". For the first two, an option given without a value means
 * true. A value that parse_bool rejects, and any other option name, are
 * reported with ERRCODE_INVALID_PARAMETER_VALUE.
 *
 * NOTE(review): several lines are not present in this extract (the rest of
 * the parameter list, the braces, the declaration of the loop cell, the
 * heads of the error-reporting calls, and the declaration, default and test
 * of force_binary) - confirm against the full file.
 */
79 pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
83 TestDecodingData *data;
85 data = palloc(sizeof(TestDecodingData));
/* dedicated context created under ctx->context; used later as per-change scratch space */
86 data->context = AllocSetContextCreate(ctx->context,
87 "text conversion context",
88 ALLOCSET_DEFAULT_MINSIZE,
89 ALLOCSET_DEFAULT_INITSIZE,
90 ALLOCSET_DEFAULT_MAXSIZE);
/* defaults: print transaction ids, do not print commit timestamps */
91 data->include_xids = true;
92 data->include_timestamp = false;
94 ctx->output_plugin_private = data;
/* textual output unless the force-binary handling below changes it */
96 opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
98 foreach(option, ctx->output_plugin_options)
100 DefElem *elem = lfirst(option);
/* an option value, when present, is expected to be a String node */
102 Assert(elem->arg == NULL || IsA(elem->arg, String));
104 if (strcmp(elem->defname, "include-xids") == 0)
106 /* if option does not provide a value, it means its value is true */
107 if (elem->arg == NULL)
108 data->include_xids = true;
109 else if (!parse_bool(strVal(elem->arg), &data->include_xids))
111 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
112 errmsg("could not parse value \"%s\" for parameter \"%s\"",
113 strVal(elem->arg), elem->defname)));
115 else if (strcmp(elem->defname, "include-timestamp") == 0)
/* same convention as include-xids: a missing value means true */
117 if (elem->arg == NULL)
118 data->include_timestamp = true;
119 else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
121 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
122 errmsg("could not parse value \"%s\" for parameter \"%s\"",
123 strVal(elem->arg), elem->defname)));
125 else if (strcmp(elem->defname, "force-binary") == 0)
/*
 * The value is parsed into force_binary; what happens when no value is
 * given, and the condition guarding the switch to binary output below, are
 * not visible in this extract - TODO confirm.
 */
129 if (elem->arg == NULL)
131 else if (!parse_bool(strVal(elem->arg), &force_binary))
133 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
134 errmsg("could not parse value \"%s\" for parameter \"%s\"",
135 strVal(elem->arg), elem->defname)));
138 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
/* any other option name is rejected; a missing value is shown as (null) */
143 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
144 errmsg("option \"%s\" = \"%s\" is unknown",
146 elem->arg ? strVal(elem->arg) : "(null)")));
151 /* cleanup this plugin's resources */
/*
 * pg_decode_shutdown
 *   Releases the private memory context that pg_decode_startup created.
 *
 * ctx: decoding context whose output_plugin_private holds the
 *      TestDecodingData set up at startup.
 *
 * Only data->context is deleted here; the TestDecodingData allocation
 * itself is not explicitly freed in the visible code.
 */
153 pg_decode_shutdown(LogicalDecodingContext *ctx)
155 TestDecodingData *data = ctx->output_plugin_private;
157 /* cleanup our own resources by deleting (not merely resetting) our private memory context */
158 MemoryContextDelete(data->context);
/*
 * pg_decode_begin_txn
 *   Emits one output message marking the start of a decoded transaction.
 *
 * ctx: decoding context; the text is appended to ctx->out between the
 *      OutputPluginPrepareWrite and OutputPluginWrite calls.
 * txn: transaction being decoded; its xid is printed when the include_xids
 *      setting is on.
 *
 * The message is "BEGIN <xid>" when include_xids is true and presumably the
 * bare word "BEGIN" otherwise (the else keyword between the two append calls
 * is not visible in this extract - TODO confirm).
 */
163 pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
165 TestDecodingData *data = ctx->output_plugin_private;
167 OutputPluginPrepareWrite(ctx, true);
168 if (data->include_xids)
169 appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
171 appendStringInfoString(ctx->out, "BEGIN");
172 OutputPluginWrite(ctx, true);
175 /* COMMIT callback */
/*
 * pg_decode_commit_txn
 *   Emits one output message marking the commit of a decoded transaction.
 *
 * ctx:        decoding context; the text is appended to ctx->out between the
 *             OutputPluginPrepareWrite and OutputPluginWrite calls.
 * txn:        transaction being decoded; supplies xid and commit_time.
 * commit_lsn: not referenced in the visible body.
 *
 * The message is "COMMIT <xid>" when include_xids is true and presumably the
 * bare word "COMMIT" otherwise (the else keyword is not visible in this
 * extract - TODO confirm). When include_timestamp is true, " (at <time>)" is
 * appended, with the time rendered by timestamptz_to_str.
 */
177 pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
178 XLogRecPtr commit_lsn)
180 TestDecodingData *data = ctx->output_plugin_private;
182 OutputPluginPrepareWrite(ctx, true);
183 if (data->include_xids)
184 appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
186 appendStringInfoString(ctx->out, "COMMIT");
/* the commit time is optional and goes after the COMMIT word and xid */
188 if (data->include_timestamp)
189 appendStringInfo(ctx->out, " (at %s)",
190 timestamptz_to_str(txn->commit_time));
192 OutputPluginWrite(ctx, true);
/*
 * print_literal
 *   Appends the already-textual value outputstr to s, in a form chosen by
 *   typid.
 *
 * s:         buffer that is appended to.
 * typid:     type OID of the value; it presumably selects the output form
 *            (the switch and its case labels are not visible in this
 *            extract - TODO confirm which types map to which form).
 * outputstr: value already converted to text; only read here.
 *
 * Output forms visible below: the string copied unchanged, a B-prefixed
 * quoted bit-string literal, the words true or false depending on whether
 * the text equals "t", and a single-quoted literal in which every character
 * matched by SQL_STR_DOUBLE is written twice.
 *
 * The four lines that follow are the remains of the original header comment;
 * its opening and closing delimiters are missing from this extract.
 */
196 * Print literal `outputstr' already represented as string of type `typid'
197 * into stringbuf `s'.
199 * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
200 * if standard_conforming_strings were enabled.
203 print_literal(StringInfo s, Oid typid, char *outputstr)
216 /* NB: We don't care about Inf, NaN et al. */
217 appendStringInfoString(s, outputstr);
222 appendStringInfo(s, "B'%s'", outputstr);
226 if (strcmp(outputstr, "t") == 0)
227 appendStringInfoString(s, "true");
229 appendStringInfoString(s, "false");
233 appendStringInfoChar(s, '\'');
234 for (valptr = outputstr; *valptr; valptr++)
/* a character that needs doubling is appended once here and once more below */
238 if (SQL_STR_DOUBLE(ch, false))
239 appendStringInfoChar(s, ch);
240 appendStringInfoChar(s, ch);
242 appendStringInfoChar(s, '\'');
247 /* print the tuple 'tuple' into the StringInfo s */
/*
 * tuple_to_stringinfo
 *   Appends a textual rendering of one heap tuple to s.
 *
 * s:          buffer that is appended to.
 * tupdesc:    descriptor giving the attributes (name, type, dropped flag)
 *             of the tuple.
 * tuple:      tuple whose values are printed.
 * skip_nulls: tested together with isnull for each attribute; presumably a
 *             null attribute is left out when it is set (the statement that
 *             follows the test is not visible in this extract).
 *
 * Output: if the tuple header carries a valid OID, " oid[oid]:<oid>" comes
 * first. Each printed attribute is then written as
 * " <quoted name>[<type name>]:<value>", where the value is one of: the word
 * null, the marker unchanged-toast-datum for a varlena value that is stored
 * externally on disk, or the result of the type output function passed
 * through print_literal (varlena values are detoasted first).
 *
 * NOTE(review): the braces, the declarations of natt, oid and typisvarlena,
 * the statements that follow the dropped-column and system-column tests, and
 * the null test before the value is printed are not present in this extract
 * - confirm against the full file.
 */
249 tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
254 /* print oid of tuple, it's not included in the TupleDesc */
255 if ((oid = HeapTupleHeaderGetOid(tuple->t_data)) != InvalidOid)
257 appendStringInfo(s, " oid[oid]:%u", oid);
260 /* print all columns individually */
261 for (natt = 0; natt < tupdesc->natts; natt++)
263 Form_pg_attribute attr; /* the attribute itself */
264 Oid typid; /* type of current attribute */
265 Oid typoutput; /* output function */
267 Datum origval; /* possibly toasted Datum */
268 bool isnull; /* column is null? */
270 attr = tupdesc->attrs[natt];
273 * don't print dropped columns, we can't be sure everything is
276 if (attr->attisdropped)
280 * Don't print system columns, oid will already have been printed if
283 if (attr->attnum < 0)
286 typid = attr->atttypid;
288 /* get Datum from tuple */
289 origval = fastgetattr(tuple, natt + 1, tupdesc, &isnull);
291 if (isnull && skip_nulls)
294 /* print attribute name */
295 appendStringInfoChar(s, ' ');
296 appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
298 /* print attribute type */
299 appendStringInfoChar(s, '[');
300 appendStringInfoString(s, format_type_be(typid));
301 appendStringInfoChar(s, ']');
303 /* query output function */
304 getTypeOutputInfo(typid,
305 &typoutput, &typisvarlena);
307 /* print separator */
308 appendStringInfoChar(s, ':');
312 appendStringInfoString(s, "null");
313 else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
314 appendStringInfoString(s, "unchanged-toast-datum");
315 else if (!typisvarlena)
316 print_literal(s, typid,
317 OidOutputFunctionCall(typoutput, origval));
320 Datum val; /* definitely detoasted Datum */
322 val = PointerGetDatum(PG_DETOAST_DATUM(origval));
323 print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
/*
 * pg_decode_change
 *   Emits one output message describing a single row change.
 *
 * ctx:      decoding context; the text is appended to ctx->out between the
 *           OutputPluginPrepareWrite and OutputPluginWrite calls.
 * txn:      transaction the change belongs to; not referenced in the visible
 *           body.
 * relation: relation the change applies to; supplies the name and the tuple
 *           descriptor.
 * change:   the change itself; change->action selects INSERT, UPDATE or
 *           DELETE formatting and change->data.tp holds the old and new
 *           tuples.
 *
 * Every message starts with "table <qualified name>:" followed by the action
 * word. Tuples are rendered by tuple_to_stringinfo, and " (no-tuple-data)"
 * appears where the expected tuple pointer is NULL. For UPDATE, an old tuple,
 * when present, is printed after " old-key:" and is followed by
 * " new-tuple:".
 *
 * All formatting runs inside data->context, which is reset before the
 * message is written out.
 *
 * NOTE(review): the braces, the declarations of tupdesc and old, the else
 * keywords, the break statements, the last argument of each
 * tuple_to_stringinfo call, the first argument of quote_qualified_identifier
 * and any default case are not present in this extract - confirm against
 * the full file.
 *
 * The line that follows is the remains of the original header comment.
 */
329 * callback for individual changed tuples
332 pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
333 Relation relation, ReorderBufferChange *change)
335 TestDecodingData *data;
336 Form_pg_class class_form;
340 data = ctx->output_plugin_private;
341 class_form = RelationGetForm(relation);
342 tupdesc = RelationGetDescr(relation);
344 /* Avoid leaking memory by using and resetting our own context */
345 old = MemoryContextSwitchTo(data->context);
347 OutputPluginPrepareWrite(ctx, true);
/* common prefix: the word table, then the quoted qualified relation name and a colon */
349 appendStringInfoString(ctx->out, "table ");
350 appendStringInfoString(ctx->out,
351 quote_qualified_identifier(
353 get_rel_namespace(RelationGetRelid(relation))),
354 NameStr(class_form->relname)));
355 appendStringInfoString(ctx->out, ":");
357 switch (change->action)
359 case REORDER_BUFFER_CHANGE_INSERT:
360 appendStringInfoString(ctx->out, " INSERT:");
/* an insert is described by its new tuple, when one is available */
361 if (change->data.tp.newtuple == NULL)
362 appendStringInfoString(ctx->out, " (no-tuple-data)");
364 tuple_to_stringinfo(ctx->out, tupdesc,
365 &change->data.tp.newtuple->tuple,
368 case REORDER_BUFFER_CHANGE_UPDATE:
369 appendStringInfoString(ctx->out, " UPDATE:");
/* the old tuple is optional; when present it is labelled and printed before the new one */
370 if (change->data.tp.oldtuple != NULL)
372 appendStringInfoString(ctx->out, " old-key:");
373 tuple_to_stringinfo(ctx->out, tupdesc,
374 &change->data.tp.oldtuple->tuple,
376 appendStringInfoString(ctx->out, " new-tuple:");
379 if (change->data.tp.newtuple == NULL)
380 appendStringInfoString(ctx->out, " (no-tuple-data)");
382 tuple_to_stringinfo(ctx->out, tupdesc,
383 &change->data.tp.newtuple->tuple,
386 case REORDER_BUFFER_CHANGE_DELETE:
387 appendStringInfoString(ctx->out, " DELETE:");
389 /* if there was no PK, we only know that a delete happened */
390 if (change->data.tp.oldtuple == NULL)
391 appendStringInfoString(ctx->out, " (no-tuple-data)");
392 /* In DELETE, only the replica identity is present; display that */
394 tuple_to_stringinfo(ctx->out, tupdesc,
395 &change->data.tp.oldtuple->tuple,
/* leave the scratch context and reset it before handing the message over */
402 MemoryContextSwitchTo(old);
403 MemoryContextReset(data->context);
405 OutputPluginWrite(ctx, true);