From: Peter Eisentraut Date: Sat, 5 Aug 2017 01:14:35 +0000 (-0400) Subject: Only kill sync workers at commit time in subscription DDL X-Git-Tag: REL_10_BETA3~23 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=7e174fa793a2df89fe03d002a5087ef67abcdde8;p=postgresql Only kill sync workers at commit time in subscription DDL This allows a transaction abort to avoid killing those workers. Author: Petr Jelinek --- diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b0aa69fe4b..50c3c3b5e5 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2277,6 +2277,15 @@ PrepareTransaction(void) (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot PREPARE a transaction that has exported snapshots"))); + /* + * Don't allow PREPARE but for transaction that has/might kill logical + * replication workers. + */ + if (XactManipulatesLogicalReplicationWorkers()) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot PREPARE a transaction that has manipulated logical replication workers"))); + /* Prevent cancel/die interrupt while cleaning up */ HOLD_INTERRUPTS(); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 6dc3f6ee00..87824b8fec 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -597,7 +597,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) RemoveSubscriptionRel(sub->oid, relid); - logicalrep_worker_stop(sub->oid, relid); + logicalrep_worker_stop_at_commit(sub->oid, relid); namespace = get_namespace_name(get_rel_namespace(relid)); ereport(NOTICE, @@ -819,6 +819,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) char *subname; char *conninfo; char *slotname; + List *subworkers; + ListCell *lc; char originname[NAMEDATALEN]; char *err = NULL; RepOriginId originid; @@ -909,15 +911,33 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) ReleaseSysCache(tup); + /* + * If we are dropping the replication slot, stop all the subscription + * workers immediately, so that the slot becomes accessible. Otherwise + * just schedule the stopping for the end of the transaction. + * + * New workers won't be started because we hold an exclusive lock on the + * subscription till the end of the transaction. + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + subworkers = logicalrep_workers_find(subid, false); + LWLockRelease(LogicalRepWorkerLock); + foreach (lc, subworkers) + { + LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); + if (slotname) + logicalrep_worker_stop(w->subid, w->relid); + else + logicalrep_worker_stop_at_commit(w->subid, w->relid); + } + list_free(subworkers); + /* Clean up dependencies */ deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); /* Remove any associated relation synchronization states. */ RemoveSubscriptionRel(subid, InvalidOid); - /* Kill the apply worker so that the slot becomes accessible. */ - logicalrep_worker_stop(subid, InvalidOid); - /* Remove the origin tracking if exists. */ snprintf(originname, sizeof(originname), "pg_%u", subid); originid = replorigin_by_name(originname, true); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index d165d518e1..0f9e5755b9 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -73,6 +73,14 @@ typedef struct LogicalRepCtxStruct LogicalRepCtxStruct *LogicalRepCtx; +typedef struct LogicalRepWorkerId +{ + Oid subid; + Oid relid; +} LogicalRepWorkerId; + +static List *on_commit_stop_workers = NIL; + static void ApplyLauncherWakeup(void); static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); @@ -249,6 +257,30 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) return res; } +/* + * Similar to logicalrep_worker_find(), but returns list of all workers for + * the subscription, instead just one. + */ +List * +logicalrep_workers_find(Oid subid, bool only_running) +{ + int i; + List *res = NIL; + + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + + /* Search for attached worker for a given subscription id. */ + for (i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + if (w->in_use && w->subid == subid && (!only_running || w->proc)) + res = lappend(res, w); + } + + return res; +} + /* * Start new apply background worker. */ @@ -513,6 +545,27 @@ logicalrep_worker_stop(Oid subid, Oid relid) LWLockRelease(LogicalRepWorkerLock); } +/* + * Request worker for specified sub/rel to be stopped on commit. + */ +void +logicalrep_worker_stop_at_commit(Oid subid, Oid relid) +{ + LogicalRepWorkerId *wid; + MemoryContext oldctx; + + /* Make sure we store the info in context that survives until commit. */ + oldctx = MemoryContextSwitchTo(TopTransactionContext); + + wid = palloc(sizeof(LogicalRepWorkerId)); + wid->subid = subid; + wid->relid = relid; + + on_commit_stop_workers = lappend(on_commit_stop_workers, wid); + + MemoryContextSwitchTo(oldctx); +} + /* * Wake up (using latch) any logical replication worker for specified sub/rel. */ @@ -753,15 +806,41 @@ ApplyLauncherShmemInit(void) } } +/* + * Check whether current transaction has manipulated logical replication + * workers. + */ +bool +XactManipulatesLogicalReplicationWorkers(void) +{ + return (on_commit_stop_workers != NIL); +} + /* * Wakeup the launcher on commit if requested. */ void AtEOXact_ApplyLauncher(bool isCommit) { - if (isCommit && on_commit_launcher_wakeup) - ApplyLauncherWakeup(); + if (isCommit) + { + ListCell *lc; + foreach (lc, on_commit_stop_workers) + { + LogicalRepWorkerId *wid = lfirst(lc); + logicalrep_worker_stop(wid->subid, wid->relid); + } + + if (on_commit_launcher_wakeup) + ApplyLauncherWakeup(); + } + + /* + * No need to pfree on_commit_stop_workers. It was allocated in + * transaction memory context, which is going to be cleaned soon. + */ + on_commit_stop_workers = NIL; on_commit_launcher_wakeup = false; } diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index aac7d326e2..78016c448f 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -22,6 +22,7 @@ extern Size ApplyLauncherShmemSize(void); extern void ApplyLauncherShmemInit(void); extern void ApplyLauncherWakeupAtCommit(void); +extern bool XactManipulatesLogicalReplicationWorkers(void); extern void AtEOXact_ApplyLauncher(bool isCommit); extern bool IsLogicalLauncher(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 494a3a3d08..7b8728cced 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -71,9 +71,11 @@ extern bool in_remote_transaction; extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); +extern List *logicalrep_workers_find(Oid subid, bool only_running); extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid); extern void logicalrep_worker_stop(Oid subid, Oid relid); +extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);