]> granicus.if.org Git - postgresql/commitdiff
Add subtransaction handling for table synchronization workers.
authorRobert Haas <rhaas@postgresql.org>
Mon, 16 Jul 2018 21:33:22 +0000 (17:33 -0400)
committerRobert Haas <rhaas@postgresql.org>
Mon, 16 Jul 2018 21:55:13 +0000 (17:55 -0400)
Since the old logic was completely unaware of subtransactions, a
change made in a subsequently-aborted subtransaction would still cause
workers to be stopped at toplevel transaction commit.  Fix that by
managing a stack of worker lists rather than just one.

Amit Khandekar and Robert Haas

Discussion: http://postgr.es/m/CAJ3gD9eaG_mWqiOTA2LfAug-VRNn1hrhf50Xi1YroxL37QkZNg@mail.gmail.com

src/backend/access/transam/xact.c
src/backend/replication/logical/launcher.c
src/include/replication/logicallauncher.h
src/tools/pgindent/typedefs.list

index df3dc1c168a5a7ead1af2eb1ff3e66956852a762..9004e38e6d4405f35161da3b6d55c6fb13c2220b 100644 (file)
@@ -4542,6 +4542,7 @@ CommitSubTransaction(void)
        AtEOSubXact_HashTables(true, s->nestingLevel);
        AtEOSubXact_PgStat(true, s->nestingLevel);
        AtSubCommit_Snapshot(s->nestingLevel);
+       AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
 
        /*
         * We need to restore the upper transaction's read-only state, in case the
@@ -4695,6 +4696,7 @@ AbortSubTransaction(void)
                AtEOSubXact_HashTables(false, s->nestingLevel);
                AtEOSubXact_PgStat(false, s->nestingLevel);
                AtSubAbort_Snapshot(s->nestingLevel);
+               AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
        }
 
        /*
index 44bdcab3b9796fb21d0c3957cb47d6202e166792..ef803b86fef09ea183377c5261a1694dd975aa31 100644 (file)
@@ -79,7 +79,19 @@ typedef struct LogicalRepWorkerId
        Oid                     relid;
 } LogicalRepWorkerId;
 
-static List *on_commit_stop_workers = NIL;
+typedef struct StopWorkersData
+{
+       int                     nestDepth;              /* Sub-transaction nest level */
+       List       *workers;            /* List of LogicalRepWorkerId */
+       struct StopWorkersData *parent; /* This need not be an immediate
+                                                                        * subtransaction parent */
+} StopWorkersData;
+
+/*
+ * Stack of StopWorkersData elements. Each stack element contains the workers
+ * to be stopped for that subtransaction.
+ */
+static StopWorkersData *on_commit_stop_workers = NULL;
 
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
@@ -558,17 +570,41 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 void
 logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
 {
+       int                     nestDepth = GetCurrentTransactionNestLevel();
        LogicalRepWorkerId *wid;
        MemoryContext oldctx;
 
        /* Make sure we store the info in context that survives until commit. */
        oldctx = MemoryContextSwitchTo(TopTransactionContext);
 
+       /* Check that previous transactions were properly cleaned up. */
+       Assert(on_commit_stop_workers == NULL ||
+                  nestDepth >= on_commit_stop_workers->nestDepth);
+
+       /*
+        * Push a new stack element if we don't already have one for the current
+        * nestDepth.
+        */
+       if (on_commit_stop_workers == NULL ||
+               nestDepth > on_commit_stop_workers->nestDepth)
+       {
+               StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
+
+               newdata->nestDepth = nestDepth;
+               newdata->workers = NIL;
+               newdata->parent = on_commit_stop_workers;
+               on_commit_stop_workers = newdata;
+       }
+
+       /*
+        * Finally add a new worker into the worker list of the current
+        * subtransaction.
+        */
        wid = palloc(sizeof(LogicalRepWorkerId));
        wid->subid = subid;
        wid->relid = relid;
-
-       on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+       on_commit_stop_workers->workers =
+               lappend(on_commit_stop_workers->workers, wid);
 
        MemoryContextSwitchTo(oldctx);
 }
@@ -820,7 +856,7 @@ ApplyLauncherShmemInit(void)
 bool
 XactManipulatesLogicalReplicationWorkers(void)
 {
-       return (on_commit_stop_workers != NIL);
+       return (on_commit_stop_workers != NULL);
 }
 
 /*
@@ -829,15 +865,25 @@ XactManipulatesLogicalReplicationWorkers(void)
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
+
+       Assert(on_commit_stop_workers == NULL ||
+                  (on_commit_stop_workers->nestDepth == 1 &&
+                       on_commit_stop_workers->parent == NULL));
+
        if (isCommit)
        {
                ListCell   *lc;
 
-               foreach(lc, on_commit_stop_workers)
+               if (on_commit_stop_workers != NULL)
                {
-                       LogicalRepWorkerId *wid = lfirst(lc);
+                       List       *workers = on_commit_stop_workers->workers;
+
+                       foreach(lc, workers)
+                       {
+                               LogicalRepWorkerId *wid = lfirst(lc);
 
-                       logicalrep_worker_stop(wid->subid, wid->relid);
+                               logicalrep_worker_stop(wid->subid, wid->relid);
+                       }
                }
 
                if (on_commit_launcher_wakeup)
@@ -848,10 +894,64 @@ AtEOXact_ApplyLauncher(bool isCommit)
         * No need to pfree on_commit_stop_workers.  It was allocated in
         * transaction memory context, which is going to be cleaned soon.
         */
-       on_commit_stop_workers = NIL;
+       on_commit_stop_workers = NULL;
        on_commit_launcher_wakeup = false;
 }
 
+/*
+ * On commit, merge the current on_commit_stop_workers list into the
+ * immediate parent, if present.
+ * On rollback, discard the current on_commit_stop_workers list.
+ * Pop out the stack.
+ */
+void
+AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
+{
+       StopWorkersData *parent;
+
+       /* Exit immediately if there's no work to do at this level. */
+       if (on_commit_stop_workers == NULL ||
+               on_commit_stop_workers->nestDepth < nestDepth)
+               return;
+
+       Assert(on_commit_stop_workers->nestDepth == nestDepth);
+
+       parent = on_commit_stop_workers->parent;
+
+       if (isCommit)
+       {
+               /*
+                * If the upper stack element is not an immediate parent
+                * subtransaction, just decrement the notional nesting depth without
+                * doing any real work.  Else, we need to merge the current workers
+                * list into the parent.
+                */
+               if (!parent || parent->nestDepth < nestDepth - 1)
+               {
+                       on_commit_stop_workers->nestDepth--;
+                       return;
+               }
+
+               parent->workers =
+                       list_concat(parent->workers, on_commit_stop_workers->workers);
+       }
+       else
+       {
+               /*
+                * Abandon everything that was done at this nesting level.  Explicitly
+                * free memory to avoid a transaction-lifespan leak.
+                */
+               list_free_deep(on_commit_stop_workers->workers);
+       }
+
+       /*
+        * We have taken care of the current subtransaction workers list for both
+        * abort or commit. So we are ready to pop the stack.
+        */
+       pfree(on_commit_stop_workers);
+       on_commit_stop_workers = parent;
+}
+
 /*
  * Request wakeup of the launcher on commit of the transaction.
  *
index 78016c448f34964507c23203bb57808010973edf..84f6041e727a89f7392d4543efffbafe85a9ac68 100644 (file)
@@ -24,6 +24,7 @@ extern void ApplyLauncherShmemInit(void);
 extern void ApplyLauncherWakeupAtCommit(void);
 extern bool XactManipulatesLogicalReplicationWorkers(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
+extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
 
 extern bool IsLogicalLauncher(void);
 
index 8166d86ca1d971d2f55607a378065fc6aeebd784..6da8ae40a523c21b5021303332bafc8a4ce2e852 100644 (file)
@@ -2112,6 +2112,7 @@ StdAnalyzeData
 StdRdOptions
 Step
 StopList
+StopWorkersData
 StrategyNumber
 StreamCtl
 StringInfo