#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/slot.h"
+#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
+static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
/* Flags set by signal handlers */
volatile sig_atomic_t got_SIGHUP = false;
/*
* Wait for a background worker to start up and attach to the shmem context.
*
- * This is like WaitForBackgroundWorkerStartup(), except that we wait for
- * attaching, not just start and we also just exit if postmaster died.
+ * This is only needed for cleaning up the shared memory in case the worker
+ * fails to attach.
*/
-static bool
+static void
WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
BackgroundWorkerHandle *handle)
{
BgwHandleStatus status;
int rc;
pid_t pid;
+ uint16 generation;
+
+ /*
+ * Remember the worker's generation, so that if the slot is recycled while
+ * we wait, we don't mistake a different worker for ours.
+ */
+ generation = worker->generation;
for (;;)
{
CHECK_FOR_INTERRUPTS();
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ /* Worker either died or has started; no need to do anything. */
+ if (!worker->in_use || worker->proc)
+ {
+ LWLockRelease(LogicalRepWorkerLock);
+ return;
+ }
+
+ LWLockRelease(LogicalRepWorkerLock);
+
+ /* Check if worker has died before attaching, and clean up after it. */
status = GetBackgroundWorkerPid(handle, &pid);
- /*
- * Worker started and attached to our shmem. This check is safe
- * because only launcher ever starts the workers, so nobody can steal
- * the worker slot.
- */
- if (status == BGWH_STARTED && worker->proc)
- return true;
- /* Worker didn't start or died before attaching to our shmem. */
if (status == BGWH_STOPPED)
- return false;
+ {
+ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+ /* Ensure that this was indeed the worker we waited for. */
+ if (generation == worker->generation)
+ logicalrep_worker_cleanup(worker);
+ LWLockRelease(LogicalRepWorkerLock);
+ return;
+ }
/*
* We need timeout because we generally don't get notified via latch
* about the worker attach. But we don't expect to have to wait long.
*/
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
10L, WAIT_EVENT_BGWORKER_STARTUP);

/* Emergency bailout if the postmaster has died. */
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);

if (rc & WL_LATCH_SET)
ResetLatch(MyLatch);
}
- return false;
}
/*
for (i = 0; i < max_logical_replication_workers; i++)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
- if (w->subid == subid && w->relid == relid &&
- (!only_running || (w->proc && IsBackendPid(w->proc->pid))))
+ if (w->in_use && w->subid == subid && w->relid == relid &&
+ (!only_running || w->proc))
{
res = w;
break;
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
+ int i;
int slot;
LogicalRepWorker *worker = NULL;
+ int nsyncworkers;
+ TimestampTz now;
ereport(LOG,
(errmsg("starting logical replication worker for subscription \"%s\"",
*/
LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
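+ /*
+ * We may come back here, via the goto below, after garbage-collecting
+ * stale worker slots; the rescan happens under the same lock.
+ */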
+retry:
/* Find unused worker slot. */
- for (slot = 0; slot < max_logical_replication_workers; slot++)
+ for (i = 0; i < max_logical_replication_workers; i++)
{
- if (!LogicalRepCtx->workers[slot].proc)
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+ if (!w->in_use)
{
- worker = &LogicalRepCtx->workers[slot];
+ worker = w;
+ slot = i;
break;
}
}
- /* Bail if not found */
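+ /* Count the sync workers currently in use by this subscription. */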
+ nsyncworkers = logicalrep_sync_worker_count(subid);
+
+ now = GetCurrentTimestamp();
+
+ /*
+ * If we didn't find a free slot, or we are at the sync worker limit, try
+ * garbage collection: if some worker failed to start up and its parent
+ * crashed while waiting, its in_use flag was never cleared.
+ */
+ if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+ {
+ bool did_cleanup = false;
+
+ for (i = 0; i < max_logical_replication_workers; i++)
+ {
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+ /*
+ * If the worker was marked in use but didn't manage to attach in
+ * time, clean it up.
+ */
+ if (w->in_use && !w->proc &&
+ TimestampDifferenceExceeds(w->launch_time, now,
+ wal_receiver_timeout))
+ {
+ elog(WARNING,
+ "logical replication worker for subscription %u took too long to start; canceled",
+ w->subid);
+
+ logicalrep_worker_cleanup(w);
+ did_cleanup = true;
+ }
+ }
+
+ if (did_cleanup)
+ goto retry;
+ }
+
+ /*
+ * If we reached the sync worker limit per subscription, just exit
+ * silently as we might get here because of an otherwise harmless race
+ * condition.
+ */
+ if (nsyncworkers >= max_sync_workers_per_subscription)
+ {
+ LWLockRelease(LogicalRepWorkerLock);
+ return;
+ }
+
+ /*
+ * However, if there are no more free worker slots, inform the user about
+ * it before exiting.
+ */
if (worker == NULL)
{
LWLockRelease(LogicalRepWorkerLock);
+ ereport(WARNING,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("out of logical replication worker slots"),
+ errhint("You might need to increase max_logical_replication_workers.")));
return;
}
- /* Prepare the worker info. */
+ /* Prepare the worker slot. */
+ worker->launch_time = now;
+ worker->in_use = true;
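+ /* Bump the generation so others can tell this slot has been reused. */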
+ worker->generation++;
worker->proc = NULL;
worker->dbid = dbid;
worker->userid = userid;
logicalrep_worker_stop(Oid subid, Oid relid)
{
LogicalRepWorker *worker;
+ uint16 generation;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
return;
}
+ /*
+ * Remember the worker's generation so we can later check whether what we
+ * see in the slot is still the same worker.
+ */
+ generation = worker->generation;
+
/*
* If we found a worker but it does not have proc set, it is still starting
* up; wait for it to finish starting and then kill it.
*/
- while (worker && !worker->proc)
+ while (worker->in_use && !worker->proc)
{
int rc;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
/*
- * Worker is no longer associated with subscription. It must have
- * exited, nothing more for us to do.
+ * Check whether the worker slot is no longer used, which would mean
+ * that the worker has exited, or whether the worker generation is
+ * different, meaning that a different worker has taken the slot.
*/
- if (worker->subid == InvalidOid)
+ if (!worker->in_use || worker->generation != generation)
{
LWLockRelease(LogicalRepWorkerLock);
return;
int rc;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- if (!worker->proc)
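+ /* Is the worker gone, or has the slot been recycled for someone else? */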
+ if (!worker->proc || worker->generation != generation)
{
LWLockRelease(LogicalRepWorkerLock);
break;
Assert(slot >= 0 && slot < max_logical_replication_workers);
MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
+ if (!MyLogicalRepWorker->in_use)
+ {
+ LWLockRelease(LogicalRepWorkerLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication worker slot %d is empty, cannot attach",
+ slot)));
+ }
+
if (MyLogicalRepWorker->proc)
+ {
+ LWLockRelease(LogicalRepWorkerLock);
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("logical replication worker slot %d already used by "
- "another worker", slot)));
+ errmsg("logical replication worker slot %d is already used by "
+ "another worker, cannot attach", slot)));
+ }
MyLogicalRepWorker->proc = MyProc;
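+ /* Make sure the slot is released again if we exit for any reason. */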
before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
/* Block concurrent access. */
LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
- MyLogicalRepWorker->dbid = InvalidOid;
- MyLogicalRepWorker->userid = InvalidOid;
- MyLogicalRepWorker->subid = InvalidOid;
- MyLogicalRepWorker->proc = NULL;
+ logicalrep_worker_cleanup(MyLogicalRepWorker);
LWLockRelease(LogicalRepWorkerLock);
}
+/*
+ * Clean up worker info.
+ *
+ * The caller must hold LogicalRepWorkerLock in exclusive mode.
+ */
+static void
+logicalrep_worker_cleanup(LogicalRepWorker *worker)
+{
+ Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
+
+ worker->in_use = false;
+ worker->proc = NULL;
+ worker->dbid = InvalidOid;
+ worker->userid = InvalidOid;
+ worker->subid = InvalidOid;
+ worker->relid = InvalidOid;
+}
+
/*
* Cleanup function for logical replication launcher.
*
if (sub->enabled && w == NULL)
{
- logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
- sub->owner, InvalidOid);
last_start_time = now;
wait_time = wal_retrieve_retry_interval;
- /* Limit to one worker per mainloop cycle. */
- break;
+
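+ /*
+ * The bookkeeping above ensures that a launch that fails (e.g. for
+ * lack of a free slot) is retried after wal_retrieve_retry_interval.
+ */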
+ logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+ sub->owner, InvalidOid);
}
}