]> granicus.if.org Git - postgresql/commitdiff
Fix and improve cache invalidation logic for logical decoding.
authorAndres Freund <andres@anarazel.de>
Thu, 13 Nov 2014 18:06:43 +0000 (19:06 +0100)
committerAndres Freund <andres@anarazel.de>
Thu, 13 Nov 2014 19:34:58 +0000 (20:34 +0100)
There are basically three situations in which logical decoding needs
to perform cache invalidation. During/After replaying a transaction
with catalog changes, when skipping a uninteresting transaction that
performed catalog changes and when erroring out while replaying a
transaction. Unfortunately these three cases were all done slightly
differently - partially because 8de3e410fa, which greatly simplifies
matters, got committed in the midst of the development of logical
decoding.

The actually problematic case was when logical decoding skipped
transaction commits (and thus processed invalidations). When used via
the SQL interface cache invalidation could access the catalog - bad,
because we didn't set up enough state to allow that correctly. It'd
not be hard to setup sufficient state, but the simpler solution is to
always perform cache invalidation outside a valid transaction.

Also make the different cache invalidation cases look as similar as
possible, to ease code review.

This fixes the assertion failure reported by Antonin Houska in
53EE02D9.7040702@gmail.com. The presented testcase has been expanded
into a regression test.

Backpatch to 9.4, where logical decoding was introduced.

contrib/test_decoding/Makefile
contrib/test_decoding/expected/decoding_into_rel.out [new file with mode: 0644]
contrib/test_decoding/sql/decoding_into_rel.sql [new file with mode: 0644]
src/backend/replication/logical/reorderbuffer.c

index 69e99fa0ac5b9839512cc51cb9425034421197eb..c3e16fbae6dd87f620544e15cb69ca459cc3b876 100644 (file)
@@ -37,7 +37,7 @@ submake-isolation:
 submake-test_decoding:
        $(MAKE) -C $(top_builddir)/contrib/test_decoding
 
-REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact binary prepared
+REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared
 
 regresscheck: all | submake-regress submake-test_decoding
        $(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/decoding_into_rel.out b/contrib/test_decoding/expected/decoding_into_rel.out
new file mode 100644 (file)
index 0000000..2671258
--- /dev/null
@@ -0,0 +1,84 @@
+-- test that we can insert the result of a get_changes call into a
+-- logged relation. That's really not a good idea in practical terms,
+-- but provides a nice test.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- slot works
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+-- create some changes
+CREATE TABLE somechange(id serial primary key);
+INSERT INTO somechange DEFAULT VALUES;
+CREATE TABLE changeresult AS
+    SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT * FROM changeresult;
+                      data                      
+------------------------------------------------
+ BEGIN
+ table public.somechange: INSERT: id[integer]:1
+ COMMIT
+(3 rows)
+
+INSERT INTO changeresult
+    SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+INSERT INTO changeresult
+    SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT * FROM changeresult;
+                                                                       data                                                                       
+--------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.somechange: INSERT: id[integer]:1
+ COMMIT
+ BEGIN
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.somechange: INSERT: id[integer]:1'
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ COMMIT
+ BEGIN
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.somechange: INSERT: id[integer]:1'
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ COMMIT
+ BEGIN
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ COMMIT
+(20 rows)
+
+DROP TABLE changeresult;
+DROP TABLE somechange;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                                                                                  data                                                                                                  
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''BEGIN'''''''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''table public.somechange: INSERT: id[integer]:1'''''''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''COMMIT'''''''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ COMMIT
+(14 rows)
+
+SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+----------
+ stop
+(1 row)
+
diff --git a/contrib/test_decoding/sql/decoding_into_rel.sql b/contrib/test_decoding/sql/decoding_into_rel.sql
new file mode 100644 (file)
index 0000000..3704821
--- /dev/null
@@ -0,0 +1,27 @@
+-- test that we can insert the result of a get_changes call into a
+-- logged relation. That's really not a good idea in practical terms,
+-- but provides a nice test.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+-- slot works
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- create some changes
+CREATE TABLE somechange(id serial primary key);
+INSERT INTO somechange DEFAULT VALUES;
+
+CREATE TABLE changeresult AS
+    SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT * FROM changeresult;
+
+INSERT INTO changeresult
+    SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+INSERT INTO changeresult
+    SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT * FROM changeresult;
+DROP TABLE changeresult;
+DROP TABLE somechange;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
index ece1bc8064022667c1e0c4ae4fc413079348dfc6..7d8f40738d4f0190b40382ec7be86ae75321280d 100644 (file)
@@ -1264,8 +1264,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 
        volatile CommandId command_id = FirstCommandId;
        volatile Snapshot snapshot_now = NULL;
-       volatile bool txn_started = false;
-       volatile bool subtxn_started = false;
+       volatile bool using_subtxn = false;
 
        txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
                                                                false);
@@ -1305,7 +1304,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 
        PG_TRY();
        {
-               txn_started = false;
 
                /*
                 * Decoding needs access to syscaches et al., which in turn use
@@ -1317,16 +1315,12 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                 * When we're called via the SQL SRF there's already a transaction
                 * started, so start an explicit subtransaction there.
                 */
-               if (IsTransactionOrTransactionBlock())
-               {
+               using_subtxn = IsTransactionOrTransactionBlock();
+
+               if (using_subtxn)
                        BeginInternalSubTransaction("replay");
-                       subtxn_started = true;
-               }
                else
-               {
                        StartTransactionCommand();
-                       txn_started = true;
-               }
 
                rb->begin(rb, txn);
 
@@ -1489,22 +1483,22 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                        elog(ERROR, "output plugin used XID %u",
                                 GetCurrentTransactionId());
 
-               /* make sure there's no cache pollution */
-               ReorderBufferExecuteInvalidations(rb, txn);
-
                /* cleanup */
                TeardownHistoricSnapshot(false);
 
                /*
-                * Abort subtransaction or the transaction as a whole has the right
+                * Aborting the current (sub-)transaction as a whole has the right
                 * semantics. We want all locks acquired in here to be released, not
                 * reassigned to the parent and we do not want any database access
                 * have persistent effects.
                 */
-               if (subtxn_started)
+               AbortCurrentTransaction();
+
+               /* make sure there's no cache pollution */
+               ReorderBufferExecuteInvalidations(rb, txn);
+
+               if (using_subtxn)
                        RollbackAndReleaseCurrentSubTransaction();
-               else if (txn_started)
-                       AbortCurrentTransaction();
 
                if (snapshot_now->copied)
                        ReorderBufferFreeSnap(rb, snapshot_now);
@@ -1520,20 +1514,21 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 
                TeardownHistoricSnapshot(true);
 
-               if (snapshot_now->copied)
-                       ReorderBufferFreeSnap(rb, snapshot_now);
-
-               if (subtxn_started)
-                       RollbackAndReleaseCurrentSubTransaction();
-               else if (txn_started)
-                       AbortCurrentTransaction();
-
                /*
-                * Invalidations in an aborted transactions aren't allowed to do
-                * catalog access, so we don't need to still have the snapshot setup.
+                * Force cache invalidation to happen outside of a valid transaction
+                * to prevent catalog access as we just caught an error.
                 */
+               AbortCurrentTransaction();
+
+               /* make sure there's no cache pollution */
                ReorderBufferExecuteInvalidations(rb, txn);
 
+               if (using_subtxn)
+                       RollbackAndReleaseCurrentSubTransaction();
+
+               if (snapshot_now->copied)
+                       ReorderBufferFreeSnap(rb, snapshot_now);
+
                /* remove potential on-disk data, and deallocate */
                ReorderBufferCleanupTXN(rb, txn);
 
@@ -1645,20 +1640,24 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
         */
        if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
        {
-               /* setup snapshot to perform the invalidations in */
-               SetupHistoricSnapshot(txn->base_snapshot, txn->tuplecid_hash);
-               PG_TRY();
-               {
-                       ReorderBufferExecuteInvalidations(rb, txn);
-                       TeardownHistoricSnapshot(false);
-               }
-               PG_CATCH();
-               {
-                       /* cleanup */
-                       TeardownHistoricSnapshot(true);
-                       PG_RE_THROW();
-               }
-               PG_END_TRY();
+               bool use_subtxn = IsTransactionOrTransactionBlock();
+
+               if (use_subtxn)
+                       BeginInternalSubTransaction("replay");
+
+               /*
+                * Force invalidations to happen outside of a valid transaction - that
+                * way entries will just be marked as invalid without accessing the
+                * catalog. That's advantageous because we don't need to setup the
+                * full state necessary for catalog access.
+                */
+               if (use_subtxn)
+                       AbortCurrentTransaction();
+
+               ReorderBufferExecuteInvalidations(rb, txn);
+
+               if (use_subtxn)
+                       RollbackAndReleaseCurrentSubTransaction();
        }
        else
                Assert(txn->ninvalidations == 0);