]> granicus.if.org Git - postgresql/commitdiff
Logical replication support for TRUNCATE
authorPeter Eisentraut <peter_e@gmx.net>
Sat, 7 Apr 2018 15:24:53 +0000 (11:24 -0400)
committerPeter Eisentraut <peter_e@gmx.net>
Sat, 7 Apr 2018 15:34:11 +0000 (11:34 -0400)
Update the built-in logical replication system to make use of the
previously added logical decoding for TRUNCATE support.  Add the
required truncate callback to pgoutput and a new logical replication
protocol message.

Publications get a new attribute to determine whether to replicate
truncate actions.  When updating a publication via pg_dump from an older
version, this is not set, thus preserving the previous behavior.

Author: Simon Riggs <simon@2ndquadrant.com>
Author: Marco Nenciarini <marco.nenciarini@2ndquadrant.it>
Author: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
19 files changed:
doc/src/sgml/catalogs.sgml
doc/src/sgml/logical-replication.sgml
doc/src/sgml/protocol.sgml
doc/src/sgml/ref/create_publication.sgml
src/backend/catalog/pg_publication.c
src/backend/commands/publicationcmds.c
src/backend/replication/logical/proto.c
src/backend/replication/logical/worker.c
src/backend/replication/pgoutput/pgoutput.c
src/backend/utils/cache/relcache.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/pg_dump.h
src/bin/pg_dump/t/002_pg_dump.pl
src/bin/psql/describe.c
src/include/catalog/catversion.h
src/include/catalog/pg_publication.h
src/include/replication/logicalproto.h
src/test/regress/expected/publication.out
src/test/subscription/t/010_truncate.pl [new file with mode: 0644]

index d6a9d8c5808ee71e16060f2f32e11a9169fe14a6..e8efa13e8df9cf0d05d338f470521d7868a75611 100644 (file)
@@ -5518,6 +5518,14 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       <entry>If true, <command>DELETE</command> operations are replicated for
        tables in the publication.</entry>
      </row>
+
+     <row>
+      <entry><structfield>pubtruncate</structfield></entry>
+      <entry><type>bool</type></entry>
+      <entry></entry>
+      <entry>If true, <command>TRUNCATE</command> operations are replicated for
+       tables in the publication.</entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
index 75551d8ee1a58d58f2f97101c677fca2a81b740f..151e773fc2cc8c9b55ef84135f252508d8098cf6 100644 (file)
 
   <para>
    Publications can choose to limit the changes they produce to
-   any combination of <command>INSERT</command>, <command>UPDATE</command>, and
-   <command>DELETE</command>, similar to how triggers are fired by
+   any combination of <command>INSERT</command>, <command>UPDATE</command>,
+   <command>DELETE</command>, and <command>TRUNCATE</command>, similar to how triggers are fired by
    particular event types.  By default, all operation types are replicated.
   </para>
 
     </para>
    </listitem>
 
-   <listitem>
-    <para>
-     <command>TRUNCATE</command> commands are not replicated.  This can, of
-     course, be worked around by using <command>DELETE</command> instead.  To
-     avoid accidental <command>TRUNCATE</command> invocations, you can revoke
-     the <literal>TRUNCATE</literal> privilege from tables.
-    </para>
-   </listitem>
-
    <listitem>
     <para>
      Large objects (see <xref linkend="largeobjects"/>) are not replicated.
index b94dd4ac65418e4754214512ad2c38dfc8691885..004b36084f19a925c958ee3e11de909585b5e267 100644 (file)
@@ -6774,6 +6774,62 @@ Delete
 </listitem>
 </varlistentry>
 
+<varlistentry>
+<term>
+Truncate
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('T')
+</term>
+<listitem>
+<para>
+                Identifies the message as a truncate message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32
+</term>
+<listitem>
+<para>
+                Number of relations
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int8
+</term>
+<listitem>
+<para>
+                Option bits for <command>TRUNCATE</command>:
+                1 for <literal>CASCADE</literal>, 2 for <literal>RESTART IDENTITY</literal>
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32
+</term>
+<listitem>
+<para>
+                ID of the relation corresponding to the ID in the relation
+                message.  This field is repeated for each relation.
+</para>
+</listitem>
+</varlistentry>
+
+</variablelist>
+</para>
+</listitem>
+</varlistentry>
+
 </variablelist>
 
 <para>
index bfe12d5f41002bf0b84db2bc3f54ce5650551a03..99f87ca39386949f39c4412820859d7470877116 100644 (file)
@@ -106,10 +106,11 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
           This parameter determines which DML operations will be published by
           the new publication to the subscribers.  The value is
           comma-separated list of operations.  The allowed operations are
-          <literal>insert</literal>, <literal>update</literal>, and
-          <literal>delete</literal>.  The default is to publish all actions,
+          <literal>insert</literal>, <literal>update</literal>,
+          <literal>delete</literal>, and <literal>truncate</literal>.
+          The default is to publish all actions,
           and so the default value for this option is
-          <literal>'insert, update, delete'</literal>.
+          <literal>'insert, update, delete, truncate'</literal>.
          </para>
         </listitem>
        </varlistentry>
@@ -168,8 +169,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
   </para>
 
   <para>
-   <command>TRUNCATE</command> and <acronym>DDL</acronym> operations
-   are not published.
+   <acronym>DDL</acronym> operations are not published.
   </para>
  </refsect1>
 
index ba18258ebb7ee989f281b55aeb3daad360560234..ec3bd1d22d22a399d54001d2a1d7ee3087f13bfc 100644 (file)
@@ -376,6 +376,7 @@ GetPublication(Oid pubid)
        pub->pubactions.pubinsert = pubform->pubinsert;
        pub->pubactions.pubupdate = pubform->pubupdate;
        pub->pubactions.pubdelete = pubform->pubdelete;
+       pub->pubactions.pubtruncate = pubform->pubtruncate;
 
        ReleaseSysCache(tup);
 
index 9c5aa9ebc25ba052b2869a73469804ba061be231..29992d4a0e2fedf9086e68843bdd7351dfa3bb07 100644 (file)
@@ -62,7 +62,8 @@ parse_publication_options(List *options,
                                                  bool *publish_given,
                                                  bool *publish_insert,
                                                  bool *publish_update,
-                                                 bool *publish_delete)
+                                                 bool *publish_delete,
+                                                 bool *publish_truncate)
 {
        ListCell   *lc;
 
@@ -72,6 +73,7 @@ parse_publication_options(List *options,
        *publish_insert = true;
        *publish_update = true;
        *publish_delete = true;
+       *publish_truncate = true;
 
        /* Parse options */
        foreach(lc, options)
@@ -96,6 +98,7 @@ parse_publication_options(List *options,
                        *publish_insert = false;
                        *publish_update = false;
                        *publish_delete = false;
+                       *publish_truncate = false;
 
                        *publish_given = true;
                        publish = defGetString(defel);
@@ -116,6 +119,8 @@ parse_publication_options(List *options,
                                        *publish_update = true;
                                else if (strcmp(publish_opt, "delete") == 0)
                                        *publish_delete = true;
+                               else if (strcmp(publish_opt, "truncate") == 0)
+                                       *publish_truncate = true;
                                else
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_SYNTAX_ERROR),
@@ -145,6 +150,7 @@ CreatePublication(CreatePublicationStmt *stmt)
        bool            publish_insert;
        bool            publish_update;
        bool            publish_delete;
+       bool            publish_truncate;
        AclResult       aclresult;
 
        /* must have CREATE privilege on database */
@@ -181,7 +187,8 @@ CreatePublication(CreatePublicationStmt *stmt)
 
        parse_publication_options(stmt->options,
                                                          &publish_given, &publish_insert,
-                                                         &publish_update, &publish_delete);
+                                                         &publish_update, &publish_delete,
+                                                         &publish_truncate);
 
        values[Anum_pg_publication_puballtables - 1] =
                BoolGetDatum(stmt->for_all_tables);
@@ -191,6 +198,8 @@ CreatePublication(CreatePublicationStmt *stmt)
                BoolGetDatum(publish_update);
        values[Anum_pg_publication_pubdelete - 1] =
                BoolGetDatum(publish_delete);
+       values[Anum_pg_publication_pubtruncate - 1] =
+               BoolGetDatum(publish_truncate);
 
        tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -237,11 +246,13 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
        bool            publish_insert;
        bool            publish_update;
        bool            publish_delete;
+       bool            publish_truncate;
        ObjectAddress obj;
 
        parse_publication_options(stmt->options,
                                                          &publish_given, &publish_insert,
-                                                         &publish_update, &publish_delete);
+                                                         &publish_update, &publish_delete,
+                                                         &publish_truncate);
 
        /* Everything ok, form a new tuple. */
        memset(values, 0, sizeof(values));
@@ -258,6 +269,9 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
 
                values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
                replaces[Anum_pg_publication_pubdelete - 1] = true;
+
+               values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate);
+               replaces[Anum_pg_publication_pubtruncate - 1] = true;
        }
 
        tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
index 948343e4aeee87a9395255f08e621de38429fc45..edc97a7662b6b4ad4be530d6845aab93f85028de 100644 (file)
@@ -26,6 +26,9 @@
  */
 #define LOGICALREP_IS_REPLICA_IDENTITY 1
 
+#define TRUNCATE_CASCADE               (1<<0)
+#define TRUNCATE_RESTART_SEQS  (1<<1)
+
 static void logicalrep_write_attrs(StringInfo out, Relation rel);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
                                           HeapTuple tuple);
@@ -292,6 +295,58 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
        return relid;
 }
 
+/*
+ * Write TRUNCATE to the output stream.
+ */
+void
+logicalrep_write_truncate(StringInfo out,
+                                                 int nrelids,
+                                                 Oid relids[],
+                                                 bool cascade, bool restart_seqs)
+{
+       int                     i;
+       uint8 flags = 0;
+
+       pq_sendbyte(out, 'T');          /* action TRUNCATE */
+
+       pq_sendint32(out, nrelids);
+
+       /* encode and send truncate flags */
+       if (cascade)
+               flags |= TRUNCATE_CASCADE;
+       if (restart_seqs)
+               flags |= TRUNCATE_RESTART_SEQS;
+       pq_sendint8(out, flags);
+
+       for (i = 0; i < nrelids; i++)
+               pq_sendint32(out, relids[i]);
+}
+
+/*
+ * Read TRUNCATE from stream.
+ */
+List *
+logicalrep_read_truncate(StringInfo in,
+                                                bool *cascade, bool *restart_seqs)
+{
+       int                     i;
+       int                     nrelids;
+       List       *relids = NIL;
+       uint8 flags;
+
+       nrelids = pq_getmsgint(in, 4);
+
+       /* read and decode truncate flags */
+       flags = pq_getmsgint(in, 1);
+       *cascade = (flags & TRUNCATE_CASCADE) > 0;
+       *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
+
+       for (i = 0; i < nrelids; i++)
+               relids = lappend_oid(relids, pq_getmsgint(in, 4));
+
+       return relids;
+}
+
 /*
  * Write relation description to the output stream.
  */
index b10857550a6105ec509c607c5b38961d3006b358..aa7e27179e8b10f73370448cb4cc9fe5af5bb285 100644 (file)
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 
+#include "catalog/catalog.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 
+#include "commands/tablecmds.h"
 #include "commands/trigger.h"
 
 #include "executor/executor.h"
@@ -83,6 +85,7 @@
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/rel.h"
 #include "utils/timeout.h"
 #include "utils/tqual.h"
 #include "utils/syscache.h"
@@ -888,6 +891,67 @@ apply_handle_delete(StringInfo s)
        CommandCounterIncrement();
 }
 
+/*
+ * Handle TRUNCATE message.
+ *
+ * TODO: FDW support
+ */
+static void
+apply_handle_truncate(StringInfo s)
+{
+       bool     cascade = false;
+       bool     restart_seqs = false;
+       List    *remote_relids = NIL;
+       List    *remote_rels = NIL;
+       List    *rels = NIL;
+       List    *relids = NIL;
+       List    *relids_logged = NIL;
+       ListCell *lc;
+
+       ensure_transaction();
+
+       remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
+
+       foreach(lc, remote_relids)
+       {
+               LogicalRepRelId relid = lfirst_oid(lc);
+               LogicalRepRelMapEntry *rel;
+
+               rel = logicalrep_rel_open(relid, RowExclusiveLock);
+               if (!should_apply_changes_for_rel(rel))
+               {
+                       /*
+                        * The relation can't become interesting in the middle of the
+                        * transaction so it's safe to unlock it.
+                        */
+                       logicalrep_rel_close(rel, RowExclusiveLock);
+                       continue;
+               }
+
+               remote_rels = lappend(remote_rels, rel);
+               rels = lappend(rels, rel->localrel);
+               relids = lappend_oid(relids, rel->localreloid);
+               if (RelationIsLogicallyLogged(rel->localrel))
+                       relids_logged = lappend_oid(relids, rel->localreloid);
+       }
+
+       /*
+        * Even if we used CASCADE on the upstream master we explicitly
+        * default to replaying changes without further cascading.
+        * This might be later changeable with a user specified option.
+        */
+       ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
+
+       foreach(lc, remote_rels)
+       {
+               LogicalRepRelMapEntry *rel = lfirst(lc);
+
+               logicalrep_rel_close(rel, NoLock);
+       }
+
+       CommandCounterIncrement();
+}
+
 
 /*
  * Logical replication protocol message dispatcher.
@@ -919,6 +983,10 @@ apply_dispatch(StringInfo s)
                case 'D':
                        apply_handle_delete(s);
                        break;
+                       /* TRUNCATE */
+               case 'T':
+                       apply_handle_truncate(s);
+                       break;
                        /* RELATION */
                case 'R':
                        apply_handle_relation(s);
index aa9cf5b54ed28af559132b79abbce950916ef3d8..06dfbc082f227855c0f0d86aa07a414f43efdb98 100644 (file)
@@ -39,6 +39,9 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
 static void pgoutput_change(LogicalDecodingContext *ctx,
                                ReorderBufferTXN *txn, Relation rel,
                                ReorderBufferChange *change);
+static void pgoutput_truncate(LogicalDecodingContext *ctx,
+                                                         ReorderBufferTXN *txn, int nrelations, Relation relations[],
+                                                         ReorderBufferChange *change);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
                                           RepOriginId origin_id);
 
@@ -77,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
        cb->startup_cb = pgoutput_startup;
        cb->begin_cb = pgoutput_begin_txn;
        cb->change_cb = pgoutput_change;
+       cb->truncate_cb = pgoutput_truncate;
        cb->commit_cb = pgoutput_commit_txn;
        cb->filter_by_origin_cb = pgoutput_origin_filter;
        cb->shutdown_cb = pgoutput_shutdown;
@@ -250,6 +254,46 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        OutputPluginWrite(ctx, true);
 }
 
+/*
+ * Write the relation schema if the current schema hasn't been sent yet.
+ */
+static void
+maybe_send_schema(LogicalDecodingContext *ctx,
+                                 Relation relation, RelationSyncEntry *relentry)
+{
+       if (!relentry->schema_sent)
+       {
+               TupleDesc       desc;
+               int                     i;
+
+               desc = RelationGetDescr(relation);
+
+               /*
+                * Write out type info if needed. We do that only for user created
+                * types.
+                */
+               for (i = 0; i < desc->natts; i++)
+               {
+                       Form_pg_attribute att = TupleDescAttr(desc, i);
+
+                       if (att->attisdropped)
+                               continue;
+
+                       if (att->atttypid < FirstNormalObjectId)
+                               continue;
+
+                       OutputPluginPrepareWrite(ctx, false);
+                       logicalrep_write_typ(ctx->out, att->atttypid);
+                       OutputPluginWrite(ctx, false);
+               }
+
+               OutputPluginPrepareWrite(ctx, false);
+               logicalrep_write_rel(ctx->out, relation);
+               OutputPluginWrite(ctx, false);
+               relentry->schema_sent = true;
+       }
+}
+
 /*
  * Sends the decoded DML over wire.
  */
@@ -288,40 +332,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        /* Avoid leaking memory by using and resetting our own context */
        old = MemoryContextSwitchTo(data->context);
 
-       /*
-        * Write the relation schema if the current schema haven't been sent yet.
-        */
-       if (!relentry->schema_sent)
-       {
-               TupleDesc       desc;
-               int                     i;
-
-               desc = RelationGetDescr(relation);
-
-               /*
-                * Write out type info if needed. We do that only for user created
-                * types.
-                */
-               for (i = 0; i < desc->natts; i++)
-               {
-                       Form_pg_attribute att = TupleDescAttr(desc, i);
-
-                       if (att->attisdropped)
-                               continue;
-
-                       if (att->atttypid < FirstNormalObjectId)
-                               continue;
-
-                       OutputPluginPrepareWrite(ctx, false);
-                       logicalrep_write_typ(ctx->out, att->atttypid);
-                       OutputPluginWrite(ctx, false);
-               }
-
-               OutputPluginPrepareWrite(ctx, false);
-               logicalrep_write_rel(ctx->out, relation);
-               OutputPluginWrite(ctx, false);
-               relentry->schema_sent = true;
-       }
+       maybe_send_schema(ctx, relation, relentry);
 
        /* Send the data */
        switch (change->action)
@@ -363,6 +374,51 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        MemoryContextReset(data->context);
 }
 
+static void
+pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+                                 int nrelations, Relation relations[], ReorderBufferChange *change)
+{
+       PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+       MemoryContext old;
+       RelationSyncEntry *relentry;
+       int                     i;
+       int                     nrelids;
+       Oid                *relids;
+
+       old = MemoryContextSwitchTo(data->context);
+
+       relids = palloc0(nrelations * sizeof(Oid));
+       nrelids = 0;
+
+       for (i = 0; i < nrelations; i++)
+       {
+               Relation        relation = relations[i];
+               Oid                     relid = RelationGetRelid(relation);
+
+               if (!is_publishable_relation(relation))
+                       continue;
+
+               relentry = get_rel_sync_entry(data, relid);
+
+               if (!relentry->pubactions.pubtruncate)
+                       continue;
+
+               relids[nrelids++] = relid;
+               maybe_send_schema(ctx, relation, relentry);
+       }
+
+       OutputPluginPrepareWrite(ctx, true);
+       logicalrep_write_truncate(ctx->out,
+                                                         nrelids,
+                                                         relids,
+                                                         change->data.truncate.cascade,
+                                                         change->data.truncate.restart_seqs);
+       OutputPluginWrite(ctx, true);
+
+       MemoryContextSwitchTo(old);
+       MemoryContextReset(data->context);
+}
+
 /*
  * Currently we always forward.
  */
@@ -504,7 +560,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
                 * we only need to consider ones that the subscriber requested.
                 */
                entry->pubactions.pubinsert = entry->pubactions.pubupdate =
-                       entry->pubactions.pubdelete = false;
+                       entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
 
                foreach(lc, data->publications)
                {
@@ -515,10 +571,11 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
                                entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
                                entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
                                entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
+                               entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
                        }
 
                        if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-                               entry->pubactions.pubdelete)
+                               entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
                                break;
                }
 
index b6ed06d5b3c60202042af153cabf54de1a0f86dc..40a2c1df049847a9338e6d2e5d12095a95b45002 100644 (file)
@@ -5339,6 +5339,7 @@ GetRelationPublicationActions(Relation relation)
                pubactions->pubinsert |= pubform->pubinsert;
                pubactions->pubupdate |= pubform->pubupdate;
                pubactions->pubdelete |= pubform->pubdelete;
+               pubactions->pubtruncate |= pubform->pubtruncate;
 
                ReleaseSysCache(tup);
 
@@ -5347,7 +5348,7 @@ GetRelationPublicationActions(Relation relation)
                 * other publications.
                 */
                if (pubactions->pubinsert && pubactions->pubupdate &&
-                       pubactions->pubdelete)
+                       pubactions->pubdelete && pubactions->pubtruncate)
                        break;
        }
 
index d066f4f00b61ea222524621cccefb645028f7d07..69016a6c4d31a1f3bcefbf863797ea9d76045df8 100644 (file)
@@ -3712,6 +3712,7 @@ getPublications(Archive *fout)
        int                     i_pubinsert;
        int                     i_pubupdate;
        int                     i_pubdelete;
+       int                     i_pubtruncate;
        int                     i,
                                ntups;
 
@@ -3723,12 +3724,20 @@ getPublications(Archive *fout)
        resetPQExpBuffer(query);
 
        /* Get the publications. */
-       appendPQExpBuffer(query,
-                                         "SELECT p.tableoid, p.oid, p.pubname, "
-                                         "(%s p.pubowner) AS rolname, "
-                                         "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete "
-                                         "FROM pg_publication p",
-                                         username_subquery);
+       if (fout->remoteVersion >= 110000)
+               appendPQExpBuffer(query,
+                                                 "SELECT p.tableoid, p.oid, p.pubname, "
+                                                 "(%s p.pubowner) AS rolname, "
+                                                 "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate "
+                                                 "FROM pg_publication p",
+                                                 username_subquery);
+       else
+               appendPQExpBuffer(query,
+                                                 "SELECT p.tableoid, p.oid, p.pubname, "
+                                                 "(%s p.pubowner) AS rolname, "
+                                                 "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate "
+                                                 "FROM pg_publication p",
+                                                 username_subquery);
 
        res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 
@@ -3742,6 +3751,7 @@ getPublications(Archive *fout)
        i_pubinsert = PQfnumber(res, "pubinsert");
        i_pubupdate = PQfnumber(res, "pubupdate");
        i_pubdelete = PQfnumber(res, "pubdelete");
+       i_pubtruncate = PQfnumber(res, "pubtruncate");
 
        pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
 
@@ -3762,6 +3772,8 @@ getPublications(Archive *fout)
                        (strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0);
                pubinfo[i].pubdelete =
                        (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
+               pubinfo[i].pubtruncate =
+                       (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
 
                if (strlen(pubinfo[i].rolname) == 0)
                        write_msg(NULL, "WARNING: owner of publication \"%s\" appears to be invalid\n",
@@ -3829,6 +3841,15 @@ dumpPublication(Archive *fout, PublicationInfo *pubinfo)
                first = false;
        }
 
+       if (pubinfo->pubtruncate)
+       {
+               if (!first)
+                       appendPQExpBufferStr(query, ", ");
+
+               appendPQExpBufferStr(query, "truncate");
+               first = false;
+       }
+
        appendPQExpBufferStr(query, "');\n");
 
        ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
index a4d6d926a81335b33d099512d20e5518ec5db994..c2314758dea374fcbf08e866d12054e13c66a920 100644 (file)
@@ -595,6 +595,7 @@ typedef struct _PublicationInfo
        bool            pubinsert;
        bool            pubupdate;
        bool            pubdelete;
+       bool            pubtruncate;
 } PublicationInfo;
 
 /*
index acdfde2a1e8c67b84e0e495542ef11384e542e2c..25852b903c057d41cea7804a19b068b7e04127dc 100644 (file)
@@ -2038,7 +2038,7 @@ qr/CREATE TRANSFORM FOR integer LANGUAGE sql \(FROM SQL WITH FUNCTION pg_catalog
                create_order => 50,
                create_sql   => 'CREATE PUBLICATION pub1;',
                regexp       => qr/^
-                       \QCREATE PUBLICATION pub1 WITH (publish = 'insert, update, delete');\E
+                       \QCREATE PUBLICATION pub1 WITH (publish = 'insert, update, delete, truncate');\E
                        /xm,
                like => {
                        %full_runs,
index 0c3be1f50463df1e5854992da796e80028b8e4aa..75a1e42ceea073e4026f814e14e883d7e319f7ac 100644 (file)
@@ -5187,7 +5187,7 @@ listPublications(const char *pattern)
        PQExpBufferData buf;
        PGresult   *res;
        printQueryOpt myopt = pset.popt;
-       static const bool translate_columns[] = {false, false, false, false, false, false};
+       static const bool translate_columns[] = {false, false, false, false, false, false, false};
 
        if (pset.sversion < 100000)
        {
@@ -5207,13 +5207,17 @@ listPublications(const char *pattern)
                                          "  puballtables AS \"%s\",\n"
                                          "  pubinsert AS \"%s\",\n"
                                          "  pubupdate AS \"%s\",\n"
-                                         "  pubdelete AS \"%s\"\n",
+                                         "  pubdelete AS \"%s\"",
                                          gettext_noop("Name"),
                                          gettext_noop("Owner"),
                                          gettext_noop("All tables"),
                                          gettext_noop("Inserts"),
                                          gettext_noop("Updates"),
                                          gettext_noop("Deletes"));
+       if (pset.sversion >= 110000)
+               appendPQExpBuffer(&buf,
+                                                 ",\n  pubtruncate AS \"%s\"",
+                                                 gettext_noop("Truncates"));
 
        appendPQExpBufferStr(&buf,
                                                 "\nFROM pg_catalog.pg_publication\n");
@@ -5254,6 +5258,7 @@ describePublications(const char *pattern)
        PQExpBufferData buf;
        int                     i;
        PGresult   *res;
+       bool            has_pubtruncate;
 
        if (pset.sversion < 100000)
        {
@@ -5265,13 +5270,19 @@ describePublications(const char *pattern)
                return true;
        }
 
+       has_pubtruncate = (pset.sversion >= 110000);
+
        initPQExpBuffer(&buf);
 
        printfPQExpBuffer(&buf,
                                          "SELECT oid, pubname,\n"
                                          "  pg_catalog.pg_get_userbyid(pubowner) AS owner,\n"
-                                         "  puballtables, pubinsert, pubupdate, pubdelete\n"
-                                         "FROM pg_catalog.pg_publication\n");
+                                         "  puballtables, pubinsert, pubupdate, pubdelete");
+       if (has_pubtruncate)
+               appendPQExpBuffer(&buf,
+                                                 ", pubtruncate");
+       appendPQExpBuffer(&buf,
+                                         "\nFROM pg_catalog.pg_publication\n");
 
        processSQLNamePattern(pset.db, &buf, pattern, false, false,
                                                  NULL, "pubname", NULL,
@@ -5317,6 +5328,9 @@ describePublications(const char *pattern)
                printTableOpt myopt = pset.popt.topt;
                printTableContent cont;
 
+               if (has_pubtruncate)
+                       ncols++;
+
                initPQExpBuffer(&title);
                printfPQExpBuffer(&title, _("Publication %s"), pubname);
                printTableInit(&cont, &myopt, title.data, ncols, nrows);
@@ -5326,12 +5340,16 @@ describePublications(const char *pattern)
                printTableAddHeader(&cont, gettext_noop("Inserts"), true, align);
                printTableAddHeader(&cont, gettext_noop("Updates"), true, align);
                printTableAddHeader(&cont, gettext_noop("Deletes"), true, align);
+               if (has_pubtruncate)
+                       printTableAddHeader(&cont, gettext_noop("Truncates"), true, align);
 
                printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false);
                printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false);
                printTableAddCell(&cont, PQgetvalue(res, i, 4), false, false);
                printTableAddCell(&cont, PQgetvalue(res, i, 5), false, false);
                printTableAddCell(&cont, PQgetvalue(res, i, 6), false, false);
+               if (has_pubtruncate)
+                       printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false);
 
                if (!puballtables)
                {
index 7a0c5d36db3b6d24bf69cbd0fe95445e61318552..d88a6bb4c1c5ce9e55573a3664052bfd650f6038 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     201804061
+#define CATALOG_VERSION_NO     201804071
 
 #endif
index 37e77b8be7efcd433528f23291fc3b31eaa5deef..b643c489cdf748ec8fd93e25cb29cc54f3308144 100644 (file)
@@ -49,6 +49,9 @@ CATALOG(pg_publication,6104)
        /* true if deletes are published */
        bool            pubdelete;
 
+       /* true if truncates are published */
+       bool            pubtruncate;
+
 } FormData_pg_publication;
 
 /* ----------------
@@ -63,19 +66,21 @@ typedef FormData_pg_publication *Form_pg_publication;
  * ----------------
  */
 
-#define Natts_pg_publication                           6
+#define Natts_pg_publication                           7
 #define Anum_pg_publication_pubname                    1
 #define Anum_pg_publication_pubowner           2
 #define Anum_pg_publication_puballtables       3
 #define Anum_pg_publication_pubinsert          4
 #define Anum_pg_publication_pubupdate          5
 #define Anum_pg_publication_pubdelete          6
+#define Anum_pg_publication_pubtruncate                7
 
 typedef struct PublicationActions
 {
        bool            pubinsert;
        bool            pubupdate;
        bool            pubdelete;
+       bool            pubtruncate;
 } PublicationActions;
 
 typedef struct Publication
index 116f16f42d10f8443d63a6027cdf826f3919462d..92e88d312791f984a4434af27b5005f4da039fb9 100644 (file)
@@ -97,6 +97,10 @@ extern void logicalrep_write_delete(StringInfo out, Relation rel,
                                                HeapTuple oldtuple);
 extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
                                           LogicalRepTupleData *oldtup);
+extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[],
+                                               bool cascade, bool restart_seqs);
+extern List *logicalrep_read_truncate(StringInfo in,
+                                               bool *cascade, bool *restart_seqs);
 extern void logicalrep_write_rel(StringInfo out, Relation rel);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, Oid typoid);
index 0c86c647bcabc6b066e52a6942f1c3a8479f0d09..afbbdd543dfb41fb1643fea92cb65c7933ebd682 100644 (file)
@@ -21,20 +21,20 @@ ERROR:  unrecognized publication parameter: foo
 CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum');
 ERROR:  unrecognized "publish" value: "cluster"
 \dRp
-                                   List of publications
-        Name        |          Owner           | All tables | Inserts | Updates | Deletes 
---------------------+--------------------------+------------+---------+---------+---------
- testpib_ins_trunct | regress_publication_user | f          | t       | f       | f
- testpub_default    | regress_publication_user | f          | f       | t       | f
+                                         List of publications
+        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
+--------------------+--------------------------+------------+---------+---------+---------+-----------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f
+ testpub_default    | regress_publication_user | f          | f       | t       | f       | f
 (2 rows)
 
 ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete');
 \dRp
-                                   List of publications
-        Name        |          Owner           | All tables | Inserts | Updates | Deletes 
---------------------+--------------------------+------------+---------+---------+---------
- testpib_ins_trunct | regress_publication_user | f          | t       | f       | f
- testpub_default    | regress_publication_user | f          | t       | t       | t
+                                         List of publications
+        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
+--------------------+--------------------------+------------+---------+---------+---------+-----------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f
+ testpub_default    | regress_publication_user | f          | t       | t       | t       | f
 (2 rows)
 
 --- adding tables
@@ -76,10 +76,10 @@ Publications:
     "testpub_foralltables"
 
 \dRp+ testpub_foralltables
-                  Publication testpub_foralltables
-          Owner           | All tables | Inserts | Updates | Deletes 
---------------------------+------------+---------+---------+---------
- regress_publication_user | t          | t       | t       | f
+                        Publication testpub_foralltables
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | t          | t       | t       | f       | f
 (1 row)
 
 DROP TABLE testpub_tbl2;
@@ -89,19 +89,19 @@ CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3);
 CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3;
 CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3;
 \dRp+ testpub3
-                        Publication testpub3
-          Owner           | All tables | Inserts | Updates | Deletes 
---------------------------+------------+---------+---------+---------
- regress_publication_user | f          | t       | t       | t
+                              Publication testpub3
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f          | t       | t       | t       | t
 Tables:
     "public.testpub_tbl3"
     "public.testpub_tbl3a"
 
 \dRp+ testpub4
-                        Publication testpub4
-          Owner           | All tables | Inserts | Updates | Deletes 
---------------------------+------------+---------+---------+---------
- regress_publication_user | f          | t       | t       | t
+                              Publication testpub4
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f          | t       | t       | t       | t
 Tables:
     "public.testpub_tbl3"
 
@@ -119,10 +119,10 @@ ERROR:  relation "testpub_tbl1" is already member of publication "testpub_fortbl
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
 ERROR:  publication "testpub_fortbl" already exists
 \dRp+ testpub_fortbl
-                     Publication testpub_fortbl
-          Owner           | All tables | Inserts | Updates | Deletes 
---------------------------+------------+---------+---------+---------
- regress_publication_user | f          | t       | t       | t
+                           Publication testpub_fortbl
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f          | t       | t       | t       | t
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -165,10 +165,10 @@ Publications:
     "testpub_fortbl"
 
 \dRp+ testpub_default
-                     Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes 
---------------------------+------------+---------+---------+---------
- regress_publication_user | f          | t       | t       | t
+                           Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f          | t       | t       | t       | f
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -210,10 +210,10 @@ DROP TABLE testpub_parted;
 DROP VIEW testpub_view;
 DROP TABLE testpub_tbl1;
 \dRp+ testpub_default
-                     Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes 
---------------------------+------------+---------+---------+---------
- regress_publication_user | f          | t       | t       | t
+                           Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f          | t       | t       | t       | f
 (1 row)
 
 -- fail - must be owner of publication
@@ -223,20 +223,20 @@ ERROR:  must be owner of publication testpub_default
 RESET ROLE;
 ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
 \dRp testpub_foo
-                               List of publications
-    Name     |          Owner           | All tables | Inserts | Updates | Deletes 
--------------+--------------------------+------------+---------+---------+---------
- testpub_foo | regress_publication_user | f          | t       | t       | t
+                                     List of publications
+    Name     |          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
+-------------+--------------------------+------------+---------+---------+---------+-----------
+ testpub_foo | regress_publication_user | f          | t       | t       | t       | f
 (1 row)
 
 -- rename back to keep the rest simple
 ALTER PUBLICATION testpub_foo RENAME TO testpub_default;
 ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
 \dRp testpub_default
-                                  List of publications
-      Name       |           Owner           | All tables | Inserts | Updates | Deletes 
------------------+---------------------------+------------+---------+---------+---------
- testpub_default | regress_publication_user2 | f          | t       | t       | t
+                                        List of publications
+      Name       |           Owner           | All tables | Inserts | Updates | Deletes | Truncates 
+-----------------+---------------------------+------------+---------+---------+---------+-----------
+ testpub_default | regress_publication_user2 | f          | t       | t       | t       | f
 (1 row)
 
 DROP PUBLICATION testpub_default;
diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl
new file mode 100644 (file)
index 0000000..8ea4ab6
--- /dev/null
@@ -0,0 +1,161 @@
+# Test TRUNCATE
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 9;
+
+# setup
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab1 (a int PRIMARY KEY)");
+
+$node_subscriber->safe_psql('postgres',
+       "CREATE TABLE tab1 (a int PRIMARY KEY)");
+
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab2 (a int PRIMARY KEY)");
+
+$node_subscriber->safe_psql('postgres',
+       "CREATE TABLE tab2 (a int PRIMARY KEY)");
+
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab3 (a int PRIMARY KEY)");
+
+$node_subscriber->safe_psql('postgres',
+       "CREATE TABLE tab3 (a int PRIMARY KEY)");
+
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab4 (x int PRIMARY KEY, y int REFERENCES tab3)");
+
+$node_subscriber->safe_psql('postgres',
+       "CREATE TABLE tab4 (x int PRIMARY KEY, y int REFERENCES tab3)");
+
+$node_subscriber->safe_psql('postgres',
+       "CREATE SEQUENCE seq1 OWNED BY tab1.a"
+);
+$node_subscriber->safe_psql('postgres',
+       "ALTER SEQUENCE seq1 START 101"
+);
+
+$node_publisher->safe_psql('postgres',
+       "CREATE PUBLICATION pub1 FOR TABLE tab1");
+$node_publisher->safe_psql('postgres',
+       "CREATE PUBLICATION pub2 FOR TABLE tab2 WITH (publish = insert)");
+$node_publisher->safe_psql('postgres',
+       "CREATE PUBLICATION pub3 FOR TABLE tab3, tab4");
+$node_subscriber->safe_psql('postgres',
+       "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr application_name=sub1' PUBLICATION pub1");
+$node_subscriber->safe_psql('postgres',
+       "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr application_name=sub2' PUBLICATION pub2");
+$node_subscriber->safe_psql('postgres',
+       "CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr application_name=sub3' PUBLICATION pub3");
+
+$node_publisher->wait_for_catchup('sub1');
+
+# insert data to truncate
+
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (2), (3)");
+
+$node_publisher->wait_for_catchup('sub1');
+
+# truncate and check
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1");
+
+$node_publisher->wait_for_catchup('sub1');
+
+my $result = $node_subscriber->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(0||),
+       'truncate replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT nextval('seq1')");
+is($result, qq(1),
+       'sequence not restarted');
+
+# truncate with restart identity
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1 RESTART IDENTITY");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT nextval('seq1')");
+is($result, qq(101),
+       'truncate restarted identities');
+
+# test publication that does not replicate truncate
+
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab2 VALUES (1), (2), (3)");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab2");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab2");
+is($result, qq(3|1|3),
+       'truncate not replicated');
+
+$node_publisher->safe_psql('postgres',
+       "ALTER PUBLICATION pub2 SET (publish = 'insert, truncate')");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab2");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab2");
+is($result, qq(0||),
+       'truncate replicated after publication change');
+
+# test multiple tables connected by foreign keys
+
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab3 VALUES (1), (2), (3)");
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab4 VALUES (11, 1), (111, 1), (22, 2)");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab3, tab4");
+
+$node_publisher->wait_for_catchup('sub3');
+
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab3");
+is($result, qq(0||),
+       'truncate of multiple tables replicated');
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT count(*), min(x), max(x) FROM tab4");
+is($result, qq(0||),
+       'truncate of multiple tables replicated');
+
+# test truncate of multiple tables, some of which are not published
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub2");
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION pub2");
+
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (2), (3)");
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab2 VALUES (1), (2), (3)");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1, tab2");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(0||),
+       'truncate of multiple tables some not published');
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab2");
+is($result, qq(3|1|3),
+       'truncate of multiple tables some not published');