*/
static void
WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
+ uint16 generation,
BackgroundWorkerHandle *handle)
{
BgwHandleStatus status;
int rc;
- uint16 generation;
-
- /* Remember generation for future identification. */
- generation = worker->generation;
for (;;)
{
}
/*
- * Start new apply background worker.
+ * Start new apply background worker, if possible.
*/
void
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
+ uint16 generation;
int i;
int slot = 0;
LogicalRepWorker *worker = NULL;
worker->reply_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->reply_time);
+ /* Before releasing lock, remember generation for future identification. */
+ generation = worker->generation;
+
LWLockRelease(LogicalRepWorkerLock);
/* Register the new dynamic worker. */
if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
{
+ /* Failed to start worker, so clean up the worker slot. */
+ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+ Assert(generation == worker->generation);
+ logicalrep_worker_cleanup(worker);
+ LWLockRelease(LogicalRepWorkerLock);
+
ereport(WARNING,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of background worker slots"),
}
/* Now wait until it attaches. */
- WaitForReplicationWorkerAttach(worker, bgw_handle);
+ WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
}
/*