DROP TABLE changeresult;
DROP TABLE somechange;
+-- check calling logical decoding from pl/pgsql
+CREATE FUNCTION slot_changes_wrapper(slot_name name) RETURNS SETOF TEXT AS $$
+BEGIN
+ RETURN QUERY
+ SELECT data FROM pg_logical_slot_peek_changes(slot_name, NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+END$$ LANGUAGE plpgsql;
+SELECT * FROM slot_changes_wrapper('regression_slot');
+ slot_changes_wrapper
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''BEGIN'''''''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''table public.somechange: INSERT: id[integer]:1'''''''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''COMMIT'''''''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ COMMIT
+(14 rows)
+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SELECT * FROM changeresult;
DROP TABLE changeresult;
DROP TABLE somechange;
+
+-- check calling logical decoding from pl/pgsql
+CREATE FUNCTION slot_changes_wrapper(slot_name name) RETURNS SETOF TEXT AS $$
+BEGIN
+ RETURN QUERY
+ SELECT data FROM pg_logical_slot_peek_changes(slot_name, NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+END$$ LANGUAGE plpgsql;
+
+SELECT * FROM slot_changes_wrapper('regression_slot');
+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
static SPIPlanPtr _SPI_make_plan_non_temp(SPIPlanPtr plan);
static SPIPlanPtr _SPI_save_plan(SPIPlanPtr plan);
-static int _SPI_begin_call(bool execmem);
-static int _SPI_end_call(bool procmem);
+static int _SPI_begin_call(bool use_exec);
+static int _SPI_end_call(bool use_exec);
static MemoryContext _SPI_execmem(void);
static MemoryContext _SPI_procmem(void);
static bool _SPI_checktuples(void);
_SPI_current->processed = 0;
_SPI_current->lastoid = InvalidOid;
_SPI_current->tuptable = NULL;
+ _SPI_current->execSubid = InvalidSubTransactionId;
slist_init(&_SPI_current->tuptables);
_SPI_current->procCxt = NULL; /* in case we fail to create 'em */
_SPI_current->execCxt = NULL;
{
int res;
- res = _SPI_begin_call(false); /* live in procedure memory */
+ res = _SPI_begin_call(false); /* just check we're connected */
if (res < 0)
return res;
{
slist_mutable_iter siter;
- /* free Executor memory the same as _SPI_end_call would do */
- MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
+ /*
+ * Throw away executor state if current executor operation was started
+ * within current subxact (essentially, force a _SPI_end_call(true)).
+ */
+ if (_SPI_current->execSubid >= mySubid)
+ {
+ _SPI_current->execSubid = InvalidSubTransactionId;
+ MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
+ }
/* throw away any tuple tables created within current subxact */
slist_foreach_modify(siter, &_SPI_current->tuptables)
MemoryContextDelete(tuptable->tuptabcxt);
}
}
- /* in particular we should have gotten rid of any in-progress table */
- Assert(_SPI_current->tuptable == NULL);
}
}
/*
* _SPI_begin_call: begin a SPI operation within a connected procedure
+ *
+ * use_exec is true if we intend to make use of the procedure's execCxt
+ * during this SPI operation. We'll switch into that context, and arrange
+ * for it to be cleaned up at _SPI_end_call or if an error occurs.
*/
static int
-_SPI_begin_call(bool execmem)
+_SPI_begin_call(bool use_exec)
{
if (_SPI_curid + 1 != _SPI_connected)
return SPI_ERROR_UNCONNECTED;
if (_SPI_current != &(_SPI_stack[_SPI_curid]))
elog(ERROR, "SPI stack corrupted");
- if (execmem) /* switch to the Executor memory context */
+ if (use_exec)
+ {
+ /* remember when the Executor operation started */
+ _SPI_current->execSubid = GetCurrentSubTransactionId();
+ /* switch to the Executor memory context */
_SPI_execmem();
+ }
return 0;
}
/*
* _SPI_end_call: end a SPI operation within a connected procedure
*
+ * use_exec must be the same as in the previous _SPI_begin_call
+ *
* Note: this currently has no failure return cases, so callers don't check
*/
static int
-_SPI_end_call(bool procmem)
+_SPI_end_call(bool use_exec)
{
/*
* We're returning to procedure where _SPI_curid == _SPI_connected - 1
*/
_SPI_curid--;
- if (procmem) /* switch to the procedure memory context */
+ if (use_exec)
{
+ /* switch to the procedure memory context */
_SPI_procmem();
+ /* mark Executor context no longer in use */
+ _SPI_current->execSubid = InvalidSubTransactionId;
/* and free Executor memory */
MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
}