]> granicus.if.org Git - postgresql/commitdiff
Fix crash when logical decoding is invoked from a PL function.
authorTom Lane <tgl@sss.pgh.pa.us>
Fri, 6 Oct 2017 23:18:58 +0000 (19:18 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Fri, 6 Oct 2017 23:18:58 +0000 (19:18 -0400)
The logical decoding functions do BeginInternalSubTransaction and
RollbackAndReleaseCurrentSubTransaction to clean up after themselves.
It turns out that AtEOSubXact_SPI has an unrecognized assumption that
we always need to cancel the active SPI operation in the SPI context
that surrounds the subtransaction (if there is one).  That's true
when the RollbackAndReleaseCurrentSubTransaction call is coming from
the SPI-using function itself, but not when it's happening inside
some unrelated function invoked by a SPI query.  In practice the
affected callers are the various PLs.

To fix, record the current subtransaction ID when we begin a SPI
operation, and clean up only if that ID is the subtransaction being
canceled.

Also, remove AtEOSubXact_SPI's assertion that it must have cleaned
up the surrounding SPI context's active tuptable.  That's proven
wrong by the same test case.

Also clarify (or, if you prefer, reinterpret) the calling conventions
for _SPI_begin_call and _SPI_end_call.  The memory context cleanup
in the latter means that these have always had the flavor of a matched
resource-management pair, but they weren't documented that way before.

Per report from Ben Chobot.

Back-patch to 9.4 where logical decoding came in.  In principle,
the SPI changes should go all the way back, since the problem dates
back to commit 7ec1c5a86.  But given the lack of field complaints
it seems few people are using internal subtransactions in this way.
So I don't feel a need to take any risks in 9.2/9.3.

Discussion: https://postgr.es/m/73FBA179-C68C-4540-9473-71E865408B15@silentmedia.com

contrib/test_decoding/expected/decoding_into_rel.out
contrib/test_decoding/sql/decoding_into_rel.sql
src/backend/executor/spi.c
src/include/executor/spi_priv.h

index be759caa31de8578c5023aef1b740d5be92a92b9..8fd3390066d9331549f775c9a8233f4f6d0e92ff 100644 (file)
@@ -59,6 +59,31 @@ SELECT * FROM changeresult;
 
 DROP TABLE changeresult;
 DROP TABLE somechange;
+-- check calling logical decoding from pl/pgsql
+CREATE FUNCTION slot_changes_wrapper(slot_name name) RETURNS SETOF TEXT AS $$
+BEGIN
+  RETURN QUERY
+    SELECT data FROM pg_logical_slot_peek_changes(slot_name, NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+END$$ LANGUAGE plpgsql;
+SELECT * FROM slot_changes_wrapper('regression_slot');
+                                                                                          slot_changes_wrapper                                                                                          
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''BEGIN'''''''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''table public.somechange: INSERT: id[integer]:1'''''''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''COMMIT'''''''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ COMMIT
+(14 rows)
+
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                                                                                                   data                                                                                                  
 --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
index 54670fd39e76ca41135eaee692edbc477c6ab175..1068cec58882c71b7dd43851340bf3e159c842c5 100644 (file)
@@ -27,5 +27,16 @@ INSERT INTO changeresult
 SELECT * FROM changeresult;
 DROP TABLE changeresult;
 DROP TABLE somechange;
+
+-- check calling logical decoding from pl/pgsql
+CREATE FUNCTION slot_changes_wrapper(slot_name name) RETURNS SETOF TEXT AS $$
+BEGIN
+  RETURN QUERY
+    SELECT data FROM pg_logical_slot_peek_changes(slot_name, NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+END$$ LANGUAGE plpgsql;
+
+SELECT * FROM slot_changes_wrapper('regression_slot');
+
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
 SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
index 38767ae4cedc7dabc80cd08f3f49e8620d1ba65d..b809bec616d7e1cee9671ff5a6c0917eee029581 100644 (file)
@@ -72,8 +72,8 @@ static void _SPI_cursor_operation(Portal portal,
 static SPIPlanPtr _SPI_make_plan_non_temp(SPIPlanPtr plan);
 static SPIPlanPtr _SPI_save_plan(SPIPlanPtr plan);
 
-static int     _SPI_begin_call(bool execmem);
-static int     _SPI_end_call(bool procmem);
+static int     _SPI_begin_call(bool use_exec);
+static int     _SPI_end_call(bool use_exec);
 static MemoryContext _SPI_execmem(void);
 static MemoryContext _SPI_procmem(void);
 static bool _SPI_checktuples(void);
@@ -127,6 +127,7 @@ SPI_connect(void)
        _SPI_current->processed = 0;
        _SPI_current->lastoid = InvalidOid;
        _SPI_current->tuptable = NULL;
+       _SPI_current->execSubid = InvalidSubTransactionId;
        slist_init(&_SPI_current->tuptables);
        _SPI_current->procCxt = NULL;           /* in case we fail to create 'em */
        _SPI_current->execCxt = NULL;
@@ -157,7 +158,7 @@ SPI_finish(void)
 {
        int                     res;
 
-       res = _SPI_begin_call(false);           /* live in procedure memory */
+       res = _SPI_begin_call(false);           /* just check we're connected */
        if (res < 0)
                return res;
 
@@ -282,8 +283,15 @@ AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid)
        {
                slist_mutable_iter siter;
 
-               /* free Executor memory the same as _SPI_end_call would do */
-               MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
+               /*
+                * Throw away executor state if current executor operation was started
+                * within current subxact (essentially, force a _SPI_end_call(true)).
+                */
+               if (_SPI_current->execSubid >= mySubid)
+               {
+                       _SPI_current->execSubid = InvalidSubTransactionId;
+                       MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
+               }
 
                /* throw away any tuple tables created within current subxact */
                slist_foreach_modify(siter, &_SPI_current->tuptables)
@@ -307,8 +315,6 @@ AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid)
                                MemoryContextDelete(tuptable->tuptabcxt);
                        }
                }
-               /* in particular we should have gotten rid of any in-progress table */
-               Assert(_SPI_current->tuptable == NULL);
        }
 }
 
@@ -2529,9 +2535,13 @@ _SPI_procmem(void)
 
 /*
  * _SPI_begin_call: begin a SPI operation within a connected procedure
+ *
+ * use_exec is true if we intend to make use of the procedure's execCxt
+ * during this SPI operation.  We'll switch into that context, and arrange
+ * for it to be cleaned up at _SPI_end_call or if an error occurs.
  */
 static int
-_SPI_begin_call(bool execmem)
+_SPI_begin_call(bool use_exec)
 {
        if (_SPI_curid + 1 != _SPI_connected)
                return SPI_ERROR_UNCONNECTED;
@@ -2539,8 +2549,13 @@ _SPI_begin_call(bool execmem)
        if (_SPI_current != &(_SPI_stack[_SPI_curid]))
                elog(ERROR, "SPI stack corrupted");
 
-       if (execmem)                            /* switch to the Executor memory context */
+       if (use_exec)
+       {
+               /* remember when the Executor operation started */
+               _SPI_current->execSubid = GetCurrentSubTransactionId();
+               /* switch to the Executor memory context */
                _SPI_execmem();
+       }
 
        return 0;
 }
@@ -2548,19 +2563,24 @@ _SPI_begin_call(bool execmem)
 /*
  * _SPI_end_call: end a SPI operation within a connected procedure
  *
+ * use_exec must be the same as in the previous _SPI_begin_call
+ *
  * Note: this currently has no failure return cases, so callers don't check
  */
 static int
-_SPI_end_call(bool procmem)
+_SPI_end_call(bool use_exec)
 {
        /*
         * We're returning to procedure where _SPI_curid == _SPI_connected - 1
         */
        _SPI_curid--;
 
-       if (procmem)                            /* switch to the procedure memory context */
+       if (use_exec)
        {
+               /* switch to the procedure memory context */
                _SPI_procmem();
+               /* mark Executor context no longer in use */
+               _SPI_current->execSubid = InvalidSubTransactionId;
                /* and free Executor memory */
                MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
        }
index e8084dff0911677115f16a5dd7d67c30206baa92..5936d86f7071684f9b8e91b5d44e77908a2c6e27 100644 (file)
@@ -31,6 +31,9 @@ typedef struct
        MemoryContext execCxt;          /* executor context */
        MemoryContext savedcxt;         /* context of SPI_connect's caller */
        SubTransactionId connectSubid;          /* ID of connecting subtransaction */
+
+       /* subtransaction in which current Executor call was started */
+       SubTransactionId execSubid;
 } _SPI_connection;
 
 /*