From: Peter Eisentraut Date: Fri, 2 Jun 2017 18:46:00 +0000 (-0400) Subject: Fix signal handling in logical replication workers X-Git-Tag: REL_10_BETA2~249 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=9fcf670c2efdf31233d429f557ab77937f0f1e6a;p=postgresql Fix signal handling in logical replication workers The logical replication worker processes now use the normal die() handler for SIGTERM and CHECK_FOR_INTERRUPTS() instead of custom code. One problem before was that the apply worker would not exit promptly when a subscription was dropped, which could lead to deadlocks. Author: Petr Jelinek Reported-by: Masahiko Sawada --- diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 63d903ac02..345a415212 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -80,8 +80,8 @@ static void logicalrep_worker_detach(void); static void logicalrep_worker_cleanup(LogicalRepWorker *worker); /* Flags set by signal handlers */ -volatile sig_atomic_t got_SIGHUP = false; -volatile sig_atomic_t got_SIGTERM = false; +static volatile sig_atomic_t got_SIGHUP = false; +static volatile sig_atomic_t got_SIGTERM = false; static bool on_commit_launcher_wakeup = false; @@ -624,8 +624,8 @@ logicalrep_worker_onexit(int code, Datum arg) } /* SIGTERM: set flag to exit at next convenient time */ -void -logicalrep_worker_sigterm(SIGNAL_ARGS) +static void +logicalrep_launcher_sigterm(SIGNAL_ARGS) { int save_errno = errno; @@ -638,8 +638,8 @@ logicalrep_worker_sigterm(SIGNAL_ARGS) } /* SIGHUP: set flag to reload configuration at next convenient time */ -void -logicalrep_worker_sighup(SIGNAL_ARGS) +static void +logicalrep_launcher_sighup(SIGNAL_ARGS) { int save_errno = errno; @@ -799,8 +799,8 @@ ApplyLauncherMain(Datum main_arg) before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0); /* Establish signal handlers. */ - pqsignal(SIGHUP, logicalrep_worker_sighup); - pqsignal(SIGTERM, logicalrep_worker_sigterm); + pqsignal(SIGHUP, logicalrep_launcher_sighup); + pqsignal(SIGTERM, logicalrep_launcher_sigterm); BackgroundWorkerUnblockSignals(); /* Make it easy to identify our processes. */ diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 515724e102..85e480db4b 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -154,10 +154,12 @@ wait_for_sync_status_change(Oid relid, char origstate) int rc; char state = origstate; - while (!got_SIGTERM) + for (;;) { LogicalRepWorker *worker; + CHECK_FOR_INTERRUPTS(); + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid, false); @@ -525,7 +527,7 @@ copy_read_data(void *outbuf, int minread, int maxread) bytesread += avail; } - while (!got_SIGTERM && maxread > 0 && bytesread < minread) + while (maxread > 0 && bytesread < minread) { pgsocket fd = PGINVALID_SOCKET; int rc; @@ -579,10 +581,6 @@ copy_read_data(void *outbuf, int minread, int maxread) ResetLatch(&MyProc->procLatch); } - /* Check for exit condition. */ - if (got_SIGTERM) - proc_exit(0); - return bytesread; } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ea3ba1d5b4..e31551340c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -72,6 +72,8 @@ #include "storage/proc.h" #include "storage/procarray.h" +#include "tcop/tcopprot.h" + #include "utils/builtins.h" #include "utils/catcache.h" #include "utils/datum.h" @@ -118,6 +120,9 @@ static void store_flush_position(XLogRecPtr remote_lsn); static void reread_subscription(void); +/* Flags set by signal handlers */ +static volatile sig_atomic_t got_SIGHUP = false; + /* * Should this worker apply changes for given relation. * @@ -1005,7 +1010,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); - while (!got_SIGTERM) + for (;;) { pgsocket fd = PGINVALID_SOCKET; int rc; @@ -1015,6 +1020,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) TimestampTz last_recv_timestamp = GetCurrentTimestamp(); bool ping_sent = false; + CHECK_FOR_INTERRUPTS(); + MemoryContextSwitchTo(ApplyMessageContext); len = walrcv_receive(wrconn, &buf, &fd); @@ -1437,6 +1444,19 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue) MySubscriptionValid = false; } +/* SIGHUP: set flag to reload configuration at next convenient time */ +static void +logicalrep_worker_sighup(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGHUP = true; + + /* Waken anything waiting on the process latch */ + SetLatch(MyLatch); + + errno = save_errno; +} /* Logical Replication Apply worker entry point */ void @@ -1454,7 +1474,7 @@ ApplyWorkerMain(Datum main_arg) /* Setup signal handling */ pqsignal(SIGHUP, logicalrep_worker_sighup); - pqsignal(SIGTERM, logicalrep_worker_sigterm); + pqsignal(SIGTERM, die); BackgroundWorkerUnblockSignals(); /* Initialise stats to a sanish value */ @@ -1604,6 +1624,14 @@ ApplyWorkerMain(Datum main_arg) /* Run the main loop. */ LogicalRepApplyLoop(origin_startpos); - /* We should only get here if we received SIGTERM */ proc_exit(0); } + +/* + * Is current process a logical replication worker? + */ +bool +IsLogicalWorker(void) +{ + return MyLogicalRepWorker != NULL; +} diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 75c2d9a61d..1357769150 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -55,6 +55,7 @@ #include "pg_getopt.h" #include "postmaster/autovacuum.h" #include "postmaster/postmaster.h" +#include "replication/logicalworker.h" #include "replication/slot.h" #include "replication/walsender.h" #include "rewrite/rewriteHandler.h" @@ -2845,6 +2846,10 @@ ProcessInterrupts(void) ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("terminating autovacuum process due to administrator command"))); + else if (IsLogicalWorker()) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating logical replication worker due to administrator command"))); else if (RecoveryConflictPending && RecoveryConflictRetryable) { pgstat_report_recovery_conflict(RecoveryConflictReason); diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 3e0affa190..5877a930f6 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -14,4 +14,6 @@ extern void ApplyWorkerMain(Datum main_arg); +extern bool IsLogicalWorker(void); + #endif /* LOGICALWORKER_H */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 0654461305..2bfff5c120 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -67,8 +67,6 @@ extern Subscription *MySubscription; extern LogicalRepWorker *MyLogicalRepWorker; extern bool in_remote_transaction; -extern volatile sig_atomic_t got_SIGHUP; -extern volatile sig_atomic_t got_SIGTERM; extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, @@ -81,8 +79,6 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); extern int logicalrep_sync_worker_count(Oid subid); -extern void logicalrep_worker_sighup(SIGNAL_ARGS); -extern void logicalrep_worker_sigterm(SIGNAL_ARGS); extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); void process_syncing_tables(XLogRecPtr current_lsn); void invalidate_syncing_table_states(Datum arg, int cacheid,