<entry>If true, <command>DELETE</command> operations are replicated for
tables in the publication.</entry>
</row>
+
+ <row>
+ <entry><structfield>pubtruncate</structfield></entry>
+ <entry><type>bool</type></entry>
+ <entry></entry>
+ <entry>If true, <command>TRUNCATE</command> operations are replicated for
+ tables in the publication.</entry>
+ </row>
</tbody>
</tgroup>
</table>
<para>
Publications can choose to limit the changes they produce to
- any combination of <command>INSERT</command>, <command>UPDATE</command>, and
- <command>DELETE</command>, similar to how triggers are fired by
+ any combination of <command>INSERT</command>, <command>UPDATE</command>,
+ <command>DELETE</command>, and <command>TRUNCATE</command>, similar to how triggers are fired by
particular event types. By default, all operation types are replicated.
</para>
</para>
</listitem>
- <listitem>
- <para>
- <command>TRUNCATE</command> commands are not replicated. This can, of
- course, be worked around by using <command>DELETE</command> instead. To
- avoid accidental <command>TRUNCATE</command> invocations, you can revoke
- the <literal>TRUNCATE</literal> privilege from tables.
- </para>
- </listitem>
-
<listitem>
<para>
Large objects (see <xref linkend="largeobjects"/>) are not replicated.
</listitem>
</varlistentry>
+<varlistentry>
+<term>
+Truncate
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+ Byte1('T')
+</term>
+<listitem>
+<para>
+ Identifies the message as a truncate message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+ Int32
+</term>
+<listitem>
+<para>
+ Number of relations
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+ Int8
+</term>
+<listitem>
+<para>
+ Option bits for <command>TRUNCATE</command>:
+ 1 for <literal>CASCADE</literal>, 2 for <literal>RESTART IDENTITY</literal>
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+ Int32
+</term>
+<listitem>
+<para>
+ ID of the relation corresponding to the ID in the relation
+ message. This field is repeated for each relation.
+</para>
+</listitem>
+</varlistentry>
+
+</variablelist>
+</para>
+</listitem>
+</varlistentry>
+
</variablelist>
<para>
This parameter determines which DML operations will be published by
the new publication to the subscribers. The value is
comma-separated list of operations. The allowed operations are
- <literal>insert</literal>, <literal>update</literal>, and
- <literal>delete</literal>. The default is to publish all actions,
+ <literal>insert</literal>, <literal>update</literal>,
+ <literal>delete</literal>, and <literal>truncate</literal>.
+ The default is to publish all actions,
and so the default value for this option is
- <literal>'insert, update, delete'</literal>.
+ <literal>'insert, update, delete, truncate'</literal>.
</para>
</listitem>
</varlistentry>
</para>
<para>
- <command>TRUNCATE</command> and <acronym>DDL</acronym> operations
- are not published.
+ <acronym>DDL</acronym> operations are not published.
</para>
</refsect1>
pub->pubactions.pubinsert = pubform->pubinsert;
pub->pubactions.pubupdate = pubform->pubupdate;
pub->pubactions.pubdelete = pubform->pubdelete;
+ pub->pubactions.pubtruncate = pubform->pubtruncate;
ReleaseSysCache(tup);
bool *publish_given,
bool *publish_insert,
bool *publish_update,
- bool *publish_delete)
+ bool *publish_delete,
+ bool *publish_truncate)
{
ListCell *lc;
*publish_insert = true;
*publish_update = true;
*publish_delete = true;
+ *publish_truncate = true;
/* Parse options */
foreach(lc, options)
*publish_insert = false;
*publish_update = false;
*publish_delete = false;
+ *publish_truncate = false;
*publish_given = true;
publish = defGetString(defel);
*publish_update = true;
else if (strcmp(publish_opt, "delete") == 0)
*publish_delete = true;
+ else if (strcmp(publish_opt, "truncate") == 0)
+ *publish_truncate = true;
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
bool publish_insert;
bool publish_update;
bool publish_delete;
+ bool publish_truncate;
AclResult aclresult;
/* must have CREATE privilege on database */
parse_publication_options(stmt->options,
&publish_given, &publish_insert,
- &publish_update, &publish_delete);
+ &publish_update, &publish_delete,
+ &publish_truncate);
values[Anum_pg_publication_puballtables - 1] =
BoolGetDatum(stmt->for_all_tables);
BoolGetDatum(publish_update);
values[Anum_pg_publication_pubdelete - 1] =
BoolGetDatum(publish_delete);
+ values[Anum_pg_publication_pubtruncate - 1] =
+ BoolGetDatum(publish_truncate);
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
bool publish_insert;
bool publish_update;
bool publish_delete;
+ bool publish_truncate;
ObjectAddress obj;
parse_publication_options(stmt->options,
&publish_given, &publish_insert,
- &publish_update, &publish_delete);
+ &publish_update, &publish_delete,
+ &publish_truncate);
/* Everything ok, form a new tuple. */
memset(values, 0, sizeof(values));
values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
replaces[Anum_pg_publication_pubdelete - 1] = true;
+
+ values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate);
+ replaces[Anum_pg_publication_pubtruncate - 1] = true;
}
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
*/
#define LOGICALREP_IS_REPLICA_IDENTITY 1
+#define TRUNCATE_CASCADE (1<<0)
+#define TRUNCATE_RESTART_SEQS (1<<1)
+
static void logicalrep_write_attrs(StringInfo out, Relation rel);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
HeapTuple tuple);
return relid;
}
+/*
+ * Write TRUNCATE to the output stream.
+ */
+void
+logicalrep_write_truncate(StringInfo out,
+ int nrelids,
+ Oid relids[],
+ bool cascade, bool restart_seqs)
+{
+ int i;
+ uint8 flags = 0;
+
+ pq_sendbyte(out, 'T'); /* action TRUNCATE */
+
+ pq_sendint32(out, nrelids);
+
+ /* encode and send truncate flags */
+ if (cascade)
+ flags |= TRUNCATE_CASCADE;
+ if (restart_seqs)
+ flags |= TRUNCATE_RESTART_SEQS;
+ pq_sendint8(out, flags);
+
+ for (i = 0; i < nrelids; i++)
+ pq_sendint32(out, relids[i]);
+}
+
+/*
+ * Read TRUNCATE from stream.
+ */
+List *
+logicalrep_read_truncate(StringInfo in,
+ bool *cascade, bool *restart_seqs)
+{
+ int i;
+ int nrelids;
+ List *relids = NIL;
+ uint8 flags;
+
+ nrelids = pq_getmsgint(in, 4);
+
+ /* read and decode truncate flags */
+ flags = pq_getmsgint(in, 1);
+ *cascade = (flags & TRUNCATE_CASCADE) > 0;
+ *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
+
+ for (i = 0; i < nrelids; i++)
+ relids = lappend_oid(relids, pq_getmsgint(in, 4));
+
+ return relids;
+}
+
/*
* Write relation description to the output stream.
*/
#include "access/xact.h"
#include "access/xlog_internal.h"
+#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
+#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/rel.h"
#include "utils/timeout.h"
#include "utils/tqual.h"
#include "utils/syscache.h"
CommandCounterIncrement();
}
+/*
+ * Handle TRUNCATE message.
+ *
+ * TODO: FDW support
+ */
+static void
+apply_handle_truncate(StringInfo s)
+{
+ bool cascade = false;
+ bool restart_seqs = false;
+ List *remote_relids = NIL;
+ List *remote_rels = NIL;
+ List *rels = NIL;
+ List *relids = NIL;
+ List *relids_logged = NIL;
+ ListCell *lc;
+
+ ensure_transaction();
+
+ remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
+
+ foreach(lc, remote_relids)
+ {
+ LogicalRepRelId relid = lfirst_oid(lc);
+ LogicalRepRelMapEntry *rel;
+
+ rel = logicalrep_rel_open(relid, RowExclusiveLock);
+ if (!should_apply_changes_for_rel(rel))
+ {
+ /*
+ * The relation can't become interesting in the middle of the
+ * transaction so it's safe to unlock it.
+ */
+ logicalrep_rel_close(rel, RowExclusiveLock);
+ continue;
+ }
+
+ remote_rels = lappend(remote_rels, rel);
+ rels = lappend(rels, rel->localrel);
+ relids = lappend_oid(relids, rel->localreloid);
+ if (RelationIsLogicallyLogged(rel->localrel))
+ relids_logged = lappend_oid(relids, rel->localreloid);
+ }
+
+ /*
+ * Even if we used CASCADE on the upstream master we explicitly
+ * default to replaying changes without further cascading.
+ * This might be later changeable with a user specified option.
+ */
+ ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
+
+ foreach(lc, remote_rels)
+ {
+ LogicalRepRelMapEntry *rel = lfirst(lc);
+
+ logicalrep_rel_close(rel, NoLock);
+ }
+
+ CommandCounterIncrement();
+}
+
/*
* Logical replication protocol message dispatcher.
case 'D':
apply_handle_delete(s);
break;
+ /* TRUNCATE */
+ case 'T':
+ apply_handle_truncate(s);
+ break;
/* RELATION */
case 'R':
apply_handle_relation(s);
static void pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, Relation rel,
ReorderBufferChange *change);
+static void pgoutput_truncate(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, int nrelations, Relation relations[],
+ ReorderBufferChange *change);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
cb->startup_cb = pgoutput_startup;
cb->begin_cb = pgoutput_begin_txn;
cb->change_cb = pgoutput_change;
+ cb->truncate_cb = pgoutput_truncate;
cb->commit_cb = pgoutput_commit_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
cb->shutdown_cb = pgoutput_shutdown;
OutputPluginWrite(ctx, true);
}
+/*
+ * Write the relation schema if the current schema hasn't been sent yet.
+ */
+static void
+maybe_send_schema(LogicalDecodingContext *ctx,
+ Relation relation, RelationSyncEntry *relentry)
+{
+ if (!relentry->schema_sent)
+ {
+ TupleDesc desc;
+ int i;
+
+ desc = RelationGetDescr(relation);
+
+ /*
+ * Write out type info if needed. We do that only for user created
+ * types.
+ */
+ for (i = 0; i < desc->natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+
+ if (att->attisdropped)
+ continue;
+
+ if (att->atttypid < FirstNormalObjectId)
+ continue;
+
+ OutputPluginPrepareWrite(ctx, false);
+ logicalrep_write_typ(ctx->out, att->atttypid);
+ OutputPluginWrite(ctx, false);
+ }
+
+ OutputPluginPrepareWrite(ctx, false);
+ logicalrep_write_rel(ctx->out, relation);
+ OutputPluginWrite(ctx, false);
+ relentry->schema_sent = true;
+ }
+}
+
/*
* Sends the decoded DML over wire.
*/
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
- /*
- * Write the relation schema if the current schema haven't been sent yet.
- */
- if (!relentry->schema_sent)
- {
- TupleDesc desc;
- int i;
-
- desc = RelationGetDescr(relation);
-
- /*
- * Write out type info if needed. We do that only for user created
- * types.
- */
- for (i = 0; i < desc->natts; i++)
- {
- Form_pg_attribute att = TupleDescAttr(desc, i);
-
- if (att->attisdropped)
- continue;
-
- if (att->atttypid < FirstNormalObjectId)
- continue;
-
- OutputPluginPrepareWrite(ctx, false);
- logicalrep_write_typ(ctx->out, att->atttypid);
- OutputPluginWrite(ctx, false);
- }
-
- OutputPluginPrepareWrite(ctx, false);
- logicalrep_write_rel(ctx->out, relation);
- OutputPluginWrite(ctx, false);
- relentry->schema_sent = true;
- }
+ maybe_send_schema(ctx, relation, relentry);
/* Send the data */
switch (change->action)
MemoryContextReset(data->context);
}
+static void
+pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ int nrelations, Relation relations[], ReorderBufferChange *change)
+{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ MemoryContext old;
+ RelationSyncEntry *relentry;
+ int i;
+ int nrelids;
+ Oid *relids;
+
+ old = MemoryContextSwitchTo(data->context);
+
+ relids = palloc0(nrelations * sizeof(Oid));
+ nrelids = 0;
+
+ for (i = 0; i < nrelations; i++)
+ {
+ Relation relation = relations[i];
+ Oid relid = RelationGetRelid(relation);
+
+ if (!is_publishable_relation(relation))
+ continue;
+
+ relentry = get_rel_sync_entry(data, relid);
+
+ if (!relentry->pubactions.pubtruncate)
+ continue;
+
+ relids[nrelids++] = relid;
+ maybe_send_schema(ctx, relation, relentry);
+ }
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_truncate(ctx->out,
+ nrelids,
+ relids,
+ change->data.truncate.cascade,
+ change->data.truncate.restart_seqs);
+ OutputPluginWrite(ctx, true);
+
+ MemoryContextSwitchTo(old);
+ MemoryContextReset(data->context);
+}
+
/*
* Currently we always forward.
*/
* we only need to consider ones that the subscriber requested.
*/
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
- entry->pubactions.pubdelete = false;
+ entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
foreach(lc, data->publications)
{
entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
+ entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
}
if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
- entry->pubactions.pubdelete)
+ entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
break;
}
pubactions->pubinsert |= pubform->pubinsert;
pubactions->pubupdate |= pubform->pubupdate;
pubactions->pubdelete |= pubform->pubdelete;
+ pubactions->pubtruncate |= pubform->pubtruncate;
ReleaseSysCache(tup);
* other publications.
*/
if (pubactions->pubinsert && pubactions->pubupdate &&
- pubactions->pubdelete)
+ pubactions->pubdelete && pubactions->pubtruncate)
break;
}
int i_pubinsert;
int i_pubupdate;
int i_pubdelete;
+ int i_pubtruncate;
int i,
ntups;
resetPQExpBuffer(query);
/* Get the publications. */
- appendPQExpBuffer(query,
- "SELECT p.tableoid, p.oid, p.pubname, "
- "(%s p.pubowner) AS rolname, "
- "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete "
- "FROM pg_publication p",
- username_subquery);
+ if (fout->remoteVersion >= 110000)
+ appendPQExpBuffer(query,
+ "SELECT p.tableoid, p.oid, p.pubname, "
+ "(%s p.pubowner) AS rolname, "
+ "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate "
+ "FROM pg_publication p",
+ username_subquery);
+ else
+ appendPQExpBuffer(query,
+ "SELECT p.tableoid, p.oid, p.pubname, "
+ "(%s p.pubowner) AS rolname, "
+ "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate "
+ "FROM pg_publication p",
+ username_subquery);
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
i_pubinsert = PQfnumber(res, "pubinsert");
i_pubupdate = PQfnumber(res, "pubupdate");
i_pubdelete = PQfnumber(res, "pubdelete");
+ i_pubtruncate = PQfnumber(res, "pubtruncate");
pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
(strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0);
pubinfo[i].pubdelete =
(strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
+ pubinfo[i].pubtruncate =
+ (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
if (strlen(pubinfo[i].rolname) == 0)
write_msg(NULL, "WARNING: owner of publication \"%s\" appears to be invalid\n",
first = false;
}
+ if (pubinfo->pubtruncate)
+ {
+ if (!first)
+ appendPQExpBufferStr(query, ", ");
+
+ appendPQExpBufferStr(query, "truncate");
+ first = false;
+ }
+
appendPQExpBufferStr(query, "');\n");
ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
bool pubinsert;
bool pubupdate;
bool pubdelete;
+ bool pubtruncate;
} PublicationInfo;
/*
create_order => 50,
create_sql => 'CREATE PUBLICATION pub1;',
regexp => qr/^
- \QCREATE PUBLICATION pub1 WITH (publish = 'insert, update, delete');\E
+ \QCREATE PUBLICATION pub1 WITH (publish = 'insert, update, delete, truncate');\E
/xm,
like => {
%full_runs,
PQExpBufferData buf;
PGresult *res;
printQueryOpt myopt = pset.popt;
- static const bool translate_columns[] = {false, false, false, false, false, false};
+ static const bool translate_columns[] = {false, false, false, false, false, false, false};
if (pset.sversion < 100000)
{
" puballtables AS \"%s\",\n"
" pubinsert AS \"%s\",\n"
" pubupdate AS \"%s\",\n"
- " pubdelete AS \"%s\"\n",
+ " pubdelete AS \"%s\"",
gettext_noop("Name"),
gettext_noop("Owner"),
gettext_noop("All tables"),
gettext_noop("Inserts"),
gettext_noop("Updates"),
gettext_noop("Deletes"));
+ if (pset.sversion >= 110000)
+ appendPQExpBuffer(&buf,
+ ",\n pubtruncate AS \"%s\"",
+ gettext_noop("Truncates"));
appendPQExpBufferStr(&buf,
"\nFROM pg_catalog.pg_publication\n");
PQExpBufferData buf;
int i;
PGresult *res;
+ bool has_pubtruncate;
if (pset.sversion < 100000)
{
return true;
}
+ has_pubtruncate = (pset.sversion >= 110000);
+
initPQExpBuffer(&buf);
printfPQExpBuffer(&buf,
"SELECT oid, pubname,\n"
" pg_catalog.pg_get_userbyid(pubowner) AS owner,\n"
- " puballtables, pubinsert, pubupdate, pubdelete\n"
- "FROM pg_catalog.pg_publication\n");
+ " puballtables, pubinsert, pubupdate, pubdelete");
+ if (has_pubtruncate)
+ appendPQExpBuffer(&buf,
+ ", pubtruncate");
+ appendPQExpBuffer(&buf,
+ "\nFROM pg_catalog.pg_publication\n");
processSQLNamePattern(pset.db, &buf, pattern, false, false,
NULL, "pubname", NULL,
printTableOpt myopt = pset.popt.topt;
printTableContent cont;
+ if (has_pubtruncate)
+ ncols++;
+
initPQExpBuffer(&title);
printfPQExpBuffer(&title, _("Publication %s"), pubname);
printTableInit(&cont, &myopt, title.data, ncols, nrows);
printTableAddHeader(&cont, gettext_noop("Inserts"), true, align);
printTableAddHeader(&cont, gettext_noop("Updates"), true, align);
printTableAddHeader(&cont, gettext_noop("Deletes"), true, align);
+ if (has_pubtruncate)
+ printTableAddHeader(&cont, gettext_noop("Truncates"), true, align);
printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false);
printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false);
printTableAddCell(&cont, PQgetvalue(res, i, 4), false, false);
printTableAddCell(&cont, PQgetvalue(res, i, 5), false, false);
printTableAddCell(&cont, PQgetvalue(res, i, 6), false, false);
+ if (has_pubtruncate)
+ printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false);
if (!puballtables)
{
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 201804061
+#define CATALOG_VERSION_NO 201804071
#endif
/* true if deletes are published */
bool pubdelete;
+ /* true if truncates are published */
+ bool pubtruncate;
+
} FormData_pg_publication;
/* ----------------
* ----------------
*/
-#define Natts_pg_publication 6
+#define Natts_pg_publication 7
#define Anum_pg_publication_pubname 1
#define Anum_pg_publication_pubowner 2
#define Anum_pg_publication_puballtables 3
#define Anum_pg_publication_pubinsert 4
#define Anum_pg_publication_pubupdate 5
#define Anum_pg_publication_pubdelete 6
+#define Anum_pg_publication_pubtruncate 7
typedef struct PublicationActions
{
bool pubinsert;
bool pubupdate;
bool pubdelete;
+ bool pubtruncate;
} PublicationActions;
typedef struct Publication
HeapTuple oldtuple);
extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
LogicalRepTupleData *oldtup);
+extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[],
+ bool cascade, bool restart_seqs);
+extern List *logicalrep_read_truncate(StringInfo in,
+ bool *cascade, bool *restart_seqs);
extern void logicalrep_write_rel(StringInfo out, Relation rel);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
extern void logicalrep_write_typ(StringInfo out, Oid typoid);
CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum');
ERROR: unrecognized "publish" value: "cluster"
\dRp
- List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes
---------------------+--------------------------+------------+---------+---------+---------
- testpib_ins_trunct | regress_publication_user | f | t | f | f
- testpub_default | regress_publication_user | f | f | t | f
+ List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates
+--------------------+--------------------------+------------+---------+---------+---------+-----------
+ testpib_ins_trunct | regress_publication_user | f | t | f | f | f
+ testpub_default | regress_publication_user | f | f | t | f | f
(2 rows)
ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete');
\dRp
- List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes
---------------------+--------------------------+------------+---------+---------+---------
- testpib_ins_trunct | regress_publication_user | f | t | f | f
- testpub_default | regress_publication_user | f | t | t | t
+ List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates
+--------------------+--------------------------+------------+---------+---------+---------+-----------
+ testpib_ins_trunct | regress_publication_user | f | t | f | f | f
+ testpub_default | regress_publication_user | f | t | t | t | f
(2 rows)
--- adding tables
"testpub_foralltables"
\dRp+ testpub_foralltables
- Publication testpub_foralltables
- Owner | All tables | Inserts | Updates | Deletes
---------------------------+------------+---------+---------+---------
- regress_publication_user | t | t | t | f
+ Publication testpub_foralltables
+ Owner | All tables | Inserts | Updates | Deletes | Truncates
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | t | t | t | f | f
(1 row)
DROP TABLE testpub_tbl2;
CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3;
CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3;
\dRp+ testpub3
- Publication testpub3
- Owner | All tables | Inserts | Updates | Deletes
---------------------------+------------+---------+---------+---------
- regress_publication_user | f | t | t | t
+ Publication testpub3
+ Owner | All tables | Inserts | Updates | Deletes | Truncates
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f | t | t | t | t
Tables:
"public.testpub_tbl3"
"public.testpub_tbl3a"
\dRp+ testpub4
- Publication testpub4
- Owner | All tables | Inserts | Updates | Deletes
---------------------------+------------+---------+---------+---------
- regress_publication_user | f | t | t | t
+ Publication testpub4
+ Owner | All tables | Inserts | Updates | Deletes | Truncates
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f | t | t | t | t
Tables:
"public.testpub_tbl3"
CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
ERROR: publication "testpub_fortbl" already exists
\dRp+ testpub_fortbl
- Publication testpub_fortbl
- Owner | All tables | Inserts | Updates | Deletes
---------------------------+------------+---------+---------+---------
- regress_publication_user | f | t | t | t
+ Publication testpub_fortbl
+ Owner | All tables | Inserts | Updates | Deletes | Truncates
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f | t | t | t | t
Tables:
"pub_test.testpub_nopk"
"public.testpub_tbl1"
"testpub_fortbl"
\dRp+ testpub_default
- Publication testpub_default
- Owner | All tables | Inserts | Updates | Deletes
---------------------------+------------+---------+---------+---------
- regress_publication_user | f | t | t | t
+ Publication testpub_default
+ Owner | All tables | Inserts | Updates | Deletes | Truncates
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f | t | t | t | f
Tables:
"pub_test.testpub_nopk"
"public.testpub_tbl1"
DROP VIEW testpub_view;
DROP TABLE testpub_tbl1;
\dRp+ testpub_default
- Publication testpub_default
- Owner | All tables | Inserts | Updates | Deletes
---------------------------+------------+---------+---------+---------
- regress_publication_user | f | t | t | t
+ Publication testpub_default
+ Owner | All tables | Inserts | Updates | Deletes | Truncates
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f | t | t | t | f
(1 row)
-- fail - must be owner of publication
RESET ROLE;
ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
\dRp testpub_foo
- List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes
--------------+--------------------------+------------+---------+---------+---------
- testpub_foo | regress_publication_user | f | t | t | t
+ List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates
+-------------+--------------------------+------------+---------+---------+---------+-----------
+ testpub_foo | regress_publication_user | f | t | t | t | f
(1 row)
-- rename back to keep the rest simple
ALTER PUBLICATION testpub_foo RENAME TO testpub_default;
ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
\dRp testpub_default
- List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes
------------------+---------------------------+------------+---------+---------+---------
- testpub_default | regress_publication_user2 | f | t | t | t
+ List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates
+-----------------+---------------------------+------------+---------+---------+---------+-----------
+ testpub_default | regress_publication_user2 | f | t | t | t | f
(1 row)
DROP PUBLICATION testpub_default;
--- /dev/null
+# Test TRUNCATE
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 9;
+
+# setup
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab1 (a int PRIMARY KEY)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab1 (a int PRIMARY KEY)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab2 (a int PRIMARY KEY)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab2 (a int PRIMARY KEY)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab3 (a int PRIMARY KEY)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab3 (a int PRIMARY KEY)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab4 (x int PRIMARY KEY, y int REFERENCES tab3)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab4 (x int PRIMARY KEY, y int REFERENCES tab3)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SEQUENCE seq1 OWNED BY tab1.a"
+);
+$node_subscriber->safe_psql('postgres',
+ "ALTER SEQUENCE seq1 START 101"
+);
+
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub1 FOR TABLE tab1");
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub2 FOR TABLE tab2 WITH (publish = insert)");
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub3 FOR TABLE tab3, tab4");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr application_name=sub1' PUBLICATION pub1");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr application_name=sub2' PUBLICATION pub2");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr application_name=sub3' PUBLICATION pub3");
+
+$node_publisher->wait_for_catchup('sub1');
+
+# insert data to truncate
+
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (2), (3)");
+
+$node_publisher->wait_for_catchup('sub1');
+
+# truncate and check
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1");
+
+$node_publisher->wait_for_catchup('sub1');
+
+my $result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(0||),
+ 'truncate replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT nextval('seq1')");
+is($result, qq(1),
+ 'sequence not restarted');
+
+# truncate with restart identity
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1 RESTART IDENTITY");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT nextval('seq1')");
+is($result, qq(101),
+ 'truncate restarted identities');
+
+# test publication that does not replicate truncate
+
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab2 VALUES (1), (2), (3)");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab2");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab2");
+is($result, qq(3|1|3),
+ 'truncate not replicated');
+
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION pub2 SET (publish = 'insert, truncate')");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab2");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab2");
+is($result, qq(0||),
+ 'truncate replicated after publication change');
+
+# test multiple tables connected by foreign keys
+
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab3 VALUES (1), (2), (3)");
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab4 VALUES (11, 1), (111, 1), (22, 2)");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab3, tab4");
+
+$node_publisher->wait_for_catchup('sub3');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab3");
+is($result, qq(0||),
+ 'truncate of multiple tables replicated');
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(x), max(x) FROM tab4");
+is($result, qq(0||),
+ 'truncate of multiple tables replicated');
+
+# test truncate of multiple tables, some of which are not published
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub2");
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION pub2");
+
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (2), (3)");
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab2 VALUES (1), (2), (3)");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1, tab2");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(0||),
+ 'truncate of multiple tables some not published');
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab2");
+is($result, qq(3|1|3),
+ 'truncate of multiple tables some not published');