]> granicus.if.org Git - postgresql/blob - src/backend/replication/pgoutput/pgoutput.c
Post-PG 10 beta1 pgindent run
[postgresql] / src / backend / replication / pgoutput / pgoutput.c
1 /*-------------------------------------------------------------------------
2  *
3  * pgoutput.c
4  *              Logical Replication output plugin
5  *
6  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *                src/backend/replication/pgoutput/pgoutput.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14
15 #include "catalog/pg_publication.h"
16
17 #include "replication/logical.h"
18 #include "replication/logicalproto.h"
19 #include "replication/origin.h"
20 #include "replication/pgoutput.h"
21
22 #include "utils/inval.h"
23 #include "utils/int8.h"
24 #include "utils/memutils.h"
25 #include "utils/syscache.h"
26 #include "utils/varlena.h"
27
28 PG_MODULE_MAGIC;
29
30 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
31
32 static void pgoutput_startup(LogicalDecodingContext *ctx,
33                                  OutputPluginOptions *opt, bool is_init);
34 static void pgoutput_shutdown(LogicalDecodingContext *ctx);
35 static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
36                                    ReorderBufferTXN *txn);
37 static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
38                                         ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
39 static void pgoutput_change(LogicalDecodingContext *ctx,
40                                 ReorderBufferTXN *txn, Relation rel,
41                                 ReorderBufferChange *change);
42 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
43                                            RepOriginId origin_id);
44
45 static bool publications_valid;
46
47 static List *LoadPublications(List *pubnames);
48 static void publication_invalidation_cb(Datum arg, int cacheid,
49                                                         uint32 hashvalue);
50
51 /* Entry in the map used to remember which relation schemas we sent. */
52 typedef struct RelationSyncEntry
53 {
54         Oid                     relid;                  /* relation oid */
55         bool            schema_sent;    /* did we send the schema? */
56         bool            replicate_valid;
57         PublicationActions pubactions;
58 } RelationSyncEntry;
59
60 /* Map used to remember which relation schemas we sent. */
61 static HTAB *RelationSyncCache = NULL;
62
63 static void init_rel_sync_cache(MemoryContext decoding_context);
64 static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
65 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
66 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
67                                                           uint32 hashvalue);
68
69 /*
70  * Specify output plugin callbacks
71  */
72 void
73 _PG_output_plugin_init(OutputPluginCallbacks *cb)
74 {
75         AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
76
77         cb->startup_cb = pgoutput_startup;
78         cb->begin_cb = pgoutput_begin_txn;
79         cb->change_cb = pgoutput_change;
80         cb->commit_cb = pgoutput_commit_txn;
81         cb->filter_by_origin_cb = pgoutput_origin_filter;
82         cb->shutdown_cb = pgoutput_shutdown;
83 }
84
85 static void
86 parse_output_parameters(List *options, uint32 *protocol_version,
87                                                 List **publication_names)
88 {
89         ListCell   *lc;
90         bool            protocol_version_given = false;
91         bool            publication_names_given = false;
92
93         foreach(lc, options)
94         {
95                 DefElem    *defel = (DefElem *) lfirst(lc);
96
97                 Assert(defel->arg == NULL || IsA(defel->arg, String));
98
99                 /* Check each param, whether or not we recognize it */
100                 if (strcmp(defel->defname, "proto_version") == 0)
101                 {
102                         int64           parsed;
103
104                         if (protocol_version_given)
105                                 ereport(ERROR,
106                                                 (errcode(ERRCODE_SYNTAX_ERROR),
107                                                  errmsg("conflicting or redundant options")));
108                         protocol_version_given = true;
109
110                         if (!scanint8(strVal(defel->arg), true, &parsed))
111                                 ereport(ERROR,
112                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
113                                                  errmsg("invalid proto_version")));
114
115                         if (parsed > PG_UINT32_MAX || parsed < 0)
116                                 ereport(ERROR,
117                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
118                                                  errmsg("proto_verson \"%s\" out of range",
119                                                                 strVal(defel->arg))));
120
121                         *protocol_version = (uint32) parsed;
122                 }
123                 else if (strcmp(defel->defname, "publication_names") == 0)
124                 {
125                         if (publication_names_given)
126                                 ereport(ERROR,
127                                                 (errcode(ERRCODE_SYNTAX_ERROR),
128                                                  errmsg("conflicting or redundant options")));
129                         publication_names_given = true;
130
131                         if (!SplitIdentifierString(strVal(defel->arg), ',',
132                                                                            publication_names))
133                                 ereport(ERROR,
134                                                 (errcode(ERRCODE_INVALID_NAME),
135                                                  errmsg("invalid publication_names syntax")));
136                 }
137                 else
138                         elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
139         }
140 }
141
142 /*
143  * Initialize this plugin
144  */
145 static void
146 pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
147                                  bool is_init)
148 {
149         PGOutputData *data = palloc0(sizeof(PGOutputData));
150
151         /* Create our memory context for private allocations. */
152         data->context = AllocSetContextCreate(ctx->context,
153                                                                                 "logical replication output context",
154                                                                                   ALLOCSET_DEFAULT_MINSIZE,
155                                                                                   ALLOCSET_DEFAULT_INITSIZE,
156                                                                                   ALLOCSET_DEFAULT_MAXSIZE);
157
158         ctx->output_plugin_private = data;
159
160         /* This plugin uses binary protocol. */
161         opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
162
163         /*
164          * This is replication start and not slot initialization.
165          *
166          * Parse and validate options passed by the client.
167          */
168         if (!is_init)
169         {
170                 /* Parse the params and ERROR if we see any we don't recognize */
171                 parse_output_parameters(ctx->output_plugin_options,
172                                                                 &data->protocol_version,
173                                                                 &data->publication_names);
174
175                 /* Check if we support requested protocol */
176                 if (data->protocol_version != LOGICALREP_PROTO_VERSION_NUM)
177                         ereport(ERROR,
178                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
179                                          errmsg("client sent proto_version=%d but we only support protocol %d or lower",
180                                          data->protocol_version, LOGICALREP_PROTO_VERSION_NUM)));
181
182                 if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
183                         ereport(ERROR,
184                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
185                                          errmsg("client sent proto_version=%d but we only support protocol %d or higher",
186                                  data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
187
188                 if (list_length(data->publication_names) < 1)
189                         ereport(ERROR,
190                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
191                                          errmsg("publication_names parameter missing")));
192
193                 /* Init publication state. */
194                 data->publications = NIL;
195                 publications_valid = false;
196                 CacheRegisterSyscacheCallback(PUBLICATIONOID,
197                                                                           publication_invalidation_cb,
198                                                                           (Datum) 0);
199
200                 /* Initialize relation schema cache. */
201                 init_rel_sync_cache(CacheMemoryContext);
202         }
203 }
204
205 /*
206  * BEGIN callback
207  */
208 static void
209 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
210 {
211         bool            send_replication_origin = txn->origin_id != InvalidRepOriginId;
212
213         OutputPluginPrepareWrite(ctx, !send_replication_origin);
214         logicalrep_write_begin(ctx->out, txn);
215
216         if (send_replication_origin)
217         {
218                 char       *origin;
219
220                 /* Message boundary */
221                 OutputPluginWrite(ctx, false);
222                 OutputPluginPrepareWrite(ctx, true);
223
224                 /*----------
225                  * XXX: which behaviour do we want here?
226                  *
227                  * Alternatives:
228                  *      - don't send origin message if origin name not found
229                  *        (that's what we do now)
230                  *      - throw error - that will break replication, not good
231                  *      - send some special "unknown" origin
232                  *----------
233                  */
234                 if (replorigin_by_oid(txn->origin_id, true, &origin))
235                         logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
236         }
237
238         OutputPluginWrite(ctx, true);
239 }
240
241 /*
242  * COMMIT callback
243  */
244 static void
245 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
246                                         XLogRecPtr commit_lsn)
247 {
248         OutputPluginUpdateProgress(ctx);
249
250         OutputPluginPrepareWrite(ctx, true);
251         logicalrep_write_commit(ctx->out, txn, commit_lsn);
252         OutputPluginWrite(ctx, true);
253 }
254
255 /*
256  * Sends the decoded DML over wire.
257  */
258 static void
259 pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
260                                 Relation relation, ReorderBufferChange *change)
261 {
262         PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
263         MemoryContext old;
264         RelationSyncEntry *relentry;
265
266         relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
267
268         /* First check the table filter */
269         switch (change->action)
270         {
271                 case REORDER_BUFFER_CHANGE_INSERT:
272                         if (!relentry->pubactions.pubinsert)
273                                 return;
274                         break;
275                 case REORDER_BUFFER_CHANGE_UPDATE:
276                         if (!relentry->pubactions.pubupdate)
277                                 return;
278                         break;
279                 case REORDER_BUFFER_CHANGE_DELETE:
280                         if (!relentry->pubactions.pubdelete)
281                                 return;
282                         break;
283                 default:
284                         Assert(false);
285         }
286
287         /* Avoid leaking memory by using and resetting our own context */
288         old = MemoryContextSwitchTo(data->context);
289
290         /*
291          * Write the relation schema if the current schema haven't been sent yet.
292          */
293         if (!relentry->schema_sent)
294         {
295                 TupleDesc       desc;
296                 int                     i;
297
298                 desc = RelationGetDescr(relation);
299
300                 /*
301                  * Write out type info if needed. We do that only for user created
302                  * types.
303                  */
304                 for (i = 0; i < desc->natts; i++)
305                 {
306                         Form_pg_attribute att = desc->attrs[i];
307
308                         if (att->attisdropped)
309                                 continue;
310
311                         if (att->atttypid < FirstNormalObjectId)
312                                 continue;
313
314                         OutputPluginPrepareWrite(ctx, false);
315                         logicalrep_write_typ(ctx->out, att->atttypid);
316                         OutputPluginWrite(ctx, false);
317                 }
318
319                 OutputPluginPrepareWrite(ctx, false);
320                 logicalrep_write_rel(ctx->out, relation);
321                 OutputPluginWrite(ctx, false);
322                 relentry->schema_sent = true;
323         }
324
325         /* Send the data */
326         switch (change->action)
327         {
328                 case REORDER_BUFFER_CHANGE_INSERT:
329                         OutputPluginPrepareWrite(ctx, true);
330                         logicalrep_write_insert(ctx->out, relation,
331                                                                         &change->data.tp.newtuple->tuple);
332                         OutputPluginWrite(ctx, true);
333                         break;
334                 case REORDER_BUFFER_CHANGE_UPDATE:
335                         {
336                                 HeapTuple       oldtuple = change->data.tp.oldtuple ?
337                                 &change->data.tp.oldtuple->tuple : NULL;
338
339                                 OutputPluginPrepareWrite(ctx, true);
340                                 logicalrep_write_update(ctx->out, relation, oldtuple,
341                                                                                 &change->data.tp.newtuple->tuple);
342                                 OutputPluginWrite(ctx, true);
343                                 break;
344                         }
345                 case REORDER_BUFFER_CHANGE_DELETE:
346                         if (change->data.tp.oldtuple)
347                         {
348                                 OutputPluginPrepareWrite(ctx, true);
349                                 logicalrep_write_delete(ctx->out, relation,
350                                                                                 &change->data.tp.oldtuple->tuple);
351                                 OutputPluginWrite(ctx, true);
352                         }
353                         else
354                                 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
355                         break;
356                 default:
357                         Assert(false);
358         }
359
360         /* Cleanup */
361         MemoryContextSwitchTo(old);
362         MemoryContextReset(data->context);
363 }
364
365 /*
366  * Currently we always forward.
367  */
368 static bool
369 pgoutput_origin_filter(LogicalDecodingContext *ctx,
370                                            RepOriginId origin_id)
371 {
372         return false;
373 }
374
375 /*
376  * Shutdown the output plugin.
377  *
378  * Note, we don't need to clean the data->context as it's child context
379  * of the ctx->context so it will be cleaned up by logical decoding machinery.
380  */
381 static void
382 pgoutput_shutdown(LogicalDecodingContext *ctx)
383 {
384         if (RelationSyncCache)
385         {
386                 hash_destroy(RelationSyncCache);
387                 RelationSyncCache = NULL;
388         }
389 }
390
391 /*
392  * Load publications from the list of publication names.
393  */
394 static List *
395 LoadPublications(List *pubnames)
396 {
397         List       *result = NIL;
398         ListCell   *lc;
399
400         foreach(lc, pubnames)
401         {
402                 char       *pubname = (char *) lfirst(lc);
403                 Publication *pub = GetPublicationByName(pubname, false);
404
405                 result = lappend(result, pub);
406         }
407
408         return result;
409 }
410
411 /*
412  * Publication cache invalidation callback.
413  */
414 static void
415 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
416 {
417         publications_valid = false;
418
419         /*
420          * Also invalidate per-relation cache so that next time the filtering info
421          * is checked it will be updated with the new publication settings.
422          */
423         rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
424 }
425
426 /*
427  * Initialize the relation schema sync cache for a decoding session.
428  *
429  * The hash table is destroyed at the end of a decoding session. While
430  * relcache invalidations still exist and will still be invoked, they
431  * will just see the null hash table global and take no action.
432  */
433 static void
434 init_rel_sync_cache(MemoryContext cachectx)
435 {
436         HASHCTL         ctl;
437         MemoryContext old_ctxt;
438
439         if (RelationSyncCache != NULL)
440                 return;
441
442         /* Make a new hash table for the cache */
443         MemSet(&ctl, 0, sizeof(ctl));
444         ctl.keysize = sizeof(Oid);
445         ctl.entrysize = sizeof(RelationSyncEntry);
446         ctl.hcxt = cachectx;
447
448         old_ctxt = MemoryContextSwitchTo(cachectx);
449         RelationSyncCache = hash_create("logical replication output relation cache",
450                                                                         128, &ctl,
451                                                                         HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
452         (void) MemoryContextSwitchTo(old_ctxt);
453
454         Assert(RelationSyncCache != NULL);
455
456         CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
457         CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
458                                                                   rel_sync_cache_publication_cb,
459                                                                   (Datum) 0);
460 }
461
462 /*
463  * Find or create entry in the relation schema cache.
464  */
465 static RelationSyncEntry *
466 get_rel_sync_entry(PGOutputData *data, Oid relid)
467 {
468         RelationSyncEntry *entry;
469         bool            found;
470         MemoryContext oldctx;
471
472         Assert(RelationSyncCache != NULL);
473
474         /* Find cached function info, creating if not found */
475         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
476         entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
477                                                                                           (void *) &relid,
478                                                                                           HASH_ENTER, &found);
479         MemoryContextSwitchTo(oldctx);
480         Assert(entry != NULL);
481
482         /* Not found means schema wasn't sent */
483         if (!found || !entry->replicate_valid)
484         {
485                 List       *pubids = GetRelationPublications(relid);
486                 ListCell   *lc;
487
488                 /* Reload publications if needed before use. */
489                 if (!publications_valid)
490                 {
491                         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
492                         if (data->publications)
493                                 list_free_deep(data->publications);
494
495                         data->publications = LoadPublications(data->publication_names);
496                         MemoryContextSwitchTo(oldctx);
497                         publications_valid = true;
498                 }
499
500                 /*
501                  * Build publication cache. We can't use one provided by relcache as
502                  * relcache considers all publications given relation is in, but here
503                  * we only need to consider ones that the subscriber requested.
504                  */
505                 entry->pubactions.pubinsert = entry->pubactions.pubupdate =
506                         entry->pubactions.pubdelete = false;
507
508                 foreach(lc, data->publications)
509                 {
510                         Publication *pub = lfirst(lc);
511
512                         if (pub->alltables || list_member_oid(pubids, pub->oid))
513                         {
514                                 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
515                                 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
516                                 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
517                         }
518
519                         if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
520                                 entry->pubactions.pubdelete)
521                                 break;
522                 }
523
524                 list_free(pubids);
525
526                 entry->replicate_valid = true;
527         }
528
529         if (!found)
530                 entry->schema_sent = false;
531
532         return entry;
533 }
534
535 /*
536  * Relcache invalidation callback
537  */
538 static void
539 rel_sync_cache_relation_cb(Datum arg, Oid relid)
540 {
541         RelationSyncEntry *entry;
542
543         /*
544          * We can get here if the plugin was used in SQL interface as the
545          * RelSchemaSyncCache is destroyed when the decoding finishes, but there
546          * is no way to unregister the relcache invalidation callback.
547          */
548         if (RelationSyncCache == NULL)
549                 return;
550
551         /*
552          * Nobody keeps pointers to entries in this hash table around outside
553          * logical decoding callback calls - but invalidation events can come in
554          * *during* a callback if we access the relcache in the callback. Because
555          * of that we must mark the cache entry as invalid but not remove it from
556          * the hash while it could still be referenced, then prune it at a later
557          * safe point.
558          *
559          * Getting invalidations for relations that aren't in the table is
560          * entirely normal, since there's no way to unregister for an invalidation
561          * event. So we don't care if it's found or not.
562          */
563         entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
564                                                                                           HASH_FIND, NULL);
565
566         /*
567          * Reset schema sent status as the relation definition may have changed.
568          */
569         if (entry != NULL)
570                 entry->schema_sent = false;
571 }
572
573 /*
574  * Publication relation map syscache invalidation callback
575  */
576 static void
577 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
578 {
579         HASH_SEQ_STATUS status;
580         RelationSyncEntry *entry;
581
582         /*
583          * We can get here if the plugin was used in SQL interface as the
584          * RelSchemaSyncCache is destroyed when the decoding finishes, but there
585          * is no way to unregister the relcache invalidation callback.
586          */
587         if (RelationSyncCache == NULL)
588                 return;
589
590         /*
591          * There is no way to find which entry in our cache the hash belongs to so
592          * mark the whole cache as invalid.
593          */
594         hash_seq_init(&status, RelationSyncCache);
595         while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
596                 entry->replicate_valid = false;
597 }