There are basically three situations in which logical decoding needs
to perform cache invalidation. During/After replaying a transaction
with catalog changes, when skipping a uninteresting transaction that
performed catalog changes and when erroring out while replaying a
transaction. Unfortunately these three cases were all done slightly
differently - partially because
8de3e410fa, which greatly simplifies
matters, got committed in the midst of the development of logical
decoding.
The actually problematic case was when logical decoding skipped
transaction commits (and thus processed invalidations). When used via
the SQL interface cache invalidation could access the catalog - bad,
because we didn't set up enough state to allow that correctly. It'd
not be hard to setup sufficient state, but the simpler solution is to
always perform cache invalidation outside a valid transaction.
Also make the different cache invalidation cases look as similar as
possible, to ease code review.
This fixes the assertion failure reported by Antonin Houska in
53EE02D9.
7040702@gmail.com. The presented testcase has been expanded
into a regression test.
Backpatch to 9.4, where logical decoding was introduced.
submake-test_decoding:
$(MAKE) -C $(top_builddir)/contrib/test_decoding
-REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact binary prepared
+REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared
regresscheck: all | submake-regress submake-test_decoding
$(MKDIR_P) regression_output
--- /dev/null
+-- test that we can insert the result of a get_changes call into a
+-- logged relation. That's really not a good idea in practical terms,
+-- but provides a nice test.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+-- slot works
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------
+(0 rows)
+
+-- create some changes
+CREATE TABLE somechange(id serial primary key);
+INSERT INTO somechange DEFAULT VALUES;
+CREATE TABLE changeresult AS
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT * FROM changeresult;
+ data
+------------------------------------------------
+ BEGIN
+ table public.somechange: INSERT: id[integer]:1
+ COMMIT
+(3 rows)
+
+INSERT INTO changeresult
+ SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+INSERT INTO changeresult
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT * FROM changeresult;
+ data
+--------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.somechange: INSERT: id[integer]:1
+ COMMIT
+ BEGIN
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.somechange: INSERT: id[integer]:1'
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ COMMIT
+ BEGIN
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.somechange: INSERT: id[integer]:1'
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ COMMIT
+ BEGIN
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ COMMIT
+(20 rows)
+
+DROP TABLE changeresult;
+DROP TABLE somechange;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''BEGIN'''''''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''table public.somechange: INSERT: id[integer]:1'''''''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''COMMIT'''''''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ COMMIT
+(14 rows)
+
+SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
+ ?column?
+----------
+ stop
+(1 row)
+
--- /dev/null
+-- test that we can insert the result of a get_changes call into a
+-- logged relation. That's really not a good idea in practical terms,
+-- but provides a nice test.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+-- slot works
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- create some changes
+CREATE TABLE somechange(id serial primary key);
+INSERT INTO somechange DEFAULT VALUES;
+
+CREATE TABLE changeresult AS
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT * FROM changeresult;
+
+INSERT INTO changeresult
+ SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+INSERT INTO changeresult
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT * FROM changeresult;
+DROP TABLE changeresult;
+DROP TABLE somechange;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
volatile CommandId command_id = FirstCommandId;
volatile Snapshot snapshot_now = NULL;
- volatile bool txn_started = false;
- volatile bool subtxn_started = false;
+ volatile bool using_subtxn = false;
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
false);
PG_TRY();
{
- txn_started = false;
/*
* Decoding needs access to syscaches et al., which in turn use
* When we're called via the SQL SRF there's already a transaction
* started, so start an explicit subtransaction there.
*/
- if (IsTransactionOrTransactionBlock())
- {
+ using_subtxn = IsTransactionOrTransactionBlock();
+
+ if (using_subtxn)
BeginInternalSubTransaction("replay");
- subtxn_started = true;
- }
else
- {
StartTransactionCommand();
- txn_started = true;
- }
rb->begin(rb, txn);
elog(ERROR, "output plugin used XID %u",
GetCurrentTransactionId());
- /* make sure there's no cache pollution */
- ReorderBufferExecuteInvalidations(rb, txn);
-
/* cleanup */
TeardownHistoricSnapshot(false);
/*
- * Abort subtransaction or the transaction as a whole has the right
+ * Aborting the current (sub-)transaction as a whole has the right
* semantics. We want all locks acquired in here to be released, not
* reassigned to the parent and we do not want any database access
* have persistent effects.
*/
- if (subtxn_started)
+ AbortCurrentTransaction();
+
+ /* make sure there's no cache pollution */
+ ReorderBufferExecuteInvalidations(rb, txn);
+
+ if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
- else if (txn_started)
- AbortCurrentTransaction();
if (snapshot_now->copied)
ReorderBufferFreeSnap(rb, snapshot_now);
TeardownHistoricSnapshot(true);
- if (snapshot_now->copied)
- ReorderBufferFreeSnap(rb, snapshot_now);
-
- if (subtxn_started)
- RollbackAndReleaseCurrentSubTransaction();
- else if (txn_started)
- AbortCurrentTransaction();
-
/*
- * Invalidations in an aborted transactions aren't allowed to do
- * catalog access, so we don't need to still have the snapshot setup.
+ * Force cache invalidation to happen outside of a valid transaction
+ * to prevent catalog access as we just caught an error.
*/
+ AbortCurrentTransaction();
+
+ /* make sure there's no cache pollution */
ReorderBufferExecuteInvalidations(rb, txn);
+ if (using_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+
+ if (snapshot_now->copied)
+ ReorderBufferFreeSnap(rb, snapshot_now);
+
/* remove potential on-disk data, and deallocate */
ReorderBufferCleanupTXN(rb, txn);
*/
if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
{
- /* setup snapshot to perform the invalidations in */
- SetupHistoricSnapshot(txn->base_snapshot, txn->tuplecid_hash);
- PG_TRY();
- {
- ReorderBufferExecuteInvalidations(rb, txn);
- TeardownHistoricSnapshot(false);
- }
- PG_CATCH();
- {
- /* cleanup */
- TeardownHistoricSnapshot(true);
- PG_RE_THROW();
- }
- PG_END_TRY();
+ bool use_subtxn = IsTransactionOrTransactionBlock();
+
+ if (use_subtxn)
+ BeginInternalSubTransaction("replay");
+
+ /*
+ * Force invalidations to happen outside of a valid transaction - that
+ * way entries will just be marked as invalid without accessing the
+ * catalog. That's advantageous because we don't need to setup the
+ * full state necessary for catalog access.
+ */
+ if (use_subtxn)
+ AbortCurrentTransaction();
+
+ ReorderBufferExecuteInvalidations(rb, txn);
+
+ if (use_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
}
else
Assert(txn->ninvalidations == 0);