/*
* Wait until the table synchronization change.
*
- * Returns false if the relation subscription state disappeared.
+ * If called from apply worker, it will wait for the synchronization worker to
+ * change table state in shmem. If called from synchronization worker, it
+ * will wait for apply worker to change table state in shmem.
+ *
+ * Returns false if the opposite worker has disappeared or the table state has
+ * been reset.
*/
static bool
wait_for_sync_status_change(Oid relid, char origstate)
CHECK_FOR_INTERRUPTS();
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ /* Check if the opposite worker is still running and bail if not. */
worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
- relid, false);
+ am_tablesync_worker() ? InvalidOid : relid,
+ false);
if (!worker)
{
LWLockRelease(LogicalRepWorkerLock);
return false;
}
+
+ /*
+ * If I'm the synchronization worker, look at my own state. Otherwise
+ * look at the state of the synchronization worker we found above.
+ */
+ if (am_tablesync_worker())
+ worker = MyLogicalRepWorker;
+
+ Assert(worker->relid == relid);
state = worker->relstate;
+
LWLockRelease(LogicalRepWorkerLock);
if (state == SUBREL_STATE_UNKNOWN)
rc = WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
- 10000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
+ 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)