]> granicus.if.org Git - postgresql/commitdiff
Only kill sync workers at commit time in subscription DDL
authorPeter Eisentraut <peter_e@gmx.net>
Sat, 5 Aug 2017 01:14:35 +0000 (21:14 -0400)
committerPeter Eisentraut <peter_e@gmx.net>
Sat, 5 Aug 2017 01:17:47 +0000 (21:17 -0400)
This allows a transaction abort to avoid killing those workers.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>

src/backend/access/transam/xact.c
src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/launcher.c
src/include/replication/logicallauncher.h
src/include/replication/worker_internal.h

index b0aa69fe4b43de546f0b4dedae97c313f2f4812f..50c3c3b5e5e3763d795294cf545c1f1bdaa6f0ed 100644 (file)
@@ -2277,6 +2277,15 @@ PrepareTransaction(void)
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 errmsg("cannot PREPARE a transaction that has exported snapshots")));
 
+       /*
+        * Don't allow PREPARE of a transaction that has scheduled logical
+        * replication workers to be stopped at commit.
+        */
+       if (XactManipulatesLogicalReplicationWorkers())
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("cannot PREPARE a transaction that has manipulated logical replication workers")));
+
        /* Prevent cancel/die interrupt while cleaning up */
        HOLD_INTERRUPTS();
 
index 6dc3f6ee0009dedec1b7116b2fc167a849a8eb5c..87824b8fec3fe11e53012e4035dda6aaafe80b2d 100644 (file)
@@ -597,7 +597,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
                        RemoveSubscriptionRel(sub->oid, relid);
 
-                       logicalrep_worker_stop(sub->oid, relid);
+                       logicalrep_worker_stop_at_commit(sub->oid, relid);
 
                        namespace = get_namespace_name(get_rel_namespace(relid));
                        ereport(NOTICE,
@@ -819,6 +819,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
        char       *subname;
        char       *conninfo;
        char       *slotname;
+       List       *subworkers;
+       ListCell   *lc;
        char            originname[NAMEDATALEN];
        char       *err = NULL;
        RepOriginId originid;
@@ -909,15 +911,33 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 
        ReleaseSysCache(tup);
 
+       /*
+        * If we are dropping the replication slot, stop all the subscription
+        * workers immediately, so that the slot becomes accessible.  Otherwise
+        * just schedule the stopping for the end of the transaction.
+        *
+        * New workers won't be started because we hold an exclusive lock on the
+        * subscription till the end of the transaction.
+        */
+       LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+       subworkers = logicalrep_workers_find(subid, false);
+       LWLockRelease(LogicalRepWorkerLock);
+       foreach (lc, subworkers)
+       {
+               LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+               if (slotname)
+                       logicalrep_worker_stop(w->subid, w->relid);
+               else
+                       logicalrep_worker_stop_at_commit(w->subid, w->relid);
+       }
+       list_free(subworkers);
+
        /* Clean up dependencies */
        deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
        /* Remove any associated relation synchronization states. */
        RemoveSubscriptionRel(subid, InvalidOid);
 
-       /* Kill the apply worker so that the slot becomes accessible. */
-       logicalrep_worker_stop(subid, InvalidOid);
-
        /* Remove the origin tracking if exists. */
        snprintf(originname, sizeof(originname), "pg_%u", subid);
        originid = replorigin_by_name(originname, true);
index d165d518e1b35d9630683641b2f1e67f27e3b932..0f9e5755b9e747b67f739e21664f6dbf7cea718b 100644 (file)
@@ -73,6 +73,14 @@ typedef struct LogicalRepCtxStruct
 
 LogicalRepCtxStruct *LogicalRepCtx;
 
+typedef struct LogicalRepWorkerId
+{
+       Oid     subid;
+       Oid relid;
+} LogicalRepWorkerId;
+
+static List *on_commit_stop_workers = NIL;
+
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
@@ -249,6 +257,30 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
        return res;
 }
 
+/*
+ * Similar to logicalrep_worker_find(), but returns a list of all workers for
+ * the subscription, instead of just one.
+ */
+List *
+logicalrep_workers_find(Oid subid, bool only_running)
+{
+       int                     i;
+       List       *res = NIL;
+
+       Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+       /* Collect every in-use worker for the given subscription id. */
+       for (i = 0; i < max_logical_replication_workers; i++)
+       {
+               LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+               if (w->in_use && w->subid == subid && (!only_running || w->proc))
+                       res = lappend(res, w);
+       }
+
+       return res;
+}
+
 /*
  * Start new apply background worker.
  */
@@ -513,6 +545,27 @@ logicalrep_worker_stop(Oid subid, Oid relid)
        LWLockRelease(LogicalRepWorkerLock);
 }
 
+/*
+ * Request worker for specified sub/rel to be stopped on commit.
+ */
+void
+logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+{
+       LogicalRepWorkerId *wid;
+       MemoryContext           oldctx;
+
+       /* Make sure we store the info in context that survives until commit. */
+       oldctx = MemoryContextSwitchTo(TopTransactionContext);
+
+       wid = palloc(sizeof(LogicalRepWorkerId));
+       wid->subid = subid;
+       wid->relid = relid;
+
+       on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+
+       MemoryContextSwitchTo(oldctx);
+}
+
 /*
  * Wake up (using latch) any logical replication worker for specified sub/rel.
  */
@@ -753,15 +806,41 @@ ApplyLauncherShmemInit(void)
        }
 }
 
+/*
+ * Check whether current transaction has manipulated logical replication
+ * workers.
+ */
+bool
+XactManipulatesLogicalReplicationWorkers(void)
+{
+       return (on_commit_stop_workers != NIL);
+}
+
 /*
  * Wakeup the launcher on commit if requested.
  */
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
-       if (isCommit && on_commit_launcher_wakeup)
-               ApplyLauncherWakeup();
+       if (isCommit)
+       {
+               ListCell *lc;
 
+               foreach (lc, on_commit_stop_workers)
+               {
+                       LogicalRepWorkerId *wid = lfirst(lc);
+                       logicalrep_worker_stop(wid->subid, wid->relid);
+               }
+
+               if (on_commit_launcher_wakeup)
+                       ApplyLauncherWakeup();
+       }
+
+       /*
+        * No need to pfree on_commit_stop_workers.  It was allocated in
+        * transaction memory context, which is going to be cleaned soon.
+        */
+       on_commit_stop_workers = NIL;
        on_commit_launcher_wakeup = false;
 }
 
index aac7d326e22cf2d1cbe3dc9cca42f0384b7327b9..78016c448f34964507c23203bb57808010973edf 100644 (file)
@@ -22,6 +22,7 @@ extern Size ApplyLauncherShmemSize(void);
 extern void ApplyLauncherShmemInit(void);
 
 extern void ApplyLauncherWakeupAtCommit(void);
+extern bool XactManipulatesLogicalReplicationWorkers(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
index 494a3a3d087d2dea9cccb5b2bfd3a0fbc40e5905..7b8728cced0bb1b0e7b4f1ff2abbf1a0d5f91f98 100644 (file)
@@ -71,9 +71,11 @@ extern bool in_remote_transaction;
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
                                           bool only_running);
+extern List *logicalrep_workers_find(Oid subid, bool only_running);
 extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
                                                 Oid userid, Oid relid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);