bool am_cascading_walsender = false; /* Am I cascading WAL to
* another standby ? */
+static bool replication_started = false; /* Started streaming yet? */
+
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int replication_timeout = 60 * 1000; /* maximum time to send one
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
-volatile sig_atomic_t walsender_shutdown_requested = false;
volatile sig_atomic_t walsender_ready_to_stop = false;
/* Signal handlers */
static void WalSndSigHupHandler(SIGNAL_ARGS);
-static void WalSndShutdownHandler(SIGNAL_ARGS);
-static void WalSndQuickDieHandler(SIGNAL_ARGS);
static void WalSndXLogSendHandler(SIGNAL_ARGS);
static void WalSndLastCycleHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
-static bool HandleReplicationCommand(const char *cmd_string);
static void WalSndLoop(void) __attribute__((noreturn));
-static void InitWalSnd(void);
-static void WalSndHandshake(void);
+static void InitWalSenderSlot(void);
static void WalSndKill(int code, Datum arg);
static void XLogSend(char *msgbuf, bool *caughtup);
static void IdentifySystem(void);
static void WalSndKeepalive(char *msgbuf);
-/* Main entry point for walsender process */
+/* Initialize walsender process before entering the main command loop */
void
-WalSenderMain(void)
+InitWalSender(void)
{
- MemoryContext walsnd_context;
-
am_cascading_walsender = RecoveryInProgress();
/* Create a per-walsender data structure in shared memory */
- InitWalSnd();
-
- /*
- * Create a memory context that we will do all our work in. We do this so
- * that we can reset the context during error recovery and thereby avoid
- * possible memory leaks. Formerly this code just ran in
- * TopMemoryContext, but resetting that would be a really bad idea.
- *
- * XXX: we don't actually attempt error recovery in walsender, we just
- * close the connection and exit.
- */
- walsnd_context = AllocSetContextCreate(TopMemoryContext,
- "Wal Sender",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
- MemoryContextSwitchTo(walsnd_context);
+ InitWalSenderSlot();
/* Set up resource owner */
CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
- /* Unblock signals (they were blocked when the postmaster forked us) */
- PG_SETMASK(&UnBlockSig);
-
/*
* Use the recovery target timeline ID during recovery
*/
if (am_cascading_walsender)
ThisTimeLineID = GetRecoveryTargetTLI();
-
- /* Tell the standby that walsender is ready for receiving commands */
- ReadyForQuery(DestRemote);
-
- /* Handle handshake messages before streaming */
- WalSndHandshake();
-
- /* Initialize shared memory status */
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile WalSnd *walsnd = MyWalSnd;
-
- SpinLockAcquire(&walsnd->mutex);
- walsnd->sentPtr = sentPtr;
- SpinLockRelease(&walsnd->mutex);
- }
-
- SyncRepInitConfig();
-
- /* Main loop of walsender */
- WalSndLoop();
}
/*
- * Execute commands from walreceiver, until we enter streaming mode.
+ * Clean up after an error.
+ *
+ * WAL sender processes don't use transactions like regular backends do.
+ * This function does any cleanup requited after an error in a WAL sender
+ * process, similar to what transaction abort does in a regular backend.
*/
-static void
-WalSndHandshake(void)
+void
+WalSndErrorCleanup()
{
- StringInfoData input_message;
- bool replication_started = false;
-
- initStringInfo(&input_message);
-
- while (!replication_started)
+ if (sendFile >= 0)
{
- int firstchar;
-
- WalSndSetState(WALSNDSTATE_STARTUP);
- set_ps_display("idle", false);
-
- /* Wait for a command to arrive */
- firstchar = pq_getbyte();
-
- /*
- * Emergency bailout if postmaster has died. This is to avoid the
- * necessity for manual cleanup of all postmaster children.
- */
- if (!PostmasterIsAlive())
- exit(1);
-
- /*
- * Check for any other interesting events that happened while we
- * slept.
- */
- if (got_SIGHUP)
- {
- got_SIGHUP = false;
- ProcessConfigFile(PGC_SIGHUP);
- }
-
- if (firstchar != EOF)
- {
- /*
- * Read the message contents. This is expected to be done without
- * blocking because we've been able to get message type code.
- */
- if (pq_getmessage(&input_message, 0))
- firstchar = EOF; /* suitable message already logged */
- }
-
- /* Handle the very limited subset of commands expected in this phase */
- switch (firstchar)
- {
- case 'Q': /* Query message */
- {
- const char *query_string;
-
- query_string = pq_getmsgstring(&input_message);
- pq_getmsgend(&input_message);
-
- if (HandleReplicationCommand(query_string))
- replication_started = true;
- }
- break;
-
- case 'X':
- /* standby is closing the connection */
- proc_exit(0);
-
- case EOF:
- /* standby disconnected unexpectedly */
- ereport(COMMERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("unexpected EOF on standby connection")));
- proc_exit(0);
-
- default:
- ereport(FATAL,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid standby handshake message type %d", firstchar)));
- }
+ close(sendFile);
+ sendFile = -1;
}
+
+ /*
+ * Don't return back to the command loop after we've started replicating.
+ * We've already marked us as an actively streaming WAL sender in the
+ * PMSignal slot, and there's currently no way to undo that.
+ */
+ if (replication_started)
+ proc_exit(0);
}
/*
pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
pq_endmessage(&buf);
-
- /* Send CommandComplete and ReadyForQuery messages */
- EndCommand("SELECT", DestRemote);
- ReadyForQuery(DestRemote);
- /* ReadyForQuery did pq_flush for us */
}
/*
- * START_REPLICATION
+ * Handle START_REPLICATION command.
+ *
+ * At the moment, this never returns, but an ereport(ERROR) will take us back
+ * to the main loop.
*/
static void
StartReplication(StartReplicationCmd *cmd)
*/
MarkPostmasterChildWalSender();
SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
+ replication_started = true;
/*
* When promoting a cascading standby, postmaster sends SIGUSR2 to any
* be shipped from that position
*/
sentPtr = cmd->startpoint;
+
+ /* Also update the start position status in shared memory */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->sentPtr = sentPtr;
+ SpinLockRelease(&walsnd->mutex);
+ }
+
+ SyncRepInitConfig();
+
+ /* Main loop of walsender */
+ WalSndLoop();
}
/*
* Execute an incoming replication command.
*/
-static bool
-HandleReplicationCommand(const char *cmd_string)
+void
+exec_replication_command(const char *cmd_string)
{
- bool replication_started = false;
int parse_rc;
Node *cmd_node;
MemoryContext cmd_context;
elog(DEBUG1, "received replication command: %s", cmd_string);
+ CHECK_FOR_INTERRUPTS();
+
cmd_context = AllocSetContextCreate(CurrentMemoryContext,
"Replication command context",
ALLOCSET_DEFAULT_MINSIZE,
case T_StartReplicationCmd:
StartReplication((StartReplicationCmd *) cmd_node);
-
- /* break out of the loop */
- replication_started = true;
break;
case T_BaseBackupCmd:
SendBaseBackup((BaseBackupCmd *) cmd_node);
-
- /* Send CommandComplete and ReadyForQuery messages */
- EndCommand("SELECT", DestRemote);
- ReadyForQuery(DestRemote);
- /* ReadyForQuery did pq_flush for us */
break;
default:
MemoryContextSwitchTo(old_context);
MemoryContextDelete(cmd_context);
- return replication_started;
+ /* Send CommandComplete message */
+ EndCommand("SELECT", DestRemote);
}
/*
MyPgXact->xmin = reply.xmin;
}
-/* Main loop of walsender process */
+/* Main loop of walsender process that streams the WAL over Copy messages. */
static void
WalSndLoop(void)
{
SyncRepInitConfig();
}
- /* Normal exit from the walsender is here */
- if (walsender_shutdown_requested)
- {
- /* Inform the standby that XLOG streaming is done */
- pq_puttextmessage('C', "COPY 0");
- pq_flush();
-
- proc_exit(0);
- }
+ CHECK_FOR_INTERRUPTS();
/* Check for input from the client */
ProcessRepliesIfAny();
XLogSend(output_message, &caughtup);
if (caughtup && !pq_is_send_pending())
{
- walsender_shutdown_requested = true;
+ ProcDiePending = true;
continue; /* don't want to wait more */
}
}
}
/* Sleep until something happens or replication timeout */
+ ImmediateInterruptOK = true;
+ CHECK_FOR_INTERRUPTS();
WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
MyProcPort->sock, sleeptime);
+ ImmediateInterruptOK = false;
/*
* Check for replication timeout. Note we ignore the corner case
/* Initialize a per-walsender data structure for this walsender process */
static void
-InitWalSnd(void)
+InitWalSenderSlot(void)
{
int i;
errno = save_errno;
}
-/* SIGTERM: set flag to shut down */
-static void
-WalSndShutdownHandler(SIGNAL_ARGS)
-{
- int save_errno = errno;
-
- walsender_shutdown_requested = true;
- if (MyWalSnd)
- SetLatch(&MyWalSnd->latch);
-
- /*
- * Set the standard (non-walsender) state as well, so that we can abort
- * things like do_pg_stop_backup().
- */
- InterruptPending = true;
- ProcDiePending = true;
-
- errno = save_errno;
-}
-
-/*
- * WalSndQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
- *
- * Some backend has bought the farm,
- * so we need to stop what we're doing and exit.
- */
-static void
-WalSndQuickDieHandler(SIGNAL_ARGS)
-{
- PG_SETMASK(&BlockSig);
-
- /*
- * We DO NOT want to run proc_exit() callbacks -- we're here because
- * shared memory may be corrupted, so we don't want to try to clean up our
- * transaction. Just nail the windows shut and get out of town. Now that
- * there's an atexit callback to prevent third-party code from breaking
- * things by calling exit() directly, we have to reset the callbacks
- * explicitly to make this work as intended.
- */
- on_exit_reset();
-
- /*
- * Note we do exit(2) not exit(0). This is to force the postmaster into a
- * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
- * backend. This is necessary precisely because we don't clean up our
- * shared memory state. (The "dead man switch" mechanism in pmsignal.c
- * should ensure the postmaster sees this as a crash, too, but no harm in
- * being doubly sure.)
- */
- exit(2);
-}
-
/* SIGUSR1: set flag to send WAL records */
static void
WalSndXLogSendHandler(SIGNAL_ARGS)
pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config
* file */
pqsignal(SIGINT, SIG_IGN); /* not used */
- pqsignal(SIGTERM, WalSndShutdownHandler); /* request shutdown */
- pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */
+ pqsignal(SIGTERM, die); /* request shutdown */
+ pqsignal(SIGQUIT, quickdie); /* hard crash time */
InitializeTimeouts(); /* establishes SIGALRM handler */
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
static int interactive_getc(void);
static int SocketBackend(StringInfo inBuf);
static int ReadCommand(StringInfo inBuf);
+static void forbidden_in_wal_sender(char firstchar);
static List *pg_rewrite_query(Query *query);
static bool check_log_statement(List *stmt_list);
static int errdetail_execute(List *raw_parsetree_list);
if (IsUnderPostmaster && Log_disconnections)
on_proc_exit(log_disconnections, 0);
- /* If this is a WAL sender process, we're done with initialization. */
+ /* Perform initialization specific to a WAL sender process. */
if (am_walsender)
- {
- WalSenderMain(); /* does not return */
- abort();
- }
+ InitWalSender();
/*
* process any libraries that should be preloaded at backend start (this
*/
AbortCurrentTransaction();
+ if (am_walsender)
+ WalSndErrorCleanup();
+
/*
* Now return to normal top-level context and clear ErrorContext for
* next time.
query_string = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message);
- exec_simple_query(query_string);
+ if (am_walsender)
+ exec_replication_command(query_string);
+ else
+ exec_simple_query(query_string);
send_ready_for_query = true;
}
int numParams;
Oid *paramTypes = NULL;
+ forbidden_in_wal_sender(firstchar);
+
/* Set statement_timestamp() */
SetCurrentStatementStartTimestamp();
break;
case 'B': /* bind */
+ forbidden_in_wal_sender(firstchar);
+
/* Set statement_timestamp() */
SetCurrentStatementStartTimestamp();
const char *portal_name;
int max_rows;
+ forbidden_in_wal_sender(firstchar);
+
/* Set statement_timestamp() */
SetCurrentStatementStartTimestamp();
break;
case 'F': /* fastpath function call */
+ forbidden_in_wal_sender(firstchar);
+
/* Set statement_timestamp() */
SetCurrentStatementStartTimestamp();
int close_type;
const char *close_target;
+ forbidden_in_wal_sender(firstchar);
+
close_type = pq_getmsgbyte(&input_message);
close_target = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message);
int describe_type;
const char *describe_target;
+ forbidden_in_wal_sender(firstchar);
+
/* Set statement_timestamp() (needed for xact) */
SetCurrentStatementStartTimestamp();
} /* end of input-reading loop */
}
+/*
+ * Throw an error if we're a WAL sender process.
+ *
+ * This is used to forbid anything else than simple query protocol messages
+ * in a WAL sender process. 'firstchar' specifies what kind of a forbidden
+ * message was received, and is used to construct the error message.
+ */
+static void
+forbidden_in_wal_sender(char firstchar)
+{
+ if (am_walsender)
+ {
+ if (firstchar == 'F')
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("fastpath function calls not supported in a replication connection")));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("extended query protocol not supported in a replication connection")));
+ }
+}
+
/*
* Obtain platform stack depth limit (in bytes)