From 89fd41b390a46202937f647043043d5b0a4eadae Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Thu, 13 Nov 2014 19:06:43 +0100 Subject: [PATCH] Fix and improve cache invalidation logic for logical decoding. There are basically three situations in which logical decoding needs to perform cache invalidation. During/After replaying a transaction with catalog changes, when skipping a uninteresting transaction that performed catalog changes and when erroring out while replaying a transaction. Unfortunately these three cases were all done slightly differently - partially because 8de3e410fa, which greatly simplifies matters, got committed in the midst of the development of logical decoding. The actually problematic case was when logical decoding skipped transaction commits (and thus processed invalidations). When used via the SQL interface cache invalidation could access the catalog - bad, because we didn't set up enough state to allow that correctly. It'd not be hard to setup sufficient state, but the simpler solution is to always perform cache invalidation outside a valid transaction. Also make the different cache invalidation cases look as similar as possible, to ease code review. This fixes the assertion failure reported by Antonin Houska in 53EE02D9.7040702@gmail.com. The presented testcase has been expanded into a regression test. Backpatch to 9.4, where logical decoding was introduced. --- contrib/test_decoding/Makefile | 2 +- .../expected/decoding_into_rel.out | 84 +++++++++++++++++++ .../test_decoding/sql/decoding_into_rel.sql | 27 ++++++ .../replication/logical/reorderbuffer.c | 81 +++++++++--------- 4 files changed, 152 insertions(+), 42 deletions(-) create mode 100644 contrib/test_decoding/expected/decoding_into_rel.out create mode 100644 contrib/test_decoding/sql/decoding_into_rel.sql diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 2e5a01bd73..438be44afc 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -37,7 +37,7 @@ submake-isolation: submake-test_decoding: $(MAKE) -C $(top_builddir)/contrib/test_decoding -REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact binary prepared +REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared regresscheck: all | submake-regress submake-test_decoding $(MKDIR_P) regression_output diff --git a/contrib/test_decoding/expected/decoding_into_rel.out b/contrib/test_decoding/expected/decoding_into_rel.out new file mode 100644 index 0000000000..2671258f5d --- /dev/null +++ b/contrib/test_decoding/expected/decoding_into_rel.out @@ -0,0 +1,84 @@ +-- test that we can insert the result of a get_changes call into a +-- logged relation. That's really not a good idea in practical terms, +-- but provides a nice test. +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +-- slot works +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +-- create some changes +CREATE TABLE somechange(id serial primary key); +INSERT INTO somechange DEFAULT VALUES; +CREATE TABLE changeresult AS + SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT * FROM changeresult; + data +------------------------------------------------ + BEGIN + table public.somechange: INSERT: id[integer]:1 + COMMIT +(3 rows) + +INSERT INTO changeresult + SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +INSERT INTO changeresult + SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT * FROM changeresult; + data +-------------------------------------------------------------------------------------------------------------------------------------------------- + BEGIN + table public.somechange: INSERT: id[integer]:1 + COMMIT + BEGIN + table public.changeresult: INSERT: data[text]:'BEGIN' + table public.changeresult: INSERT: data[text]:'table public.somechange: INSERT: id[integer]:1' + table public.changeresult: INSERT: data[text]:'COMMIT' + COMMIT + BEGIN + table public.changeresult: INSERT: data[text]:'BEGIN' + table public.changeresult: INSERT: data[text]:'table public.somechange: INSERT: id[integer]:1' + table public.changeresult: INSERT: data[text]:'COMMIT' + COMMIT + BEGIN + table public.changeresult: INSERT: data[text]:'BEGIN' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT''' + table public.changeresult: INSERT: data[text]:'COMMIT' + COMMIT +(20 rows) + +DROP TABLE changeresult; +DROP TABLE somechange; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + BEGIN + table public.changeresult: INSERT: data[text]:'BEGIN' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT''' + table public.changeresult: INSERT: data[text]:'COMMIT' + table public.changeresult: INSERT: data[text]:'BEGIN' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''BEGIN''''''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''table public.somechange: INSERT: id[integer]:1''''''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''COMMIT''''''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT''' + table public.changeresult: INSERT: data[text]:'COMMIT' + COMMIT +(14 rows) + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + stop +(1 row) + diff --git a/contrib/test_decoding/sql/decoding_into_rel.sql b/contrib/test_decoding/sql/decoding_into_rel.sql new file mode 100644 index 0000000000..3704821bcc --- /dev/null +++ b/contrib/test_decoding/sql/decoding_into_rel.sql @@ -0,0 +1,27 @@ +-- test that we can insert the result of a get_changes call into a +-- logged relation. That's really not a good idea in practical terms, +-- but provides a nice test. +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +-- slot works +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- create some changes +CREATE TABLE somechange(id serial primary key); +INSERT INTO somechange DEFAULT VALUES; + +CREATE TABLE changeresult AS + SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +SELECT * FROM changeresult; + +INSERT INTO changeresult + SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +INSERT INTO changeresult + SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +SELECT * FROM changeresult; +DROP TABLE changeresult; +DROP TABLE somechange; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index ece1bc8064..7d8f40738d 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1264,8 +1264,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, volatile CommandId command_id = FirstCommandId; volatile Snapshot snapshot_now = NULL; - volatile bool txn_started = false; - volatile bool subtxn_started = false; + volatile bool using_subtxn = false; txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); @@ -1305,7 +1304,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, PG_TRY(); { - txn_started = false; /* * Decoding needs access to syscaches et al., which in turn use @@ -1317,16 +1315,12 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, * When we're called via the SQL SRF there's already a transaction * started, so start an explicit subtransaction there. */ - if (IsTransactionOrTransactionBlock()) - { + using_subtxn = IsTransactionOrTransactionBlock(); + + if (using_subtxn) BeginInternalSubTransaction("replay"); - subtxn_started = true; - } else - { StartTransactionCommand(); - txn_started = true; - } rb->begin(rb, txn); @@ -1489,22 +1483,22 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, elog(ERROR, "output plugin used XID %u", GetCurrentTransactionId()); - /* make sure there's no cache pollution */ - ReorderBufferExecuteInvalidations(rb, txn); - /* cleanup */ TeardownHistoricSnapshot(false); /* - * Abort subtransaction or the transaction as a whole has the right + * Aborting the current (sub-)transaction as a whole has the right * semantics. We want all locks acquired in here to be released, not * reassigned to the parent and we do not want any database access * have persistent effects. */ - if (subtxn_started) + AbortCurrentTransaction(); + + /* make sure there's no cache pollution */ + ReorderBufferExecuteInvalidations(rb, txn); + + if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); - else if (txn_started) - AbortCurrentTransaction(); if (snapshot_now->copied) ReorderBufferFreeSnap(rb, snapshot_now); @@ -1520,20 +1514,21 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, TeardownHistoricSnapshot(true); - if (snapshot_now->copied) - ReorderBufferFreeSnap(rb, snapshot_now); - - if (subtxn_started) - RollbackAndReleaseCurrentSubTransaction(); - else if (txn_started) - AbortCurrentTransaction(); - /* - * Invalidations in an aborted transactions aren't allowed to do - * catalog access, so we don't need to still have the snapshot setup. + * Force cache invalidation to happen outside of a valid transaction + * to prevent catalog access as we just caught an error. */ + AbortCurrentTransaction(); + + /* make sure there's no cache pollution */ ReorderBufferExecuteInvalidations(rb, txn); + if (using_subtxn) + RollbackAndReleaseCurrentSubTransaction(); + + if (snapshot_now->copied) + ReorderBufferFreeSnap(rb, snapshot_now); + /* remove potential on-disk data, and deallocate */ ReorderBufferCleanupTXN(rb, txn); @@ -1645,20 +1640,24 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) */ if (txn->base_snapshot != NULL && txn->ninvalidations > 0) { - /* setup snapshot to perform the invalidations in */ - SetupHistoricSnapshot(txn->base_snapshot, txn->tuplecid_hash); - PG_TRY(); - { - ReorderBufferExecuteInvalidations(rb, txn); - TeardownHistoricSnapshot(false); - } - PG_CATCH(); - { - /* cleanup */ - TeardownHistoricSnapshot(true); - PG_RE_THROW(); - } - PG_END_TRY(); + bool use_subtxn = IsTransactionOrTransactionBlock(); + + if (use_subtxn) + BeginInternalSubTransaction("replay"); + + /* + * Force invalidations to happen outside of a valid transaction - that + * way entries will just be marked as invalid without accessing the + * catalog. That's advantageous because we don't need to setup the + * full state necessary for catalog access. + */ + if (use_subtxn) + AbortCurrentTransaction(); + + ReorderBufferExecuteInvalidations(rb, txn); + + if (use_subtxn) + RollbackAndReleaseCurrentSubTransaction(); } else Assert(txn->ninvalidations == 0); -- 2.40.0