]> granicus.if.org Git - postgresql/commitdiff
Use the regular main processing loop also in walsenders.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 5 Oct 2012 14:13:07 +0000 (17:13 +0300)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 5 Oct 2012 14:21:12 +0000 (17:21 +0300)
The regular backend's main loop handles signal handling and error recovery
better than the current WAL sender command loop does. For example, if the
client hangs and a SIGTERM is received before starting streaming, the
walsender will now terminate immediately, rather than hang until the
connection times out.

src/backend/replication/basebackup.c
src/backend/replication/walsender.c
src/backend/tcop/postgres.c
src/include/replication/walsender.h

index 4636e8d1c6fc74b93e5c48a282fc9f444bc91270..04681f4196299ce74739cc4cccf5ca305cffae97 100644 (file)
@@ -22,6 +22,7 @@
 #include "lib/stringinfo.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
+#include "miscadmin.h"
 #include "nodes/pg_list.h"
 #include "replication/basebackup.h"
 #include "replication/walsender.h"
@@ -30,7 +31,6 @@
 #include "storage/ipc.h"
 #include "utils/builtins.h"
 #include "utils/elog.h"
-#include "utils/memutils.h"
 #include "utils/ps_status.h"
 
 typedef struct
@@ -370,19 +370,10 @@ void
 SendBaseBackup(BaseBackupCmd *cmd)
 {
        DIR                *dir;
-       MemoryContext backup_context;
-       MemoryContext old_context;
        basebackup_options opt;
 
        parse_basebackup_options(cmd->options, &opt);
 
-       backup_context = AllocSetContextCreate(CurrentMemoryContext,
-                                                                                  "Streaming base backup context",
-                                                                                  ALLOCSET_DEFAULT_MINSIZE,
-                                                                                  ALLOCSET_DEFAULT_INITSIZE,
-                                                                                  ALLOCSET_DEFAULT_MAXSIZE);
-       old_context = MemoryContextSwitchTo(backup_context);
-
        WalSndSetState(WALSNDSTATE_BACKUP);
 
        if (update_process_title)
@@ -403,9 +394,6 @@ SendBaseBackup(BaseBackupCmd *cmd)
        perform_base_backup(&opt, dir);
 
        FreeDir(dir);
-
-       MemoryContextSwitchTo(old_context);
-       MemoryContextDelete(backup_context);
 }
 
 static void
@@ -606,7 +594,7 @@ sendDir(char *path, int basepathlen, bool sizeonly)
                 * error in that case. The error handler further up will call
                 * do_pg_abort_backup() for us.
                 */
-               if (walsender_shutdown_requested || walsender_ready_to_stop)
+               if (ProcDiePending || walsender_ready_to_stop)
                        ereport(ERROR,
                                (errmsg("shutdown requested, aborting active base backup")));
 
index cc27848318bc223e931b842bcd94003a9fa556d7..0ba2ad4414062ead6c20c50cd6d5c84ba97b6086 100644 (file)
@@ -78,6 +78,8 @@ bool          am_walsender = false;           /* Am I a walsender process ? */
 bool           am_cascading_walsender = false;         /* Am I cascading WAL to
                                                                                                 * another standby ? */
 
+static bool    replication_started = false; /* Started streaming yet? */
+
 /* User-settable parameters for walsender */
 int                    max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
 int                    replication_timeout = 60 * 1000;        /* maximum time to send one
@@ -113,21 +115,16 @@ static TimestampTz last_reply_timestamp;
 
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
-volatile sig_atomic_t walsender_shutdown_requested = false;
 volatile sig_atomic_t walsender_ready_to_stop = false;
 
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
-static void WalSndShutdownHandler(SIGNAL_ARGS);
-static void WalSndQuickDieHandler(SIGNAL_ARGS);
 static void WalSndXLogSendHandler(SIGNAL_ARGS);
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
-static bool HandleReplicationCommand(const char *cmd_string);
 static void WalSndLoop(void) __attribute__((noreturn));
-static void InitWalSnd(void);
-static void WalSndHandshake(void);
+static void InitWalSenderSlot(void);
 static void WalSndKill(int code, Datum arg);
 static void XLogSend(char *msgbuf, bool *caughtup);
 static void IdentifySystem(void);
@@ -139,147 +136,48 @@ static void ProcessRepliesIfAny(void);
 static void WalSndKeepalive(char *msgbuf);
 
 
-/* Main entry point for walsender process */
+/* Initialize walsender process before entering the main command loop */
 void
-WalSenderMain(void)
+InitWalSender(void)
 {
-       MemoryContext walsnd_context;
-
        am_cascading_walsender = RecoveryInProgress();
 
        /* Create a per-walsender data structure in shared memory */
-       InitWalSnd();
-
-       /*
-        * Create a memory context that we will do all our work in.  We do this so
-        * that we can reset the context during error recovery and thereby avoid
-        * possible memory leaks.  Formerly this code just ran in
-        * TopMemoryContext, but resetting that would be a really bad idea.
-        *
-        * XXX: we don't actually attempt error recovery in walsender, we just
-        * close the connection and exit.
-        */
-       walsnd_context = AllocSetContextCreate(TopMemoryContext,
-                                                                                  "Wal Sender",
-                                                                                  ALLOCSET_DEFAULT_MINSIZE,
-                                                                                  ALLOCSET_DEFAULT_INITSIZE,
-                                                                                  ALLOCSET_DEFAULT_MAXSIZE);
-       MemoryContextSwitchTo(walsnd_context);
+       InitWalSenderSlot();
 
        /* Set up resource owner */
        CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
 
-       /* Unblock signals (they were blocked when the postmaster forked us) */
-       PG_SETMASK(&UnBlockSig);
-
        /*
         * Use the recovery target timeline ID during recovery
         */
        if (am_cascading_walsender)
                ThisTimeLineID = GetRecoveryTargetTLI();
-
-       /* Tell the standby that walsender is ready for receiving commands */
-       ReadyForQuery(DestRemote);
-
-       /* Handle handshake messages before streaming */
-       WalSndHandshake();
-
-       /* Initialize shared memory status */
-       {
-               /* use volatile pointer to prevent code rearrangement */
-               volatile WalSnd *walsnd = MyWalSnd;
-
-               SpinLockAcquire(&walsnd->mutex);
-               walsnd->sentPtr = sentPtr;
-               SpinLockRelease(&walsnd->mutex);
-       }
-
-       SyncRepInitConfig();
-
-       /* Main loop of walsender */
-       WalSndLoop();
 }
 
 /*
- * Execute commands from walreceiver, until we enter streaming mode.
+ * Clean up after an error.
+ *
+ * WAL sender processes don't use transactions like regular backends do.
+ * This function does any cleanup requited after an error in a WAL sender
+ * process, similar to what transaction abort does in a regular backend.
  */
-static void
-WalSndHandshake(void)
+void
+WalSndErrorCleanup()
 {
-       StringInfoData input_message;
-       bool            replication_started = false;
-
-       initStringInfo(&input_message);
-
-       while (!replication_started)
+       if (sendFile >= 0)
        {
-               int                     firstchar;
-
-               WalSndSetState(WALSNDSTATE_STARTUP);
-               set_ps_display("idle", false);
-
-               /* Wait for a command to arrive */
-               firstchar = pq_getbyte();
-
-               /*
-                * Emergency bailout if postmaster has died.  This is to avoid the
-                * necessity for manual cleanup of all postmaster children.
-                */
-               if (!PostmasterIsAlive())
-                       exit(1);
-
-               /*
-                * Check for any other interesting events that happened while we
-                * slept.
-                */
-               if (got_SIGHUP)
-               {
-                       got_SIGHUP = false;
-                       ProcessConfigFile(PGC_SIGHUP);
-               }
-
-               if (firstchar != EOF)
-               {
-                       /*
-                        * Read the message contents. This is expected to be done without
-                        * blocking because we've been able to get message type code.
-                        */
-                       if (pq_getmessage(&input_message, 0))
-                               firstchar = EOF;        /* suitable message already logged */
-               }
-
-               /* Handle the very limited subset of commands expected in this phase */
-               switch (firstchar)
-               {
-                       case 'Q':                       /* Query message */
-                               {
-                                       const char *query_string;
-
-                                       query_string = pq_getmsgstring(&input_message);
-                                       pq_getmsgend(&input_message);
-
-                                       if (HandleReplicationCommand(query_string))
-                                               replication_started = true;
-                               }
-                               break;
-
-                       case 'X':
-                               /* standby is closing the connection */
-                               proc_exit(0);
-
-                       case EOF:
-                               /* standby disconnected unexpectedly */
-                               ereport(COMMERROR,
-                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                                errmsg("unexpected EOF on standby connection")));
-                               proc_exit(0);
-
-                       default:
-                               ereport(FATAL,
-                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                                errmsg("invalid standby handshake message type %d", firstchar)));
-               }
+               close(sendFile);
+               sendFile = -1;
        }
+
+       /*
+        * Don't return back to the command loop after we've started replicating.
+        * We've already marked us as an actively streaming WAL sender in the
+        * PMSignal slot, and there's currently no way to undo that.
+        */
+       if (replication_started)
+               proc_exit(0);
 }
 
 /*
@@ -350,15 +248,13 @@ IdentifySystem(void)
        pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
 
        pq_endmessage(&buf);
-
-       /* Send CommandComplete and ReadyForQuery messages */
-       EndCommand("SELECT", DestRemote);
-       ReadyForQuery(DestRemote);
-       /* ReadyForQuery did pq_flush for us */
 }
 
 /*
- * START_REPLICATION
+ * Handle START_REPLICATION command.
+ *
+ * At the moment, this never returns, but an ereport(ERROR) will take us back
+ * to the main loop.
  */
 static void
 StartReplication(StartReplicationCmd *cmd)
@@ -374,6 +270,7 @@ StartReplication(StartReplicationCmd *cmd)
         */
        MarkPostmasterChildWalSender();
        SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
+       replication_started = true;
 
        /*
         * When promoting a cascading standby, postmaster sends SIGUSR2 to any
@@ -435,15 +332,29 @@ StartReplication(StartReplicationCmd *cmd)
         * be shipped from that position
         */
        sentPtr = cmd->startpoint;
+
+       /* Also update the start position status in shared memory */
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = MyWalSnd;
+
+               SpinLockAcquire(&walsnd->mutex);
+               walsnd->sentPtr = sentPtr;
+               SpinLockRelease(&walsnd->mutex);
+       }
+
+       SyncRepInitConfig();
+
+       /* Main loop of walsender */
+       WalSndLoop();
 }
 
 /*
  * Execute an incoming replication command.
  */
-static bool
-HandleReplicationCommand(const char *cmd_string)
+void
+exec_replication_command(const char *cmd_string)
 {
-       bool            replication_started = false;
        int                     parse_rc;
        Node       *cmd_node;
        MemoryContext cmd_context;
@@ -451,6 +362,8 @@ HandleReplicationCommand(const char *cmd_string)
 
        elog(DEBUG1, "received replication command: %s", cmd_string);
 
+       CHECK_FOR_INTERRUPTS();
+
        cmd_context = AllocSetContextCreate(CurrentMemoryContext,
                                                                                "Replication command context",
                                                                                ALLOCSET_DEFAULT_MINSIZE,
@@ -476,18 +389,10 @@ HandleReplicationCommand(const char *cmd_string)
 
                case T_StartReplicationCmd:
                        StartReplication((StartReplicationCmd *) cmd_node);
-
-                       /* break out of the loop */
-                       replication_started = true;
                        break;
 
                case T_BaseBackupCmd:
                        SendBaseBackup((BaseBackupCmd *) cmd_node);
-
-                       /* Send CommandComplete and ReadyForQuery messages */
-                       EndCommand("SELECT", DestRemote);
-                       ReadyForQuery(DestRemote);
-                       /* ReadyForQuery did pq_flush for us */
                        break;
 
                default:
@@ -500,7 +405,8 @@ HandleReplicationCommand(const char *cmd_string)
        MemoryContextSwitchTo(old_context);
        MemoryContextDelete(cmd_context);
 
-       return replication_started;
+       /* Send CommandComplete message */
+       EndCommand("SELECT", DestRemote);
 }
 
 /*
@@ -710,7 +616,7 @@ ProcessStandbyHSFeedbackMessage(void)
        MyPgXact->xmin = reply.xmin;
 }
 
-/* Main loop of walsender process */
+/* Main loop of walsender process that streams the WAL over Copy messages. */
 static void
 WalSndLoop(void)
 {
@@ -754,15 +660,7 @@ WalSndLoop(void)
                        SyncRepInitConfig();
                }
 
-               /* Normal exit from the walsender is here */
-               if (walsender_shutdown_requested)
-               {
-                       /* Inform the standby that XLOG streaming is done */
-                       pq_puttextmessage('C', "COPY 0");
-                       pq_flush();
-
-                       proc_exit(0);
-               }
+               CHECK_FOR_INTERRUPTS();
 
                /* Check for input from the client */
                ProcessRepliesIfAny();
@@ -813,7 +711,7 @@ WalSndLoop(void)
                                XLogSend(output_message, &caughtup);
                                if (caughtup && !pq_is_send_pending())
                                {
-                                       walsender_shutdown_requested = true;
+                                       ProcDiePending = true;
                                        continue;       /* don't want to wait more */
                                }
                        }
@@ -854,8 +752,11 @@ WalSndLoop(void)
                        }
 
                        /* Sleep until something happens or replication timeout */
+                       ImmediateInterruptOK = true;
+                       CHECK_FOR_INTERRUPTS();
                        WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
                                                          MyProcPort->sock, sleeptime);
+                       ImmediateInterruptOK = false;
 
                        /*
                         * Check for replication timeout.  Note we ignore the corner case
@@ -892,7 +793,7 @@ WalSndLoop(void)
 
 /* Initialize a per-walsender data structure for this walsender process */
 static void
-InitWalSnd(void)
+InitWalSenderSlot(void)
 {
        int                     i;
 
@@ -1284,58 +1185,6 @@ WalSndSigHupHandler(SIGNAL_ARGS)
        errno = save_errno;
 }
 
-/* SIGTERM: set flag to shut down */
-static void
-WalSndShutdownHandler(SIGNAL_ARGS)
-{
-       int                     save_errno = errno;
-
-       walsender_shutdown_requested = true;
-       if (MyWalSnd)
-               SetLatch(&MyWalSnd->latch);
-
-       /*
-        * Set the standard (non-walsender) state as well, so that we can abort
-        * things like do_pg_stop_backup().
-        */
-       InterruptPending = true;
-       ProcDiePending = true;
-
-       errno = save_errno;
-}
-
-/*
- * WalSndQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
- *
- * Some backend has bought the farm,
- * so we need to stop what we're doing and exit.
- */
-static void
-WalSndQuickDieHandler(SIGNAL_ARGS)
-{
-       PG_SETMASK(&BlockSig);
-
-       /*
-        * We DO NOT want to run proc_exit() callbacks -- we're here because
-        * shared memory may be corrupted, so we don't want to try to clean up our
-        * transaction.  Just nail the windows shut and get out of town.  Now that
-        * there's an atexit callback to prevent third-party code from breaking
-        * things by calling exit() directly, we have to reset the callbacks
-        * explicitly to make this work as intended.
-        */
-       on_exit_reset();
-
-       /*
-        * Note we do exit(2) not exit(0).      This is to force the postmaster into a
-        * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
-        * backend.  This is necessary precisely because we don't clean up our
-        * shared memory state.  (The "dead man switch" mechanism in pmsignal.c
-        * should ensure the postmaster sees this as a crash, too, but no harm in
-        * being doubly sure.)
-        */
-       exit(2);
-}
-
 /* SIGUSR1: set flag to send WAL records */
 static void
 WalSndXLogSendHandler(SIGNAL_ARGS)
@@ -1368,8 +1217,8 @@ WalSndSignals(void)
        pqsignal(SIGHUP, WalSndSigHupHandler);          /* set flag to read config
                                                                                                 * file */
        pqsignal(SIGINT, SIG_IGN);      /* not used */
-       pqsignal(SIGTERM, WalSndShutdownHandler);       /* request shutdown */
-       pqsignal(SIGQUIT, WalSndQuickDieHandler);       /* hard crash time */
+       pqsignal(SIGTERM, die);                                         /* request shutdown */
+       pqsignal(SIGQUIT, quickdie);                            /* hard crash time */
        InitializeTimeouts();           /* establishes SIGALRM handler */
        pqsignal(SIGPIPE, SIG_IGN);
        pqsignal(SIGUSR1, WalSndXLogSendHandler);       /* request WAL sending */
index f1248a851bf90188da8d3a7e8b61ac99bf78ebbd..585db1af89cd37ba7e8f5327bbd992c45beaf288 100644 (file)
@@ -192,6 +192,7 @@ static int  InteractiveBackend(StringInfo inBuf);
 static int     interactive_getc(void);
 static int     SocketBackend(StringInfo inBuf);
 static int     ReadCommand(StringInfo inBuf);
+static void forbidden_in_wal_sender(char firstchar);
 static List *pg_rewrite_query(Query *query);
 static bool check_log_statement(List *stmt_list);
 static int     errdetail_execute(List *raw_parsetree_list);
@@ -3720,12 +3721,9 @@ PostgresMain(int argc, char *argv[], const char *username)
        if (IsUnderPostmaster && Log_disconnections)
                on_proc_exit(log_disconnections, 0);
 
-       /* If this is a WAL sender process, we're done with initialization. */
+       /* Perform initialization specific to a WAL sender process. */
        if (am_walsender)
-       {
-               WalSenderMain();                /* does not return */
-               abort();
-       }
+               InitWalSender();
 
        /*
         * process any libraries that should be preloaded at backend start (this
@@ -3835,6 +3833,9 @@ PostgresMain(int argc, char *argv[], const char *username)
                 */
                AbortCurrentTransaction();
 
+               if (am_walsender)
+                       WalSndErrorCleanup();
+
                /*
                 * Now return to normal top-level context and clear ErrorContext for
                 * next time.
@@ -3969,7 +3970,10 @@ PostgresMain(int argc, char *argv[], const char *username)
                                        query_string = pq_getmsgstring(&input_message);
                                        pq_getmsgend(&input_message);
 
-                                       exec_simple_query(query_string);
+                                       if (am_walsender)
+                                               exec_replication_command(query_string);
+                                       else
+                                               exec_simple_query(query_string);
 
                                        send_ready_for_query = true;
                                }
@@ -3982,6 +3986,8 @@ PostgresMain(int argc, char *argv[], const char *username)
                                        int                     numParams;
                                        Oid                *paramTypes = NULL;
 
+                                       forbidden_in_wal_sender(firstchar);
+
                                        /* Set statement_timestamp() */
                                        SetCurrentStatementStartTimestamp();
 
@@ -4004,6 +4010,8 @@ PostgresMain(int argc, char *argv[], const char *username)
                                break;
 
                        case 'B':                       /* bind */
+                               forbidden_in_wal_sender(firstchar);
+
                                /* Set statement_timestamp() */
                                SetCurrentStatementStartTimestamp();
 
@@ -4019,6 +4027,8 @@ PostgresMain(int argc, char *argv[], const char *username)
                                        const char *portal_name;
                                        int                     max_rows;
 
+                                       forbidden_in_wal_sender(firstchar);
+
                                        /* Set statement_timestamp() */
                                        SetCurrentStatementStartTimestamp();
 
@@ -4031,6 +4041,8 @@ PostgresMain(int argc, char *argv[], const char *username)
                                break;
 
                        case 'F':                       /* fastpath function call */
+                               forbidden_in_wal_sender(firstchar);
+
                                /* Set statement_timestamp() */
                                SetCurrentStatementStartTimestamp();
 
@@ -4078,6 +4090,8 @@ PostgresMain(int argc, char *argv[], const char *username)
                                        int                     close_type;
                                        const char *close_target;
 
+                                       forbidden_in_wal_sender(firstchar);
+
                                        close_type = pq_getmsgbyte(&input_message);
                                        close_target = pq_getmsgstring(&input_message);
                                        pq_getmsgend(&input_message);
@@ -4120,6 +4134,8 @@ PostgresMain(int argc, char *argv[], const char *username)
                                        int                     describe_type;
                                        const char *describe_target;
 
+                                       forbidden_in_wal_sender(firstchar);
+
                                        /* Set statement_timestamp() (needed for xact) */
                                        SetCurrentStatementStartTimestamp();
 
@@ -4201,6 +4217,29 @@ PostgresMain(int argc, char *argv[], const char *username)
        }                                                       /* end of input-reading loop */
 }
 
+/*
+ * Throw an error if we're a WAL sender process.
+ *
+ * This is used to forbid anything else than simple query protocol messages
+ * in a WAL sender process.  'firstchar' specifies what kind of a forbidden
+ * message was received, and is used to construct the error message.
+ */
+static void
+forbidden_in_wal_sender(char firstchar)
+{
+       if (am_walsender)
+       {
+               if (firstchar == 'F')
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                        errmsg("fastpath function calls not supported in a replication connection")));
+               else
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                        errmsg("extended query protocol not supported in a replication connection")));
+       }
+}
+
 
 /*
  * Obtain platform stack depth limit (in bytes)
index bb85ccf7b22cea2a927a82be0ba6219c6fb5efe8..78e8558299ce844d8a7bd31ebeafc5a8f507f5a2 100644 (file)
@@ -19,7 +19,6 @@
 /* global state */
 extern bool am_walsender;
 extern bool am_cascading_walsender;
-extern volatile sig_atomic_t walsender_shutdown_requested;
 extern volatile sig_atomic_t walsender_ready_to_stop;
 extern bool wake_wal_senders;
 
@@ -27,7 +26,9 @@ extern bool wake_wal_senders;
 extern int     max_wal_senders;
 extern int     replication_timeout;
 
-extern void WalSenderMain(void) __attribute__((noreturn));
+extern void InitWalSender(void);
+extern void exec_replication_command(const char *query_string);
+extern void WalSndErrorCleanup(void);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);