]> granicus.if.org Git - postgresql/commitdiff
Fix after trigger execution in logical replication
authorPeter Eisentraut <peter_e@gmx.net>
Fri, 3 Mar 2017 15:05:56 +0000 (10:05 -0500)
committerPeter Eisentraut <peter_e@gmx.net>
Fri, 3 Mar 2017 15:05:56 +0000 (10:05 -0500)
From: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Tested-by: Thom Brown <thom@linux.com>
src/backend/replication/logical/worker.c
src/test/subscription/t/003_constraints.pl [new file with mode: 0644]

index f73bdcd673ffc57658eb36deba9c120614c0d9e7..718aafb78a7550cc0de93a664e24f02ebd4d87a7 100644 (file)
@@ -173,6 +173,9 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
        if (resultRelInfo->ri_TrigDesc)
                estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
 
+       /* Prepare to catch AFTER triggers. */
+       AfterTriggerBeginQuery();
+
        return estate;
 }
 
@@ -533,6 +536,10 @@ apply_handle_insert(StringInfo s)
        /* Cleanup. */
        ExecCloseIndices(estate->es_result_relation_info);
        PopActiveSnapshot();
+
+       /* Handle queued AFTER triggers. */
+       AfterTriggerEndQuery(estate);
+
        ExecResetTupleTable(estate->es_tupleTable, false);
        FreeExecutorState(estate);
 
@@ -673,6 +680,10 @@ apply_handle_update(StringInfo s)
        /* Cleanup. */
        ExecCloseIndices(estate->es_result_relation_info);
        PopActiveSnapshot();
+
+       /* Handle queued AFTER triggers. */
+       AfterTriggerEndQuery(estate);
+
        EvalPlanQualEnd(&epqstate);
        ExecResetTupleTable(estate->es_tupleTable, false);
        FreeExecutorState(estate);
@@ -760,6 +771,10 @@ apply_handle_delete(StringInfo s)
        /* Cleanup. */
        ExecCloseIndices(estate->es_result_relation_info);
        PopActiveSnapshot();
+
+       /* Handle queued AFTER triggers. */
+       AfterTriggerEndQuery(estate);
+
        EvalPlanQualEnd(&epqstate);
        ExecResetTupleTable(estate->es_tupleTable, false);
        FreeExecutorState(estate);
diff --git a/src/test/subscription/t/003_constraints.pl b/src/test/subscription/t/003_constraints.pl
new file mode 100644 (file)
index 0000000..b785132
--- /dev/null
@@ -0,0 +1,113 @@
+# Basic logical replication test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 4;
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Setup structure on publisher
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab_fk (bid int PRIMARY KEY);");
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+       "CREATE TABLE tab_fk (bid int PRIMARY KEY);");
+$node_subscriber->safe_psql('postgres',
+       "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+       "CREATE PUBLICATION tap_pub FOR ALL TABLES;");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+       "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub;");
+
+# Wait for subscriber to finish initialization
+my $caughtup_query =
+"SELECT pg_current_wal_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$appname';";
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab_fk (bid) VALUES (1);");
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab_fk_ref (id, bid) VALUES (1, 1);");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+
+# Check data on subscriber
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk;");
+is($result, qq(1|1|1), 'check replicated tab_fk inserts on subscriber');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
+is($result, qq(1|1|1), 'check replicated tab_fk_ref inserts on subscriber');
+
+# Drop the fk on publisher
+$node_publisher->safe_psql('postgres',
+       "DROP TABLE tab_fk CASCADE;");
+
+# Insert data
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab_fk_ref (id, bid) VALUES (2, 2);");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+
+# FK is not enforced on subscriber
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
+is($result, qq(2|1|2), 'check FK ignored on subscriber');
+
+# Add replica trigger
+$node_subscriber->safe_psql('postgres', qq{
+CREATE FUNCTION filter_basic_dml_fn() RETURNS TRIGGER AS \$\$
+BEGIN
+    IF (TG_OP = 'INSERT') THEN
+        IF (NEW.id < 10) THEN
+            RETURN NEW;
+        ELSE
+            RETURN NULL;
+        END IF;
+    ELSE
+        RAISE WARNING 'Unknown action';
+        RETURN NULL;
+    END IF;
+END;
+\$\$ LANGUAGE plpgsql;
+CREATE TRIGGER filter_basic_dml_trg
+    BEFORE INSERT ON tab_fk_ref
+    FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn();
+ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg;
+});
+
+# Insert data
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab_fk_ref (id, bid) VALUES (10, 10);");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+
+# The row should be skipped on subscriber
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
+is($result, qq(2|1|2), 'check replica trigger applied on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');