]> granicus.if.org Git - postgresql/commitdiff
Add subtransaction handling for table synchronization workers.
authorRobert Haas <rhaas@postgresql.org>
Mon, 16 Jul 2018 21:33:22 +0000 (17:33 -0400)
committerRobert Haas <rhaas@postgresql.org>
Mon, 16 Jul 2018 21:33:35 +0000 (17:33 -0400)
Since the old logic was completely unaware of subtransactions, a
change made in a subsequently-aborted subtransaction would still cause
workers to be stopped at toplevel transaction commit.  Fix that by
managing a stack of worker lists rather than just one.

Amit Khandekar and Robert Haas

Discussion: http://postgr.es/m/CAJ3gD9eaG_mWqiOTA2LfAug-VRNn1hrhf50Xi1YroxL37QkZNg@mail.gmail.com

src/backend/access/transam/xact.c
src/backend/replication/logical/launcher.c
src/include/replication/logicallauncher.h
src/tools/pgindent/typedefs.list

index 1da1f13ef33f0c882d0203db3fc5e579654293ed..9aa63c8792be56e4186ef2bbfc12198c00d3b6c3 100644 (file)
@@ -4637,6 +4637,7 @@ CommitSubTransaction(void)
        AtEOSubXact_HashTables(true, s->nestingLevel);
        AtEOSubXact_PgStat(true, s->nestingLevel);
        AtSubCommit_Snapshot(s->nestingLevel);
+       AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
 
        /*
         * We need to restore the upper transaction's read-only state, in case the
@@ -4790,6 +4791,7 @@ AbortSubTransaction(void)
                AtEOSubXact_HashTables(false, s->nestingLevel);
                AtEOSubXact_PgStat(false, s->nestingLevel);
                AtSubAbort_Snapshot(s->nestingLevel);
+               AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
        }
 
        /*
index 6ef333b7257e993ce842ca91d365ae556d95099c..ada16adb67b1fc50de5cc0e0a07e7ac1316ef9f6 100644 (file)
@@ -79,7 +79,19 @@ typedef struct LogicalRepWorkerId
        Oid                     relid;
 } LogicalRepWorkerId;
 
-static List *on_commit_stop_workers = NIL;
+typedef struct StopWorkersData
+{
+       int                     nestDepth;              /* Sub-transaction nest level */
+       List       *workers;            /* List of LogicalRepWorkerId */
+       struct StopWorkersData *parent; /* This need not be an immediate
+                                                                        * subtransaction parent */
+} StopWorkersData;
+
+/*
+ * Stack of StopWorkersData elements. Each stack element contains the workers
+ * to be stopped for that subtransaction.
+ */
+static StopWorkersData *on_commit_stop_workers = NULL;
 
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
@@ -559,17 +571,41 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 void
 logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
 {
+       int                     nestDepth = GetCurrentTransactionNestLevel();
        LogicalRepWorkerId *wid;
        MemoryContext oldctx;
 
        /* Make sure we store the info in context that survives until commit. */
        oldctx = MemoryContextSwitchTo(TopTransactionContext);
 
+       /* Check that previous transactions were properly cleaned up. */
+       Assert(on_commit_stop_workers == NULL ||
+                  nestDepth >= on_commit_stop_workers->nestDepth);
+
+       /*
+        * Push a new stack element if we don't already have one for the current
+        * nestDepth.
+        */
+       if (on_commit_stop_workers == NULL ||
+               nestDepth > on_commit_stop_workers->nestDepth)
+       {
+               StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
+
+               newdata->nestDepth = nestDepth;
+               newdata->workers = NIL;
+               newdata->parent = on_commit_stop_workers;
+               on_commit_stop_workers = newdata;
+       }
+
+       /*
+        * Finally add a new worker into the worker list of the current
+        * subtransaction.
+        */
        wid = palloc(sizeof(LogicalRepWorkerId));
        wid->subid = subid;
        wid->relid = relid;
-
-       on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+       on_commit_stop_workers->workers =
+               lappend(on_commit_stop_workers->workers, wid);
 
        MemoryContextSwitchTo(oldctx);
 }
@@ -823,7 +859,7 @@ ApplyLauncherShmemInit(void)
 bool
 XactManipulatesLogicalReplicationWorkers(void)
 {
-       return (on_commit_stop_workers != NIL);
+       return (on_commit_stop_workers != NULL);
 }
 
 /*
@@ -832,15 +868,25 @@ XactManipulatesLogicalReplicationWorkers(void)
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
+
+       Assert(on_commit_stop_workers == NULL ||
+                  (on_commit_stop_workers->nestDepth == 1 &&
+                       on_commit_stop_workers->parent == NULL));
+
        if (isCommit)
        {
                ListCell   *lc;
 
-               foreach(lc, on_commit_stop_workers)
+               if (on_commit_stop_workers != NULL)
                {
-                       LogicalRepWorkerId *wid = lfirst(lc);
+                       List       *workers = on_commit_stop_workers->workers;
+
+                       foreach(lc, workers)
+                       {
+                               LogicalRepWorkerId *wid = lfirst(lc);
 
-                       logicalrep_worker_stop(wid->subid, wid->relid);
+                               logicalrep_worker_stop(wid->subid, wid->relid);
+                       }
                }
 
                if (on_commit_launcher_wakeup)
@@ -851,10 +897,64 @@ AtEOXact_ApplyLauncher(bool isCommit)
         * No need to pfree on_commit_stop_workers.  It was allocated in
         * transaction memory context, which is going to be cleaned soon.
         */
-       on_commit_stop_workers = NIL;
+       on_commit_stop_workers = NULL;
        on_commit_launcher_wakeup = false;
 }
 
+/*
+ * On commit, merge the current on_commit_stop_workers list into the
+ * immediate parent, if present.
+ * On rollback, discard the current on_commit_stop_workers list.
+ * Pop out the stack.
+ */
+void
+AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
+{
+       StopWorkersData *parent;
+
+       /* Exit immediately if there's no work to do at this level. */
+       if (on_commit_stop_workers == NULL ||
+               on_commit_stop_workers->nestDepth < nestDepth)
+               return;
+
+       Assert(on_commit_stop_workers->nestDepth == nestDepth);
+
+       parent = on_commit_stop_workers->parent;
+
+       if (isCommit)
+       {
+               /*
+                * If the upper stack element is not an immediate parent
+                * subtransaction, just decrement the notional nesting depth without
+                * doing any real work.  Else, we need to merge the current workers
+                * list into the parent.
+                */
+               if (!parent || parent->nestDepth < nestDepth - 1)
+               {
+                       on_commit_stop_workers->nestDepth--;
+                       return;
+               }
+
+               parent->workers =
+                       list_concat(parent->workers, on_commit_stop_workers->workers);
+       }
+       else
+       {
+               /*
+                * Abandon everything that was done at this nesting level.  Explicitly
+                * free memory to avoid a transaction-lifespan leak.
+                */
+               list_free_deep(on_commit_stop_workers->workers);
+       }
+
+       /*
+        * We have taken care of the current subtransaction workers list for both
+        * abort or commit. So we are ready to pop the stack.
+        */
+       pfree(on_commit_stop_workers);
+       on_commit_stop_workers = parent;
+}
+
 /*
  * Request wakeup of the launcher on commit of the transaction.
  *
index ef02512412e97a720f5112e4f86982be021851fb..9f840b7bc1330d58c7cf86195282bd64e43ac6bf 100644 (file)
@@ -24,6 +24,7 @@ extern void ApplyLauncherShmemInit(void);
 extern void ApplyLauncherWakeupAtCommit(void);
 extern bool XactManipulatesLogicalReplicationWorkers(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
+extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
 
 extern bool IsLogicalLauncher(void);
 
index 03867cbce558eeb2560957694bd17454bd738738..ed68cc4085e008bbf393ab2b820645740192454f 100644 (file)
@@ -2227,6 +2227,7 @@ StdAnalyzeData
 StdRdOptions
 Step
 StopList
+StopWorkersData
 StrategyNumber
 StreamCtl
 StringInfo