]> granicus.if.org Git - postgresql/commitdiff
Logical decoding of TRUNCATE
authorPeter Eisentraut <peter_e@gmx.net>
Sat, 7 Apr 2018 15:17:56 +0000 (11:17 -0400)
committerPeter Eisentraut <peter_e@gmx.net>
Sat, 7 Apr 2018 15:34:10 +0000 (11:34 -0400)
Add a new WAL record type for TRUNCATE, which is only used when
wal_level >= logical.  (For physical replication, TRUNCATE is already
replicated via SMGR records.)  Add new callback for logical decoding
output plugins to receive TRUNCATE actions.

Author: Simon Riggs <simon@2ndquadrant.com>
Author: Marco Nenciarini <marco.nenciarini@2ndquadrant.it>
Author: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
15 files changed:
contrib/test_decoding/Makefile
contrib/test_decoding/expected/truncate.out [new file with mode: 0644]
contrib/test_decoding/sql/truncate.sql [new file with mode: 0644]
contrib/test_decoding/test_decoding.c
doc/src/sgml/logicaldecoding.sgml
src/backend/access/heap/heapam.c
src/backend/access/rmgrdesc/heapdesc.c
src/backend/commands/tablecmds.c
src/backend/replication/logical/decode.c
src/backend/replication/logical/logical.c
src/backend/replication/logical/reorderbuffer.c
src/include/access/heapam_xlog.h
src/include/commands/tablecmds.h
src/include/replication/output_plugin.h
src/include/replication/reorderbuffer.h

index 6c18189d9db58eef1d440fbc12410cb7cc1bbe20..1d601d8144ce0e843b0e32216c81e004f56c0fd1 100644 (file)
@@ -39,7 +39,7 @@ submake-test_decoding:
 
 REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
        decoding_into_rel binary prepared replorigin time messages \
-       spill slot
+       spill slot truncate
 
 regresscheck: | submake-regress submake-test_decoding temp-install
        $(pg_regress_check) \
diff --git a/contrib/test_decoding/expected/truncate.out b/contrib/test_decoding/expected/truncate.out
new file mode 100644 (file)
index 0000000..be85178
--- /dev/null
@@ -0,0 +1,25 @@
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE TABLE tab1 (id serial unique, data int);
+CREATE TABLE tab2 (a int primary key, b int);
+TRUNCATE tab1;
+TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
+TRUNCATE tab1, tab2;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                         data                         
+------------------------------------------------------
+ BEGIN
+ table public.tab1: TRUNCATE: (no-flags)
+ COMMIT
+ BEGIN
+ table public.tab1: TRUNCATE: restart_seqs cascade
+ COMMIT
+ BEGIN
+ table public.tab1, public.tab2: TRUNCATE: (no-flags)
+ COMMIT
+(9 rows)
+
diff --git a/contrib/test_decoding/sql/truncate.sql b/contrib/test_decoding/sql/truncate.sql
new file mode 100644 (file)
index 0000000..88f113f
--- /dev/null
@@ -0,0 +1,10 @@
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE tab1 (id serial unique, data int);
+CREATE TABLE tab2 (a int primary key, b int);
+
+TRUNCATE tab1;
+TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
+TRUNCATE tab1, tab2;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
index a94aeeae292287d54dce3784f216f624146f739e..e192d5b4ad4483c16989d77b765d8add5946715c 100644 (file)
@@ -52,6 +52,10 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
 static void pg_decode_change(LogicalDecodingContext *ctx,
                                 ReorderBufferTXN *txn, Relation rel,
                                 ReorderBufferChange *change);
+static void pg_decode_truncate(LogicalDecodingContext *ctx,
+                                                          ReorderBufferTXN *txn,
+                                                          int nrelations, Relation relations[],
+                                                          ReorderBufferChange *change);
 static bool pg_decode_filter(LogicalDecodingContext *ctx,
                                 RepOriginId origin_id);
 static void pg_decode_message(LogicalDecodingContext *ctx,
@@ -74,6 +78,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
        cb->startup_cb = pg_decode_startup;
        cb->begin_cb = pg_decode_begin_txn;
        cb->change_cb = pg_decode_change;
+       cb->truncate_cb = pg_decode_truncate;
        cb->commit_cb = pg_decode_commit_txn;
        cb->filter_by_origin_cb = pg_decode_filter;
        cb->shutdown_cb = pg_decode_shutdown;
@@ -480,6 +485,59 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        OutputPluginWrite(ctx, true);
 }
 
+static void
+pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+                                  int nrelations, Relation relations[], ReorderBufferChange *change)
+{
+       TestDecodingData *data;
+       MemoryContext old;
+       int                     i;
+
+       data = ctx->output_plugin_private;
+
+       /* output BEGIN if we haven't yet */
+       if (data->skip_empty_xacts && !data->xact_wrote_changes)
+       {
+               pg_output_begin(ctx, data, txn, false);
+       }
+       data->xact_wrote_changes = true;
+
+       /* Avoid leaking memory by using and resetting our own context */
+       old = MemoryContextSwitchTo(data->context);
+
+       OutputPluginPrepareWrite(ctx, true);
+
+       appendStringInfoString(ctx->out, "table ");
+
+       for (i = 0; i < nrelations; i++)
+       {
+               if (i > 0)
+                       appendStringInfoString(ctx->out, ", ");
+
+               appendStringInfoString(ctx->out,
+                                                          quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
+                                                                                                                 NameStr(relations[i]->rd_rel->relname)));
+       }
+
+       appendStringInfoString(ctx->out, ": TRUNCATE:");
+
+       if (change->data.truncate.restart_seqs
+               || change->data.truncate.cascade)
+       {
+               if (change->data.truncate.restart_seqs)
+                       appendStringInfo(ctx->out, " restart_seqs");
+               if (change->data.truncate.cascade)
+                       appendStringInfo(ctx->out, " cascade");
+       }
+       else
+               appendStringInfoString(ctx->out, " (no-flags)");
+
+       MemoryContextSwitchTo(old);
+       MemoryContextReset(data->context);
+
+       OutputPluginWrite(ctx, true);
+}
+
 static void
 pg_decode_message(LogicalDecodingContext *ctx,
                                  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
index f6b14dccb094206ec04e935637efeb4d18edd919..b29cfe6fb40ee993aecf6cd87f27525f783b1f5e 100644 (file)
@@ -383,6 +383,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeStartupCB startup_cb;
     LogicalDecodeBeginCB begin_cb;
     LogicalDecodeChangeCB change_cb;
+    LogicalDecodeTruncateCB truncate_cb;
     LogicalDecodeCommitCB commit_cb;
     LogicalDecodeMessageCB message_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
@@ -394,8 +395,10 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
      The <function>begin_cb</function>, <function>change_cb</function>
      and <function>commit_cb</function> callbacks are required,
      while <function>startup_cb</function>,
-     <function>filter_by_origin_cb</function>
+     <function>filter_by_origin_cb</function>, <function>truncate_cb</function>,
      and <function>shutdown_cb</function> are optional.
+     If <function>truncate_cb</function> is not set but a
+     <command>TRUNCATE</command> is to be decoded, the action will be ignored.
     </para>
    </sect2>
 
@@ -590,6 +593,28 @@ typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
      </note>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-truncate">
+     <title>Truncate Callback</title>
+
+     <para>
+      The <function>truncate_cb</function> callback is called for a
+      <command>TRUNCATE</command> command.
+<programlisting>
+typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
+                                         ReorderBufferTXN *txn,
+                                         int nrelations,
+                                         Relation relations[],
+                                         ReorderBufferChange *change);
+</programlisting>
+      The parameters are analogous to the <function>change_cb</function>
+      callback.  However, because <command>TRUNCATE</command> actions on
+      tables connected by foreign keys need to be executed together, this
+      callback receives an array of relations instead of just a single one.
+      See the description of the <xref linkend="sql-truncate"/> statement for
+      details.
+     </para>
+    </sect3>
+
      <sect3 id="logicaldecoding-output-plugin-filter-origin">
      <title>Origin Filter Callback</title>
 
index f96567f5d51fbaaa5afda1d0bc2184fd548512ad..0bafb4fefcdbba07f4d8a0fc4483e0076377289c 100644 (file)
@@ -9260,6 +9260,13 @@ heap_redo(XLogReaderState *record)
                case XLOG_HEAP_UPDATE:
                        heap_xlog_update(record, false);
                        break;
+               case XLOG_HEAP_TRUNCATE:
+                       /*
+                        * TRUNCATE is a no-op because the actions are already logged as
+                        * SMGR WAL records.  TRUNCATE WAL record only exists for logical
+                        * decoding.
+                        */
+                       break;
                case XLOG_HEAP_HOT_UPDATE:
                        heap_xlog_update(record, true);
                        break;
index b00c071cb62d2303a6b56f469f85614fcd55937c..318a281d7f2a493709747feb56a8d73befae6a9f 100644 (file)
@@ -75,6 +75,19 @@ heap_desc(StringInfo buf, XLogReaderState *record)
                                                 xlrec->new_offnum,
                                                 xlrec->new_xmax);
        }
+       else if (info == XLOG_HEAP_TRUNCATE)
+       {
+               xl_heap_truncate *xlrec = (xl_heap_truncate *) rec;
+               int                     i;
+
+               if (xlrec->flags & XLH_TRUNCATE_CASCADE)
+                       appendStringInfo(buf, "cascade ");
+               if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
+                       appendStringInfo(buf, "restart_seqs ");
+               appendStringInfo(buf, "nrelids %u relids", xlrec->nrelids);
+               for (i = 0; i < xlrec->nrelids; i++)
+                       appendStringInfo(buf, " %u", xlrec->relids[i]);
+       }
        else if (info == XLOG_HEAP_CONFIRM)
        {
                xl_heap_confirm *xlrec = (xl_heap_confirm *) rec;
@@ -186,6 +199,9 @@ heap_identify(uint8 info)
                case XLOG_HEAP_HOT_UPDATE | XLOG_HEAP_INIT_PAGE:
                        id = "HOT_UPDATE+INIT";
                        break;
+               case XLOG_HEAP_TRUNCATE:
+                       id = "TRUNCATE";
+                       break;
                case XLOG_HEAP_CONFIRM:
                        id = "HEAP_CONFIRM";
                        break;
index ec2f9616edd068aadf06745cc2eb6d27486a2771..801db12bee765fdde8433bb889f57a2d65183ac3 100644 (file)
@@ -16,6 +16,7 @@
 
 #include "access/genam.h"
 #include "access/heapam.h"
+#include "access/heapam_xlog.h"
 #include "access/multixact.h"
 #include "access/reloptions.h"
 #include "access/relscan.h"
@@ -1322,11 +1323,7 @@ ExecuteTruncate(TruncateStmt *stmt)
 {
        List       *rels = NIL;
        List       *relids = NIL;
-       List       *seq_relids = NIL;
-       EState     *estate;
-       ResultRelInfo *resultRelInfos;
-       ResultRelInfo *resultRelInfo;
-       SubTransactionId mySubid;
+       List       *relids_logged = NIL;
        ListCell   *cell;
 
        /*
@@ -1350,6 +1347,9 @@ ExecuteTruncate(TruncateStmt *stmt)
                truncate_check_rel(rel);
                rels = lappend(rels, rel);
                relids = lappend_oid(relids, myrelid);
+               /* Log this relation only if needed for logical decoding */
+               if (RelationIsLogicallyLogged(rel))
+                       relids_logged = lappend_oid(relids_logged, myrelid);
 
                if (recurse)
                {
@@ -1370,6 +1370,9 @@ ExecuteTruncate(TruncateStmt *stmt)
                                truncate_check_rel(rel);
                                rels = lappend(rels, rel);
                                relids = lappend_oid(relids, childrelid);
+                               /* Log this relation only if needed for logical decoding */
+                               if (RelationIsLogicallyLogged(rel))
+                                       relids_logged = lappend_oid(relids_logged, childrelid);
                        }
                }
                else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
@@ -1379,7 +1382,47 @@ ExecuteTruncate(TruncateStmt *stmt)
                                         errhint("Do not specify the ONLY keyword, or use TRUNCATE ONLY on the partitions directly.")));
        }
 
+       ExecuteTruncateGuts(rels, relids, relids_logged,
+                                               stmt->behavior, stmt->restart_seqs);
+
+       /* And close the rels */
+       foreach(cell, rels)
+       {
+               Relation        rel = (Relation) lfirst(cell);
+
+               heap_close(rel, NoLock);
+       }
+}
+
+/*
+ * ExecuteTruncateGuts
+ *
+ * Internal implementation of TRUNCATE.  This is called by the actual TRUNCATE
+ * command (see above) as well as replication subscribers that execute a
+ * replicated TRUNCATE action.
+ *
+ * explicit_rels is the list of Relations to truncate that the command
+ * specified.  relids is the list of Oids corresponding to explicit_rels.
+ * relids_logged is the list of Oids (a subset of relids) that require
+ * WAL-logging.  This is all a bit redundant, but the existing callers have
+ * this information handy in this form.
+ */
+void
+ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
+                                       DropBehavior behavior, bool restart_seqs)
+{
+       List       *rels;
+       List       *seq_relids = NIL;
+       EState     *estate;
+       ResultRelInfo *resultRelInfos;
+       ResultRelInfo *resultRelInfo;
+       SubTransactionId mySubid;
+       ListCell   *cell;
+       Oid                *logrelids;
+
        /*
+        * Open, exclusive-lock, and check all the explicitly-specified relations
+        *
         * In CASCADE mode, suck in all referencing relations as well.  This
         * requires multiple iterations to find indirectly-dependent relations. At
         * each phase, we need to exclusive-lock new rels before looking for their
@@ -1387,7 +1430,8 @@ ExecuteTruncate(TruncateStmt *stmt)
         * soon as we open it, to avoid a faux pas such as holding lock for a long
         * time on a rel we have no permissions for.
         */
-       if (stmt->behavior == DROP_CASCADE)
+       rels = list_copy(explicit_rels);
+       if (behavior == DROP_CASCADE)
        {
                for (;;)
                {
@@ -1409,6 +1453,9 @@ ExecuteTruncate(TruncateStmt *stmt)
                                truncate_check_rel(rel);
                                rels = lappend(rels, rel);
                                relids = lappend_oid(relids, relid);
+                               /* Log this relation only if needed for logical decoding */
+                               if (RelationIsLogicallyLogged(rel))
+                                       relids_logged = lappend_oid(relids_logged, relid);
                        }
                }
        }
@@ -1421,7 +1468,7 @@ ExecuteTruncate(TruncateStmt *stmt)
 #ifdef USE_ASSERT_CHECKING
        heap_truncate_check_FKs(rels, false);
 #else
-       if (stmt->behavior == DROP_RESTRICT)
+       if (behavior == DROP_RESTRICT)
                heap_truncate_check_FKs(rels, false);
 #endif
 
@@ -1431,7 +1478,7 @@ ExecuteTruncate(TruncateStmt *stmt)
         * We want to do this early since it's pointless to do all the truncation
         * work only to fail on sequence permissions.
         */
-       if (stmt->restart_seqs)
+       if (restart_seqs)
        {
                foreach(cell, rels)
                {
@@ -1586,6 +1633,41 @@ ExecuteTruncate(TruncateStmt *stmt)
                ResetSequence(seq_relid);
        }
 
+       /*
+        * Write a WAL record to allow this set of actions to be logically decoded.
+        *
+        * Assemble an array of relids so we can write a single WAL record for the
+        * whole action.
+        */
+       if (list_length(relids_logged) > 0)
+       {
+               xl_heap_truncate xlrec;
+               int                     i = 0;
+
+               /* should only get here if wal_level >= logical */
+               Assert(XLogLogicalInfoActive());
+
+               logrelids = palloc(list_length(relids_logged) * sizeof(Oid));
+               foreach (cell, relids_logged)
+                       logrelids[i++] = lfirst_oid(cell);
+
+               xlrec.dbId = MyDatabaseId;
+               xlrec.nrelids = list_length(relids_logged);
+               xlrec.flags = 0;
+               if (behavior == DROP_CASCADE)
+                       xlrec.flags |= XLH_TRUNCATE_CASCADE;
+               if (restart_seqs)
+                       xlrec.flags |= XLH_TRUNCATE_RESTART_SEQS;
+
+               XLogBeginInsert();
+               XLogRegisterData((char *) &xlrec, SizeOfHeapTruncate);
+               XLogRegisterData((char *) logrelids, list_length(relids_logged) * sizeof(Oid));
+
+               XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+               (void) XLogInsert(RM_HEAP_ID, XLOG_HEAP_TRUNCATE);
+       }
+
        /*
         * Process all AFTER STATEMENT TRUNCATE triggers.
         */
@@ -1603,7 +1685,11 @@ ExecuteTruncate(TruncateStmt *stmt)
        /* We can clean up the EState now */
        FreeExecutorState(estate);
 
-       /* And close the rels (can't do this while EState still holds refs) */
+       /*
+        * Close any rels opened by CASCADE (can't do this while EState still
+        * holds refs)
+        */
+       rels = list_difference_ptr(rels, explicit_rels);
        foreach(cell, rels)
        {
                Relation        rel = (Relation) lfirst(cell);
index 84183f82031340cfda828c8f700404f2227e3101..30d80e7c5428f773292d7b236af210af57df3f5f 100644 (file)
@@ -65,6 +65,7 @@ static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *bu
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
@@ -450,6 +451,11 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
                                DecodeDelete(ctx, buf);
                        break;
 
+               case XLOG_HEAP_TRUNCATE:
+                       if (SnapBuildProcessChange(builder, xid, buf->origptr))
+                               DecodeTruncate(ctx, buf);
+                       break;
+
                case XLOG_HEAP_INPLACE:
 
                        /*
@@ -826,6 +832,41 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
        ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
 }
 
+/*
+ * Parse XLOG_HEAP_TRUNCATE from wal
+ */
+static void
+DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+       XLogReaderState *r = buf->record;
+       xl_heap_truncate *xlrec;
+       ReorderBufferChange *change;
+
+       xlrec = (xl_heap_truncate *) XLogRecGetData(r);
+
+       /* only interested in our database */
+       if (xlrec->dbId != ctx->slot->data.database)
+               return;
+
+       /* output plugin doesn't look for this origin, no need to queue */
+       if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+               return;
+
+       change = ReorderBufferGetChange(ctx->reorder);
+       change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
+       change->origin_id = XLogRecGetOrigin(r);
+       if (xlrec->flags & XLH_TRUNCATE_CASCADE)
+               change->data.truncate.cascade = true;
+       if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
+               change->data.truncate.restart_seqs = true;
+       change->data.truncate.nrelids = xlrec->nrelids;
+       change->data.truncate.relids = palloc(xlrec->nrelids * sizeof(Oid));
+       memcpy(change->data.truncate.relids, xlrec->relids,
+                  xlrec->nrelids * sizeof(Oid));
+       ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
+                                                        buf->origptr, change);
+}
+
 /*
  * Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs.
  *
index 3d8ad7ddf82c178935e03a471bb7a714d7735a56..0737c7b1e75f3ba55636b7f5624109b83248523b 100644 (file)
@@ -62,6 +62,8 @@ static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                                  XLogRecPtr commit_lsn);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                                  Relation relation, ReorderBufferChange *change);
+static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                                 int nrelations, Relation relations[], ReorderBufferChange *change);
 static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                                   XLogRecPtr message_lsn, bool transactional,
                                   const char *prefix, Size message_size, const char *message);
@@ -183,6 +185,7 @@ StartupDecodingContext(List *output_plugin_options,
        /* wrap output plugin callbacks, so we can add error context information */
        ctx->reorder->begin = begin_cb_wrapper;
        ctx->reorder->apply_change = change_cb_wrapper;
+       ctx->reorder->apply_truncate = truncate_cb_wrapper;
        ctx->reorder->commit = commit_cb_wrapper;
        ctx->reorder->message = message_cb_wrapper;
 
@@ -734,6 +737,46 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        error_context_stack = errcallback.previous;
 }
 
+static void
+truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                                       int nrelations, Relation relations[], ReorderBufferChange *change)
+{
+       LogicalDecodingContext *ctx = cache->private_data;
+       LogicalErrorCallbackState state;
+       ErrorContextCallback errcallback;
+
+       Assert(!ctx->fast_forward);
+
+       if (!ctx->callbacks.truncate_cb)
+               return;
+
+       /* Push callback + info on the error context stack */
+       state.ctx = ctx;
+       state.callback_name = "truncate";
+       state.report_location = change->lsn;
+       errcallback.callback = output_plugin_error_callback;
+       errcallback.arg = (void *) &state;
+       errcallback.previous = error_context_stack;
+       error_context_stack = &errcallback;
+
+       /* set output state */
+       ctx->accept_writes = true;
+       ctx->write_xid = txn->xid;
+
+       /*
+        * report this change's lsn so replies from clients can give an up2date
+        * answer. This won't ever be enough (and shouldn't be!) to confirm
+        * receipt of this transaction, but it might allow another transaction's
+        * commit to be confirmed with one message.
+        */
+       ctx->write_location = change->lsn;
+
+       ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
+
+       /* Pop the error context stack */
+       error_context_stack = errcallback.previous;
+}
+
 bool
 filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
index b4016ed52b076806913a040a4fff86f8ff4dd9d2..596c91e9a955c6c92c5891bedfc01c15679f4293 100644 (file)
@@ -415,6 +415,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
                case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
                case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
                case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+               case REORDER_BUFFER_CHANGE_TRUNCATE:
                        break;
        }
 
@@ -1491,6 +1492,38 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                                        specinsert = change;
                                        break;
 
+                               case REORDER_BUFFER_CHANGE_TRUNCATE:
+                               {
+                                       int                     i;
+                                       int                     nrelids = change->data.truncate.nrelids;
+                                       int                     nrelations = 0;
+                                       Relation   *relations;
+
+                                       relations = palloc0(nrelids * sizeof(Relation));
+                                       for (i = 0; i < nrelids; i++)
+                                       {
+                                               Oid                     relid = change->data.truncate.relids[i];
+                                               Relation        relation;
+
+                                               relation = RelationIdGetRelation(relid);
+
+                                               if (relation == NULL)
+                                                       elog(ERROR, "could not open relation with OID %u", relid);
+
+                                               if (!RelationIsLogicallyLogged(relation))
+                                                       continue;
+
+                                               relations[nrelations++] = relation;
+                                       }
+
+                                       rb->apply_truncate(rb, txn, nrelations, relations, change);
+
+                                       for (i = 0; i < nrelations; i++)
+                                               RelationClose(relations[i]);
+
+                                       break;
+                               }
+
                                case REORDER_BUFFER_CHANGE_MESSAGE:
                                        rb->message(rb, txn, change->lsn, true,
                                                                change->data.msg.prefix,
@@ -2255,6 +2288,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                }
                                break;
                        }
+               case REORDER_BUFFER_CHANGE_TRUNCATE:
                case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
                case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
                case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
@@ -2534,6 +2568,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                break;
                        }
                        /* the base struct contains all the data, easy peasy */
+               case REORDER_BUFFER_CHANGE_TRUNCATE:
                case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
                case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
                case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
index 700e25c36a19bdcbfa949ba1193db25fc33df4f1..0052e4c569aa1786eea4e860c831525103fcf9ef 100644 (file)
@@ -32,7 +32,7 @@
 #define XLOG_HEAP_INSERT               0x00
 #define XLOG_HEAP_DELETE               0x10
 #define XLOG_HEAP_UPDATE               0x20
-/* 0x030 is free, was XLOG_HEAP_MOVE */
+#define XLOG_HEAP_TRUNCATE             0x30
 #define XLOG_HEAP_HOT_UPDATE   0x40
 #define XLOG_HEAP_CONFIRM              0x50
 #define XLOG_HEAP_LOCK                 0x60
@@ -109,6 +109,27 @@ typedef struct xl_heap_delete
 
 #define SizeOfHeapDelete       (offsetof(xl_heap_delete, flags) + sizeof(uint8))
 
+/*
+ * xl_heap_delete flag values, 8 bits are available.
+ */
+#define XLH_TRUNCATE_CASCADE                                   (1<<0)
+#define XLH_TRUNCATE_RESTART_SEQS                              (1<<1)
+
+/*
+ * For truncate we list all truncated relids in an array, followed by all
+ * sequence relids that need to be restarted, if any.
+ * All rels are always within the same database, so we just list dbid once.
+ */
+typedef struct xl_heap_truncate
+{
+       Oid                     dbId;
+       uint32          nrelids;
+       uint8           flags;
+       Oid relids[FLEXIBLE_ARRAY_MEMBER];
+} xl_heap_truncate;
+
+#define SizeOfHeapTruncate     (offsetof(xl_heap_truncate, relids))
+
 /*
  * We don't store the whole fixed part (HeapTupleHeaderData) of an inserted
  * or updated tuple in WAL; we can save a few bytes by reconstructing the
index 04a961d3835c06748e2034f585396fa915fb660b..70ee3da76b88b7a4f964d5d2f6789715ff3ac990 100644 (file)
@@ -54,6 +54,8 @@ extern void AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
 extern void CheckTableNotInUse(Relation rel, const char *stmt);
 
 extern void ExecuteTruncate(TruncateStmt *stmt);
+extern void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
+                                                               DropBehavior behavior, bool restart_seqs);
 
 extern void SetRelationHasSubclass(Oid relationId, bool relhassubclass);
 
index 82875d6b3d5045a832afe76795851c058fa404c8..1ee0a56f034fe54dd23691189c0a12ea09769545 100644 (file)
@@ -61,6 +61,15 @@ typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                                                           Relation relation,
                                                                           ReorderBufferChange *change);
 
+/*
+ * Callback for every TRUNCATE in a successful transaction.
+ */
+typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
+                                                                                ReorderBufferTXN *txn,
+                                                                                int nrelations,
+                                                                                Relation relations[],
+                                                                                ReorderBufferChange *change);
+
 /*
  * Called for every (explicit or implicit) COMMIT of a successful transaction.
  */
@@ -98,6 +107,7 @@ typedef struct OutputPluginCallbacks
        LogicalDecodeStartupCB startup_cb;
        LogicalDecodeBeginCB begin_cb;
        LogicalDecodeChangeCB change_cb;
+       LogicalDecodeTruncateCB truncate_cb;
        LogicalDecodeCommitCB commit_cb;
        LogicalDecodeMessageCB message_cb;
        LogicalDecodeFilterByOriginCB filter_by_origin_cb;
index aa430c843c0b7e60fdd353d32a6d0be03145ac83..3867ce895058f4865d1bc51b31c34dd17601647f 100644 (file)
@@ -59,7 +59,8 @@ enum ReorderBufferChangeType
        REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
        REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
        REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
-       REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
+       REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
+       REORDER_BUFFER_CHANGE_TRUNCATE
 };
 
 /*
@@ -99,6 +100,18 @@ typedef struct ReorderBufferChange
                        ReorderBufferTupleBuf *newtuple;
                }                       tp;
 
+               /*
+                * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing
+                * one set of relations to be truncated.
+                */
+               struct
+               {
+                       Size            nrelids;
+                       bool            cascade;
+                       bool            restart_seqs;
+                       Oid                *relids;
+               }       truncate;
+
                /* Message with arbitrary data. */
                struct
                {
@@ -283,6 +296,14 @@ typedef void (*ReorderBufferApplyChangeCB) (
                                                                                        Relation relation,
                                                                                        ReorderBufferChange *change);
 
+/* truncate callback signature */
+typedef void (*ReorderBufferApplyTruncateCB) (
+                                                                                         ReorderBuffer *rb,
+                                                                                         ReorderBufferTXN *txn,
+                                                                                         int nrelations,
+                                                                                         Relation relations[],
+                                                                                         ReorderBufferChange *change);
+
 /* begin callback signature */
 typedef void (*ReorderBufferBeginCB) (
                                                                          ReorderBuffer *rb,
@@ -328,6 +349,7 @@ struct ReorderBuffer
         */
        ReorderBufferBeginCB begin;
        ReorderBufferApplyChangeCB apply_change;
+       ReorderBufferApplyTruncateCB apply_truncate;
        ReorderBufferCommitCB commit;
        ReorderBufferMessageCB message;