granicus.if.org Git - postgresql/commitdiff
Make tablesync worker exit when apply dies while it was waiting for it
author Peter Eisentraut <peter_e@gmx.net>
Sat, 3 Jun 2017 13:18:52 +0000 (09:18 -0400)
committer Peter Eisentraut <peter_e@gmx.net>
Sat, 3 Jun 2017 13:20:56 +0000 (09:20 -0400)
This avoids "orphaned" sync workers.

This was caused by a thinko in wait_for_sync_status_change.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reported-by: Masahiko Sawada <sawada.mshk@gmail.com>
src/backend/replication/logical/tablesync.c

index 85e480db4bdd6c95eceafac6f42a9ce0b8112d19..6e268f3521dba4fec25976c09030a183e04a9b37 100644 (file)
@@ -146,7 +146,12 @@ finish_sync_worker(void)
 /*
  * Wait until the table synchronization change.
  *
- * Returns false if the relation subscription state disappeared.
+ * If called from apply worker, it will wait for the synchronization worker to
+ * change table state in shmem.  If called from synchronization worker, it
+ * will wait for apply worker to change table state in shmem.
+ *
+ * Returns false if the opposite worker has disappeared or the table state has
+ * been reset.
  */
 static bool
 wait_for_sync_status_change(Oid relid, char origstate)
@@ -161,14 +166,27 @@ wait_for_sync_status_change(Oid relid, char origstate)
                CHECK_FOR_INTERRUPTS();
 
                LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+               /* Check if the opposite worker is still running and bail if not. */
                worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
-                                                                               relid, false);
+                                                                               am_tablesync_worker() ? InvalidOid : relid,
+                                                                               false);
                if (!worker)
                {
                        LWLockRelease(LogicalRepWorkerLock);
                        return false;
                }
+
+               /*
+                * If I'm the synchronization worker, look at my own state.  Otherwise
+                * look at the state of the synchronization worker we found above.
+                */
+               if (am_tablesync_worker())
+                       worker = MyLogicalRepWorker;
+
+               Assert(worker->relid == relid);
                state = worker->relstate;
+
                LWLockRelease(LogicalRepWorkerLock);
 
                if (state == SUBREL_STATE_UNKNOWN)
@@ -179,7 +197,7 @@ wait_for_sync_status_change(Oid relid, char origstate)
 
                rc = WaitLatch(&MyProc->procLatch,
                                           WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
-                                          10000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
+                                          1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
 
                /* emergency bailout if postmaster has died */
                if (rc & WL_POSTMASTER_DEATH)