]> granicus.if.org Git - postgresql/blobdiff - src/backend/replication/pgoutput/pgoutput.c
Post-PG 10 beta1 pgindent run
[postgresql] / src / backend / replication / pgoutput / pgoutput.c
index f3eaccffd5b8b8be0bcad857b6a93b2befa9a145..5bdfa60ae74044e4ad186c29fcc7d34c25595278 100644 (file)
@@ -29,31 +29,31 @@ PG_MODULE_MAGIC;
 
 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
 
-static void pgoutput_startup(LogicalDecodingContext * ctx,
-                                                         OutputPluginOptions *opt, bool is_init);
-static void pgoutput_shutdown(LogicalDecodingContext * ctx);
+static void pgoutput_startup(LogicalDecodingContext *ctx,
+                                OutputPluginOptions *opt, bool is_init);
+static void pgoutput_shutdown(LogicalDecodingContext *ctx);
 static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
-                                       ReorderBufferTXN *txn);
+                                  ReorderBufferTXN *txn);
 static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
-                                        ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+                                       ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
 static void pgoutput_change(LogicalDecodingContext *ctx,
-                                ReorderBufferTXN *txn, Relation rel,
-                                ReorderBufferChange *change);
+                               ReorderBufferTXN *txn, Relation rel,
+                               ReorderBufferChange *change);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
-                                               RepOriginId origin_id);
+                                          RepOriginId origin_id);
 
 static bool publications_valid;
 
 static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
-                                                                               uint32 hashvalue);
+                                                       uint32 hashvalue);
 
 /* Entry in the map used to remember which relation schemas we sent. */
 typedef struct RelationSyncEntry
 {
-       Oid             relid;                  /* relation oid */
-       bool    schema_sent;    /* did we send the schema? */
-       bool    replicate_valid;
+       Oid                     relid;                  /* relation oid */
+       bool            schema_sent;    /* did we send the schema? */
+       bool            replicate_valid;
        PublicationActions pubactions;
 } RelationSyncEntry;
 
@@ -64,7 +64,7 @@ static void init_rel_sync_cache(MemoryContext decoding_context);
 static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
-                                                                                 uint32 hashvalue);
+                                                         uint32 hashvalue);
 
 /*
  * Specify output plugin callbacks
@@ -130,9 +130,9 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 
                        if (!SplitIdentifierString(strVal(defel->arg), ',',
                                                                           publication_names))
-                                       ereport(ERROR,
-                                                       (errcode(ERRCODE_INVALID_NAME),
-                                                        errmsg("invalid publication_names syntax")));
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_INVALID_NAME),
+                                                errmsg("invalid publication_names syntax")));
                }
                else
                        elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
@@ -143,14 +143,14 @@ parse_output_parameters(List *options, uint32 *protocol_version,
  * Initialize this plugin
  */
 static void
-pgoutput_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
-                                 bool is_init)
+pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
+                                bool is_init)
 {
-       PGOutputData   *data = palloc0(sizeof(PGOutputData));
+       PGOutputData *data = palloc0(sizeof(PGOutputData));
 
        /* Create our memory context for private allocations. */
        data->context = AllocSetContextCreate(ctx->context,
-                                                                                 "logical replication output context",
+                                                                               "logical replication output context",
                                                                                  ALLOCSET_DEFAULT_MINSIZE,
                                                                                  ALLOCSET_DEFAULT_INITSIZE,
                                                                                  ALLOCSET_DEFAULT_MAXSIZE);
@@ -175,15 +175,15 @@ pgoutput_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
                /* Check if we support requested protocol */
                if (data->protocol_version != LOGICALREP_PROTO_VERSION_NUM)
                        ereport(ERROR,
-                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                errmsg("client sent proto_version=%d but we only support protocol %d or lower",
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("client sent proto_version=%d but we only support protocol %d or lower",
                                         data->protocol_version, LOGICALREP_PROTO_VERSION_NUM)));
 
                if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
                        ereport(ERROR,
-                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                errmsg("client sent proto_version=%d but we only support protocol %d or higher",
-                                  data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("client sent proto_version=%d but we only support protocol %d or higher",
+                                data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
 
                if (list_length(data->publication_names) < 1)
                        ereport(ERROR,
@@ -208,27 +208,28 @@ pgoutput_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
-       bool    send_replication_origin = txn->origin_id != InvalidRepOriginId;
+       bool            send_replication_origin = txn->origin_id != InvalidRepOriginId;
 
        OutputPluginPrepareWrite(ctx, !send_replication_origin);
        logicalrep_write_begin(ctx->out, txn);
 
        if (send_replication_origin)
        {
-               char *origin;
+               char       *origin;
 
                /* Message boundary */
                OutputPluginWrite(ctx, false);
                OutputPluginPrepareWrite(ctx, true);
 
-               /*
-                * XXX: which behaviour we want here?
+               /*----------
+                * XXX: which behaviour do we want here?
                 *
                 * Alternatives:
-                *  - don't send origin message if origin name not found
-                *    (that's what we do now)
-                *  - throw error - that will break replication, not good
-                *  - send some special "unknown" origin
+                *      - don't send origin message if origin name not found
+                *        (that's what we do now)
+                *      - throw error - that will break replication, not good
+                *      - send some special "unknown" origin
+                *----------
                 */
                if (replorigin_by_oid(txn->origin_id, true, &origin))
                        logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
@@ -242,8 +243,10 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
  */
 static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
-                                        XLogRecPtr commit_lsn)
+                                       XLogRecPtr commit_lsn)
 {
+       OutputPluginUpdateProgress(ctx);
+
        OutputPluginPrepareWrite(ctx, true);
        logicalrep_write_commit(ctx->out, txn, commit_lsn);
        OutputPluginWrite(ctx, true);
@@ -256,9 +259,9 @@ static void
 pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                Relation relation, ReorderBufferChange *change)
 {
-       PGOutputData       *data = (PGOutputData *) ctx->output_plugin_private;
-       MemoryContext           old;
-       RelationSyncEntry  *relentry;
+       PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+       MemoryContext old;
+       RelationSyncEntry *relentry;
 
        relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
 
@@ -330,8 +333,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                        break;
                case REORDER_BUFFER_CHANGE_UPDATE:
                        {
-                               HeapTuple oldtuple = change->data.tp.oldtuple ?
-                                       &change->data.tp.oldtuple->tuple : NULL;
+                               HeapTuple       oldtuple = change->data.tp.oldtuple ?
+                               &change->data.tp.oldtuple->tuple : NULL;
 
                                OutputPluginPrepareWrite(ctx, true);
                                logicalrep_write_update(ctx->out, relation, oldtuple,
@@ -364,7 +367,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
  */
 static bool
 pgoutput_origin_filter(LogicalDecodingContext *ctx,
-                                               RepOriginId origin_id)
+                                          RepOriginId origin_id)
 {
        return false;
 }
@@ -376,7 +379,7 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
  * of the ctx->context so it will be cleaned up by logical decoding machinery.
  */
 static void
-pgoutput_shutdown(LogicalDecodingContext * ctx)
+pgoutput_shutdown(LogicalDecodingContext *ctx)
 {
        if (RelationSyncCache)
        {
@@ -394,10 +397,10 @@ LoadPublications(List *pubnames)
        List       *result = NIL;
        ListCell   *lc;
 
-       foreach (lc, pubnames)
+       foreach(lc, pubnames)
        {
-               char               *pubname = (char *) lfirst(lc);
-               Publication        *pub = GetPublicationByName(pubname, false);
+               char       *pubname = (char *) lfirst(lc);
+               Publication *pub = GetPublicationByName(pubname, false);
 
                result = lappend(result, pub);
        }
@@ -414,9 +417,8 @@ publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
        publications_valid = false;
 
        /*
-        * Also invalidate per-relation cache so that next time the filtering
-        * info is checked it will be updated with the new publication
-        * settings.
+        * Also invalidate per-relation cache so that next time the filtering info
+        * is checked it will be updated with the new publication settings.
         */
        rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
 }
@@ -431,7 +433,7 @@ publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 static void
 init_rel_sync_cache(MemoryContext cachectx)
 {
-       HASHCTL ctl;
+       HASHCTL         ctl;
        MemoryContext old_ctxt;
 
        if (RelationSyncCache != NULL)
@@ -463,9 +465,9 @@ init_rel_sync_cache(MemoryContext cachectx)
 static RelationSyncEntry *
 get_rel_sync_entry(PGOutputData *data, Oid relid)
 {
-       RelationSyncEntry  *entry;
-       bool                            found;
-       MemoryContext           oldctx;
+       RelationSyncEntry *entry;
+       bool            found;
+       MemoryContext oldctx;
 
        Assert(RelationSyncCache != NULL);
 
@@ -496,9 +498,9 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
                }
 
                /*
-                * Build publication cache. We can't use one provided by relcache
-                * as relcache considers all publications given relation is in, but
-                * here we only need to consider ones that the subscriber requested.
+                * Build publication cache. We can't use one provided by relcache as
+                * relcache considers all publications given relation is in, but here
+                * we only need to consider ones that the subscriber requested.
                 */
                entry->pubactions.pubinsert = entry->pubactions.pubupdate =
                        entry->pubactions.pubdelete = false;
@@ -536,7 +538,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 static void
 rel_sync_cache_relation_cb(Datum arg, Oid relid)
 {
-       RelationSyncEntry  *entry;
+       RelationSyncEntry *entry;
 
        /*
         * We can get here if the plugin was used in SQL interface as the
@@ -555,15 +557,14 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
         * safe point.
         *
         * Getting invalidations for relations that aren't in the table is
-        * entirely normal, since there's no way to unregister for an
-        * invalidation event. So we don't care if it's found or not.
+        * entirely normal, since there's no way to unregister for an invalidation
+        * event. So we don't care if it's found or not.
         */
        entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
                                                                                          HASH_FIND, NULL);
 
        /*
-        * Reset schema sent status as the relation definition may have
-        * changed.
+        * Reset schema sent status as the relation definition may have changed.
         */
        if (entry != NULL)
                entry->schema_sent = false;
@@ -575,8 +576,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 static void
 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
-       HASH_SEQ_STATUS         status;
-       RelationSyncEntry  *entry;
+       HASH_SEQ_STATUS status;
+       RelationSyncEntry *entry;
 
        /*
         * We can get here if the plugin was used in SQL interface as the
@@ -587,8 +588,8 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
                return;
 
        /*
-        * There is no way to find which entry in our cache the hash belongs to
-        * so mark the whole cache as invalid.
+        * There is no way to find which entry in our cache the hash belongs to so
+        * mark the whole cache as invalid.
         */
        hash_seq_init(&status, RelationSyncCache);
        while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)