]> granicus.if.org Git - postgresql/commitdiff
Fix signal handling in logical replication workers
authorPeter Eisentraut <peter_e@gmx.net>
Fri, 2 Jun 2017 18:46:00 +0000 (14:46 -0400)
committerPeter Eisentraut <peter_e@gmx.net>
Fri, 2 Jun 2017 18:49:23 +0000 (14:49 -0400)
The logical replication worker processes now use the normal die()
handler for SIGTERM and CHECK_FOR_INTERRUPTS() instead of custom code.
One problem before was that the apply worker would not exit promptly
when a subscription was dropped, which could lead to deadlocks.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reported-by: Masahiko Sawada <sawada.mshk@gmail.com>
src/backend/replication/logical/launcher.c
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c
src/backend/tcop/postgres.c
src/include/replication/logicalworker.h
src/include/replication/worker_internal.h

index 63d903ac0217281816074803b17d03f83f071959..345a415212331ee2ea4f739df76ec23c75b7ca7a 100644 (file)
@@ -80,8 +80,8 @@ static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
 
 /* Flags set by signal handlers */
-volatile sig_atomic_t got_SIGHUP = false;
-volatile sig_atomic_t got_SIGTERM = false;
+static volatile sig_atomic_t got_SIGHUP = false;
+static volatile sig_atomic_t got_SIGTERM = false;
 
 static bool on_commit_launcher_wakeup = false;
 
@@ -624,8 +624,8 @@ logicalrep_worker_onexit(int code, Datum arg)
 }
 
 /* SIGTERM: set flag to exit at next convenient time */
-void
-logicalrep_worker_sigterm(SIGNAL_ARGS)
+static void
+logicalrep_launcher_sigterm(SIGNAL_ARGS)
 {
        int                     save_errno = errno;
 
@@ -638,8 +638,8 @@ logicalrep_worker_sigterm(SIGNAL_ARGS)
 }
 
 /* SIGHUP: set flag to reload configuration at next convenient time */
-void
-logicalrep_worker_sighup(SIGNAL_ARGS)
+static void
+logicalrep_launcher_sighup(SIGNAL_ARGS)
 {
        int                     save_errno = errno;
 
@@ -799,8 +799,8 @@ ApplyLauncherMain(Datum main_arg)
        before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
 
        /* Establish signal handlers. */
-       pqsignal(SIGHUP, logicalrep_worker_sighup);
-       pqsignal(SIGTERM, logicalrep_worker_sigterm);
+       pqsignal(SIGHUP, logicalrep_launcher_sighup);
+       pqsignal(SIGTERM, logicalrep_launcher_sigterm);
        BackgroundWorkerUnblockSignals();
 
        /* Make it easy to identify our processes. */
index 515724e1026f05f9097f7d60f7788f077d4dafca..85e480db4bdd6c95eceafac6f42a9ce0b8112d19 100644 (file)
@@ -154,10 +154,12 @@ wait_for_sync_status_change(Oid relid, char origstate)
        int                     rc;
        char            state = origstate;
 
-       while (!got_SIGTERM)
+       for (;;)
        {
                LogicalRepWorker *worker;
 
+               CHECK_FOR_INTERRUPTS();
+
                LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
                                                                                relid, false);
@@ -525,7 +527,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
                bytesread += avail;
        }
 
-       while (!got_SIGTERM && maxread > 0 && bytesread < minread)
+       while (maxread > 0 && bytesread < minread)
        {
                pgsocket        fd = PGINVALID_SOCKET;
                int                     rc;
@@ -579,10 +581,6 @@ copy_read_data(void *outbuf, int minread, int maxread)
                ResetLatch(&MyProc->procLatch);
        }
 
-       /* Check for exit condition. */
-       if (got_SIGTERM)
-               proc_exit(0);
-
        return bytesread;
 }
 
index ea3ba1d5b4858bb5ca8827eb2f360d1b1792e319..e31551340c9266f601d4ad934c15c14c30737ebf 100644 (file)
@@ -72,6 +72,8 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 
+#include "tcop/tcopprot.h"
+
 #include "utils/builtins.h"
 #include "utils/catcache.h"
 #include "utils/datum.h"
@@ -118,6 +120,9 @@ static void store_flush_position(XLogRecPtr remote_lsn);
 
 static void reread_subscription(void);
 
+/* Flags set by signal handlers */
+static volatile sig_atomic_t got_SIGHUP = false;
+
 /*
  * Should this worker apply changes for given relation.
  *
@@ -1005,7 +1010,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
        /* mark as idle, before starting to loop */
        pgstat_report_activity(STATE_IDLE, NULL);
 
-       while (!got_SIGTERM)
+       for (;;)
        {
                pgsocket        fd = PGINVALID_SOCKET;
                int                     rc;
@@ -1015,6 +1020,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                TimestampTz last_recv_timestamp = GetCurrentTimestamp();
                bool            ping_sent = false;
 
+               CHECK_FOR_INTERRUPTS();
+
                MemoryContextSwitchTo(ApplyMessageContext);
 
                len = walrcv_receive(wrconn, &buf, &fd);
@@ -1437,6 +1444,19 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
        MySubscriptionValid = false;
 }
 
+/* SIGHUP: set flag to reload configuration at next convenient time */
+static void
+logicalrep_worker_sighup(SIGNAL_ARGS)
+{
+       int                     save_errno = errno;
+
+       got_SIGHUP = true;
+
+       /* Waken anything waiting on the process latch */
+       SetLatch(MyLatch);
+
+       errno = save_errno;
+}
 
 /* Logical Replication Apply worker entry point */
 void
@@ -1454,7 +1474,7 @@ ApplyWorkerMain(Datum main_arg)
 
        /* Setup signal handling */
        pqsignal(SIGHUP, logicalrep_worker_sighup);
-       pqsignal(SIGTERM, logicalrep_worker_sigterm);
+       pqsignal(SIGTERM, die);
        BackgroundWorkerUnblockSignals();
 
        /* Initialise stats to a sanish value */
@@ -1604,6 +1624,14 @@ ApplyWorkerMain(Datum main_arg)
        /* Run the main loop. */
        LogicalRepApplyLoop(origin_startpos);
 
-       /* We should only get here if we received SIGTERM */
        proc_exit(0);
 }
+
+/*
+ * Is current process a logical replication worker?
+ */
+bool
+IsLogicalWorker(void)
+{
+       return MyLogicalRepWorker != NULL;
+}
index 75c2d9a61d0dc067e9844f986cf23e23f724e565..13577691505b3654132ddd7177685f008cc0805b 100644 (file)
@@ -55,6 +55,7 @@
 #include "pg_getopt.h"
 #include "postmaster/autovacuum.h"
 #include "postmaster/postmaster.h"
+#include "replication/logicalworker.h"
 #include "replication/slot.h"
 #include "replication/walsender.h"
 #include "rewrite/rewriteHandler.h"
@@ -2845,6 +2846,10 @@ ProcessInterrupts(void)
                        ereport(FATAL,
                                        (errcode(ERRCODE_ADMIN_SHUTDOWN),
                                         errmsg("terminating autovacuum process due to administrator command")));
+               else if (IsLogicalWorker())
+                       ereport(FATAL,
+                                       (errcode(ERRCODE_ADMIN_SHUTDOWN),
+                                        errmsg("terminating logical replication worker due to administrator command")));
                else if (RecoveryConflictPending && RecoveryConflictRetryable)
                {
                        pgstat_report_recovery_conflict(RecoveryConflictReason);
index 3e0affa190b728b01adf0e90fe4ec062fd37c861..5877a930f68ca635c68c25fa031a28e09b824b80 100644 (file)
@@ -14,4 +14,6 @@
 
 extern void ApplyWorkerMain(Datum main_arg);
 
+extern bool IsLogicalWorker(void);
+
 #endif   /* LOGICALWORKER_H */
index 0654461305b33cb420f62d871b71eb88e2394254..2bfff5c1205dc499f845f2b725db56473773ab51 100644 (file)
@@ -67,8 +67,6 @@ extern Subscription *MySubscription;
 extern LogicalRepWorker *MyLogicalRepWorker;
 
 extern bool in_remote_transaction;
-extern volatile sig_atomic_t got_SIGHUP;
-extern volatile sig_atomic_t got_SIGTERM;
 
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
@@ -81,8 +79,6 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
 extern int     logicalrep_sync_worker_count(Oid subid);
 
-extern void logicalrep_worker_sighup(SIGNAL_ARGS);
-extern void logicalrep_worker_sigterm(SIGNAL_ARGS);
 extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
 void           process_syncing_tables(XLogRecPtr current_lsn);
 void invalidate_syncing_table_states(Datum arg, int cacheid,