From: Peter Eisentraut Date: Fri, 3 Mar 2017 15:05:56 +0000 (-0500) Subject: Fix after trigger execution in logical replication X-Git-Tag: REL_10_BETA1~791 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=713f7c47d9c38654adbc36c47fd9e439f0d1f715;p=postgresql Fix after trigger execution in logical replication From: Petr Jelinek Tested-by: Thom Brown --- diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index f73bdcd673..718aafb78a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -173,6 +173,9 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel) if (resultRelInfo->ri_TrigDesc) estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate); + /* Prepare to catch AFTER triggers. */ + AfterTriggerBeginQuery(); + return estate; } @@ -533,6 +536,10 @@ apply_handle_insert(StringInfo s) /* Cleanup. */ ExecCloseIndices(estate->es_result_relation_info); PopActiveSnapshot(); + + /* Handle queued AFTER triggers. */ + AfterTriggerEndQuery(estate); + ExecResetTupleTable(estate->es_tupleTable, false); FreeExecutorState(estate); @@ -673,6 +680,10 @@ apply_handle_update(StringInfo s) /* Cleanup. */ ExecCloseIndices(estate->es_result_relation_info); PopActiveSnapshot(); + + /* Handle queued AFTER triggers. */ + AfterTriggerEndQuery(estate); + EvalPlanQualEnd(&epqstate); ExecResetTupleTable(estate->es_tupleTable, false); FreeExecutorState(estate); @@ -760,6 +771,10 @@ apply_handle_delete(StringInfo s) /* Cleanup. */ ExecCloseIndices(estate->es_result_relation_info); PopActiveSnapshot(); + + /* Handle queued AFTER triggers. */ + AfterTriggerEndQuery(estate); + EvalPlanQualEnd(&epqstate); ExecResetTupleTable(estate->es_tupleTable, false); FreeExecutorState(estate); diff --git a/src/test/subscription/t/003_constraints.pl b/src/test/subscription/t/003_constraints.pl new file mode 100644 index 0000000000..b785132f5b --- /dev/null +++ b/src/test/subscription/t/003_constraints.pl @@ -0,0 +1,113 @@ +# Basic logical replication test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 4; + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Setup structure on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_fk (bid int PRIMARY KEY);"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_fk (bid int PRIMARY KEY);"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR ALL TABLES;"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub;"); + +# Wait for subscriber to finish initialization +my $caughtup_query = +"SELECT pg_current_wal_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$appname';"; +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_fk (bid) VALUES (1);"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_fk_ref (id, bid) VALUES (1, 1);"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# Check data on subscriber +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk;"); +is($result, qq(1|1|1), 'check replicated tab_fk inserts on subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;"); +is($result, qq(1|1|1), 'check replicated tab_fk_ref inserts on subscriber'); + +# Drop the fk on publisher +$node_publisher->safe_psql('postgres', + "DROP TABLE tab_fk CASCADE;"); + +# Insert data +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_fk_ref (id, bid) VALUES (2, 2);"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# FK is not enforced on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;"); +is($result, qq(2|1|2), 'check FK ignored on subscriber'); + +# Add replica trigger +$node_subscriber->safe_psql('postgres', qq{ +CREATE FUNCTION filter_basic_dml_fn() RETURNS TRIGGER AS \$\$ +BEGIN + IF (TG_OP = 'INSERT') THEN + IF (NEW.id < 10) THEN + RETURN NEW; + ELSE + RETURN NULL; + END IF; + ELSE + RAISE WARNING 'Unknown action'; + RETURN NULL; + END IF; +END; +\$\$ LANGUAGE plpgsql; +CREATE TRIGGER filter_basic_dml_trg + BEFORE INSERT ON tab_fk_ref + FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn(); +ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg; +}); + +# Insert data +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_fk_ref (id, bid) VALUES (10, 10);"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# The row should be skipped on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;"); +is($result, qq(2|1|2), 'check replica trigger applied on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast');