/*-------------------------------------------------------------------------
 *
 * pgoutput.c
 *		Logical Replication output plugin
 *
 * Copyright (c) 2012-2017, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  src/backend/replication/pgoutput/pgoutput.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "catalog/pg_publication.h"

#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/origin.h"
#include "replication/pgoutput.h"

#include "utils/inval.h"
#include "utils/int8.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
30 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
32 static void pgoutput_startup(LogicalDecodingContext *ctx,
33 OutputPluginOptions *opt, bool is_init);
34 static void pgoutput_shutdown(LogicalDecodingContext *ctx);
35 static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
36 ReorderBufferTXN *txn);
37 static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
38 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
39 static void pgoutput_change(LogicalDecodingContext *ctx,
40 ReorderBufferTXN *txn, Relation rel,
41 ReorderBufferChange *change);
42 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
43 RepOriginId origin_id);
45 static bool publications_valid;
47 static List *LoadPublications(List *pubnames);
48 static void publication_invalidation_cb(Datum arg, int cacheid,
51 /* Entry in the map used to remember which relation schemas we sent. */
52 typedef struct RelationSyncEntry
54 Oid relid; /* relation oid */
55 bool schema_sent; /* did we send the schema? */
57 PublicationActions pubactions;
60 /* Map used to remember which relation schemas we sent. */
61 static HTAB *RelationSyncCache = NULL;
63 static void init_rel_sync_cache(MemoryContext decoding_context);
64 static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
65 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
66 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
70 * Specify output plugin callbacks
73 _PG_output_plugin_init(OutputPluginCallbacks *cb)
75 AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
77 cb->startup_cb = pgoutput_startup;
78 cb->begin_cb = pgoutput_begin_txn;
79 cb->change_cb = pgoutput_change;
80 cb->commit_cb = pgoutput_commit_txn;
81 cb->filter_by_origin_cb = pgoutput_origin_filter;
82 cb->shutdown_cb = pgoutput_shutdown;
86 parse_output_parameters(List *options, uint32 *protocol_version,
87 List **publication_names)
90 bool protocol_version_given = false;
91 bool publication_names_given = false;
95 DefElem *defel = (DefElem *) lfirst(lc);
97 Assert(defel->arg == NULL || IsA(defel->arg, String));
99 /* Check each param, whether or not we recognize it */
100 if (strcmp(defel->defname, "proto_version") == 0)
104 if (protocol_version_given)
106 (errcode(ERRCODE_SYNTAX_ERROR),
107 errmsg("conflicting or redundant options")));
108 protocol_version_given = true;
110 if (!scanint8(strVal(defel->arg), true, &parsed))
112 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
113 errmsg("invalid proto_version")));
115 if (parsed > PG_UINT32_MAX || parsed < 0)
117 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
118 errmsg("proto_verson \"%s\" out of range",
119 strVal(defel->arg))));
121 *protocol_version = (uint32) parsed;
123 else if (strcmp(defel->defname, "publication_names") == 0)
125 if (publication_names_given)
127 (errcode(ERRCODE_SYNTAX_ERROR),
128 errmsg("conflicting or redundant options")));
129 publication_names_given = true;
131 if (!SplitIdentifierString(strVal(defel->arg), ',',
134 (errcode(ERRCODE_INVALID_NAME),
135 errmsg("invalid publication_names syntax")));
138 elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
143 * Initialize this plugin
146 pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
149 PGOutputData *data = palloc0(sizeof(PGOutputData));
151 /* Create our memory context for private allocations. */
152 data->context = AllocSetContextCreate(ctx->context,
153 "logical replication output context",
154 ALLOCSET_DEFAULT_MINSIZE,
155 ALLOCSET_DEFAULT_INITSIZE,
156 ALLOCSET_DEFAULT_MAXSIZE);
158 ctx->output_plugin_private = data;
160 /* This plugin uses binary protocol. */
161 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
164 * This is replication start and not slot initialization.
166 * Parse and validate options passed by the client.
170 /* Parse the params and ERROR if we see any we don't recognize */
171 parse_output_parameters(ctx->output_plugin_options,
172 &data->protocol_version,
173 &data->publication_names);
175 /* Check if we support requested protocol */
176 if (data->protocol_version != LOGICALREP_PROTO_VERSION_NUM)
178 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
179 errmsg("client sent proto_version=%d but we only support protocol %d or lower",
180 data->protocol_version, LOGICALREP_PROTO_VERSION_NUM)));
182 if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
184 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
185 errmsg("client sent proto_version=%d but we only support protocol %d or higher",
186 data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
188 if (list_length(data->publication_names) < 1)
190 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
191 errmsg("publication_names parameter missing")));
193 /* Init publication state. */
194 data->publications = NIL;
195 publications_valid = false;
196 CacheRegisterSyscacheCallback(PUBLICATIONOID,
197 publication_invalidation_cb,
200 /* Initialize relation schema cache. */
201 init_rel_sync_cache(CacheMemoryContext);
209 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
211 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
213 OutputPluginPrepareWrite(ctx, !send_replication_origin);
214 logicalrep_write_begin(ctx->out, txn);
216 if (send_replication_origin)
220 /* Message boundary */
221 OutputPluginWrite(ctx, false);
222 OutputPluginPrepareWrite(ctx, true);
225 * XXX: which behaviour do we want here?
228 * - don't send origin message if origin name not found
229 * (that's what we do now)
230 * - throw error - that will break replication, not good
231 * - send some special "unknown" origin
234 if (replorigin_by_oid(txn->origin_id, true, &origin))
235 logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
238 OutputPluginWrite(ctx, true);
245 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
246 XLogRecPtr commit_lsn)
248 OutputPluginUpdateProgress(ctx);
250 OutputPluginPrepareWrite(ctx, true);
251 logicalrep_write_commit(ctx->out, txn, commit_lsn);
252 OutputPluginWrite(ctx, true);
256 * Sends the decoded DML over wire.
259 pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
260 Relation relation, ReorderBufferChange *change)
262 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
264 RelationSyncEntry *relentry;
266 relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
268 /* First check the table filter */
269 switch (change->action)
271 case REORDER_BUFFER_CHANGE_INSERT:
272 if (!relentry->pubactions.pubinsert)
275 case REORDER_BUFFER_CHANGE_UPDATE:
276 if (!relentry->pubactions.pubupdate)
279 case REORDER_BUFFER_CHANGE_DELETE:
280 if (!relentry->pubactions.pubdelete)
287 /* Avoid leaking memory by using and resetting our own context */
288 old = MemoryContextSwitchTo(data->context);
291 * Write the relation schema if the current schema haven't been sent yet.
293 if (!relentry->schema_sent)
298 desc = RelationGetDescr(relation);
301 * Write out type info if needed. We do that only for user created
304 for (i = 0; i < desc->natts; i++)
306 Form_pg_attribute att = desc->attrs[i];
308 if (att->attisdropped)
311 if (att->atttypid < FirstNormalObjectId)
314 OutputPluginPrepareWrite(ctx, false);
315 logicalrep_write_typ(ctx->out, att->atttypid);
316 OutputPluginWrite(ctx, false);
319 OutputPluginPrepareWrite(ctx, false);
320 logicalrep_write_rel(ctx->out, relation);
321 OutputPluginWrite(ctx, false);
322 relentry->schema_sent = true;
326 switch (change->action)
328 case REORDER_BUFFER_CHANGE_INSERT:
329 OutputPluginPrepareWrite(ctx, true);
330 logicalrep_write_insert(ctx->out, relation,
331 &change->data.tp.newtuple->tuple);
332 OutputPluginWrite(ctx, true);
334 case REORDER_BUFFER_CHANGE_UPDATE:
336 HeapTuple oldtuple = change->data.tp.oldtuple ?
337 &change->data.tp.oldtuple->tuple : NULL;
339 OutputPluginPrepareWrite(ctx, true);
340 logicalrep_write_update(ctx->out, relation, oldtuple,
341 &change->data.tp.newtuple->tuple);
342 OutputPluginWrite(ctx, true);
345 case REORDER_BUFFER_CHANGE_DELETE:
346 if (change->data.tp.oldtuple)
348 OutputPluginPrepareWrite(ctx, true);
349 logicalrep_write_delete(ctx->out, relation,
350 &change->data.tp.oldtuple->tuple);
351 OutputPluginWrite(ctx, true);
354 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
361 MemoryContextSwitchTo(old);
362 MemoryContextReset(data->context);
366 * Currently we always forward.
369 pgoutput_origin_filter(LogicalDecodingContext *ctx,
370 RepOriginId origin_id)
376 * Shutdown the output plugin.
378 * Note, we don't need to clean the data->context as it's child context
379 * of the ctx->context so it will be cleaned up by logical decoding machinery.
382 pgoutput_shutdown(LogicalDecodingContext *ctx)
384 if (RelationSyncCache)
386 hash_destroy(RelationSyncCache);
387 RelationSyncCache = NULL;
392 * Load publications from the list of publication names.
395 LoadPublications(List *pubnames)
400 foreach(lc, pubnames)
402 char *pubname = (char *) lfirst(lc);
403 Publication *pub = GetPublicationByName(pubname, false);
405 result = lappend(result, pub);
412 * Publication cache invalidation callback.
415 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
417 publications_valid = false;
420 * Also invalidate per-relation cache so that next time the filtering info
421 * is checked it will be updated with the new publication settings.
423 rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
427 * Initialize the relation schema sync cache for a decoding session.
429 * The hash table is destroyed at the end of a decoding session. While
430 * relcache invalidations still exist and will still be invoked, they
431 * will just see the null hash table global and take no action.
434 init_rel_sync_cache(MemoryContext cachectx)
437 MemoryContext old_ctxt;
439 if (RelationSyncCache != NULL)
442 /* Make a new hash table for the cache */
443 MemSet(&ctl, 0, sizeof(ctl));
444 ctl.keysize = sizeof(Oid);
445 ctl.entrysize = sizeof(RelationSyncEntry);
448 old_ctxt = MemoryContextSwitchTo(cachectx);
449 RelationSyncCache = hash_create("logical replication output relation cache",
451 HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
452 (void) MemoryContextSwitchTo(old_ctxt);
454 Assert(RelationSyncCache != NULL);
456 CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
457 CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
458 rel_sync_cache_publication_cb,
463 * Find or create entry in the relation schema cache.
465 static RelationSyncEntry *
466 get_rel_sync_entry(PGOutputData *data, Oid relid)
468 RelationSyncEntry *entry;
470 MemoryContext oldctx;
472 Assert(RelationSyncCache != NULL);
474 /* Find cached function info, creating if not found */
475 oldctx = MemoryContextSwitchTo(CacheMemoryContext);
476 entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
479 MemoryContextSwitchTo(oldctx);
480 Assert(entry != NULL);
482 /* Not found means schema wasn't sent */
483 if (!found || !entry->replicate_valid)
485 List *pubids = GetRelationPublications(relid);
488 /* Reload publications if needed before use. */
489 if (!publications_valid)
491 oldctx = MemoryContextSwitchTo(CacheMemoryContext);
492 if (data->publications)
493 list_free_deep(data->publications);
495 data->publications = LoadPublications(data->publication_names);
496 MemoryContextSwitchTo(oldctx);
497 publications_valid = true;
501 * Build publication cache. We can't use one provided by relcache as
502 * relcache considers all publications given relation is in, but here
503 * we only need to consider ones that the subscriber requested.
505 entry->pubactions.pubinsert = entry->pubactions.pubupdate =
506 entry->pubactions.pubdelete = false;
508 foreach(lc, data->publications)
510 Publication *pub = lfirst(lc);
512 if (pub->alltables || list_member_oid(pubids, pub->oid))
514 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
515 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
516 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
519 if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
520 entry->pubactions.pubdelete)
526 entry->replicate_valid = true;
530 entry->schema_sent = false;
536 * Relcache invalidation callback
539 rel_sync_cache_relation_cb(Datum arg, Oid relid)
541 RelationSyncEntry *entry;
544 * We can get here if the plugin was used in SQL interface as the
545 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
546 * is no way to unregister the relcache invalidation callback.
548 if (RelationSyncCache == NULL)
552 * Nobody keeps pointers to entries in this hash table around outside
553 * logical decoding callback calls - but invalidation events can come in
554 * *during* a callback if we access the relcache in the callback. Because
555 * of that we must mark the cache entry as invalid but not remove it from
556 * the hash while it could still be referenced, then prune it at a later
559 * Getting invalidations for relations that aren't in the table is
560 * entirely normal, since there's no way to unregister for an invalidation
561 * event. So we don't care if it's found or not.
563 entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
567 * Reset schema sent status as the relation definition may have changed.
570 entry->schema_sent = false;
574 * Publication relation map syscache invalidation callback
577 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
579 HASH_SEQ_STATUS status;
580 RelationSyncEntry *entry;
583 * We can get here if the plugin was used in SQL interface as the
584 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
585 * is no way to unregister the relcache invalidation callback.
587 if (RelationSyncCache == NULL)
591 * There is no way to find which entry in our cache the hash belongs to so
592 * mark the whole cache as invalid.
594 hash_seq_init(&status, RelationSyncCache);
595 while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
596 entry->replicate_valid = false;