/*-------------------------------------------------------------------------
 *
 * walsender.c
 *
 * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
 * care of sending XLOG from the primary server to a single recipient.
 * (Note that there can be more than one walsender process concurrently.)
 * It is started by the postmaster when the walreceiver of a standby server
 * connects to the primary server and requests XLOG streaming replication.
 * It attempts to keep reading XLOG records from the disk and sending them
 * to the standby server, as long as the connection is alive (i.e., like
 * any backend, there is a one-to-one relationship between a connection
 * and a walsender process).
 *
 * Normal termination is by SIGTERM, which instructs the walsender to
 * close the connection and exit(0) at the next convenient moment. Emergency
 * termination is by SIGQUIT; like any backend, the walsender will simply
 * abort and exit on SIGQUIT. A close of the connection and a FATAL error
 * are treated not as a crash but as approximately normal termination;
 * the walsender will exit quickly without sending any more XLOG records.
 *
 * If the server is shut down, the postmaster sends us SIGUSR2 after all
 * regular backends have exited and the shutdown checkpoint has been written.
 * This instructs the walsender to send any outstanding WAL, including the
 * shutdown checkpoint record, and then exit.
* * * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group * * IDENTIFICATION * src/backend/replication/walsender.c * *------------------------------------------------------------------------- */ #include "postgres.h" #include #include #include "funcapi.h" #include "access/xlog_internal.h" #include "catalog/pg_type.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "libpq/pqsignal.h" #include "miscadmin.h" #include "replication/basebackup.h" #include "replication/walprotocol.h" #include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/pmsignal.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/memutils.h" #include "utils/ps_status.h" #include "utils/resowner.h" /* Array of WalSnds in shared memory */ WalSndCtlData *WalSndCtl = NULL; /* My slot in the shared memory array */ static WalSnd *MyWalSnd = NULL; /* Global state */ bool am_walsender = false; /* Am I a walsender process ? */ /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ int WalSndDelay = 200; /* max sleep time between some actions */ /* * These variables are used similarly to openLogFile/Id/Seg/Off, * but for walsender to read the XLOG. */ static int sendFile = -1; static uint32 sendId = 0; static uint32 sendSeg = 0; static uint32 sendOff = 0; /* * How far have we sent WAL already? This is also advertised in * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.) 
*/ static XLogRecPtr sentPtr = {0, 0}; /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t shutdown_requested = false; static volatile sig_atomic_t ready_to_stop = false; /* Signal handlers */ static void WalSndSigHupHandler(SIGNAL_ARGS); static void WalSndShutdownHandler(SIGNAL_ARGS); static void WalSndQuickDieHandler(SIGNAL_ARGS); static void WalSndXLogSendHandler(SIGNAL_ARGS); static void WalSndLastCycleHandler(SIGNAL_ARGS); /* Prototypes for private functions */ static int WalSndLoop(void); static void InitWalSnd(void); static void WalSndHandshake(void); static void WalSndKill(int code, Datum arg); static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes); static bool XLogSend(char *msgbuf, bool *caughtup); static void CheckClosedConnection(void); /* Main entry point for walsender process */ int WalSenderMain(void) { MemoryContext walsnd_context; if (RecoveryInProgress()) ereport(FATAL, (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("recovery is still in progress, can't accept WAL streaming connections"))); /* Create a per-walsender data structure in shared memory */ InitWalSnd(); /* * Create a memory context that we will do all our work in. We do this so * that we can reset the context during error recovery and thereby avoid * possible memory leaks. Formerly this code just ran in * TopMemoryContext, but resetting that would be a really bad idea. * * XXX: we don't actually attempt error recovery in walsender, we just * close the connection and exit. 
*/ walsnd_context = AllocSetContextCreate(TopMemoryContext, "Wal Sender", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); MemoryContextSwitchTo(walsnd_context); /* Set up resource owner */ CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner"); /* Unblock signals (they were blocked when the postmaster forked us) */ PG_SETMASK(&UnBlockSig); /* Tell the standby that walsender is ready for receiving commands */ ReadyForQuery(DestRemote); /* Handle handshake messages before streaming */ WalSndHandshake(); /* Initialize shared memory status */ { /* use volatile pointer to prevent code rearrangement */ volatile WalSnd *walsnd = MyWalSnd; SpinLockAcquire(&walsnd->mutex); walsnd->sentPtr = sentPtr; SpinLockRelease(&walsnd->mutex); } /* Main loop of walsender */ return WalSndLoop(); } /* * Execute commands from walreceiver, until we enter streaming mode. */ static void WalSndHandshake(void) { StringInfoData input_message; bool replication_started = false; initStringInfo(&input_message); while (!replication_started) { int firstchar; WalSndSetState(WALSNDSTATE_STARTUP); set_ps_display("idle", false); /* Wait for a command to arrive */ firstchar = pq_getbyte(); /* * Emergency bailout if postmaster has died. This is to avoid the * necessity for manual cleanup of all postmaster children. */ if (!PostmasterIsAlive(true)) exit(1); /* * Check for any other interesting events that happened while we * slept. */ if (got_SIGHUP) { got_SIGHUP = false; ProcessConfigFile(PGC_SIGHUP); } if (firstchar != EOF) { /* * Read the message contents. This is expected to be done without * blocking because we've been able to get message type code. 
*/ if (pq_getmessage(&input_message, 0)) firstchar = EOF; /* suitable message already logged */ } /* Handle the very limited subset of commands expected in this phase */ switch (firstchar) { case 'Q': /* Query message */ { const char *query_string; XLogRecPtr recptr; query_string = pq_getmsgstring(&input_message); pq_getmsgend(&input_message); if (strcmp(query_string, "IDENTIFY_SYSTEM") == 0) { StringInfoData buf; char sysid[32]; char tli[11]; /* * Reply with a result set with one row, two columns. * First col is system ID, and second is timeline ID */ snprintf(sysid, sizeof(sysid), UINT64_FORMAT, GetSystemIdentifier()); snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); /* Send a RowDescription message */ pq_beginmessage(&buf, 'T'); pq_sendint(&buf, 2, 2); /* 2 fields */ /* first field */ pq_sendstring(&buf, "systemid"); /* col name */ pq_sendint(&buf, 0, 4); /* table oid */ pq_sendint(&buf, 0, 2); /* attnum */ pq_sendint(&buf, TEXTOID, 4); /* type oid */ pq_sendint(&buf, -1, 2); /* typlen */ pq_sendint(&buf, 0, 4); /* typmod */ pq_sendint(&buf, 0, 2); /* format code */ /* second field */ pq_sendstring(&buf, "timeline"); /* col name */ pq_sendint(&buf, 0, 4); /* table oid */ pq_sendint(&buf, 0, 2); /* attnum */ pq_sendint(&buf, INT4OID, 4); /* type oid */ pq_sendint(&buf, 4, 2); /* typlen */ pq_sendint(&buf, 0, 4); /* typmod */ pq_sendint(&buf, 0, 2); /* format code */ pq_endmessage(&buf); /* Send a DataRow message */ pq_beginmessage(&buf, 'D'); pq_sendint(&buf, 2, 2); /* # of columns */ pq_sendint(&buf, strlen(sysid), 4); /* col1 len */ pq_sendbytes(&buf, (char *) &sysid, strlen(sysid)); pq_sendint(&buf, strlen(tli), 4); /* col2 len */ pq_sendbytes(&buf, (char *) tli, strlen(tli)); pq_endmessage(&buf); /* Send CommandComplete and ReadyForQuery messages */ EndCommand("SELECT", DestRemote); ReadyForQuery(DestRemote); /* ReadyForQuery did pq_flush for us */ } else if (sscanf(query_string, "START_REPLICATION %X/%X", &recptr.xlogid, &recptr.xrecoff) == 2) { 
StringInfoData buf; /* * Check that we're logging enough information in the * WAL for log-shipping. * * NOTE: This only checks the current value of * wal_level. Even if the current setting is not * 'minimal', there can be old WAL in the pg_xlog * directory that was created with 'minimal'. So this * is not bulletproof, the purpose is just to give a * user-friendly error message that hints how to * configure the system correctly. */ if (wal_level == WAL_LEVEL_MINIMAL) ereport(FATAL, (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("standby connections not allowed because wal_level=minimal"))); /* Send a CopyBothResponse message, and start streaming */ pq_beginmessage(&buf, 'W'); pq_sendbyte(&buf, 0); pq_sendint(&buf, 0, 2); pq_endmessage(&buf); pq_flush(); /* * Initialize position to the received one, then the * xlog records begin to be shipped from that position */ sentPtr = recptr; /* break out of the loop */ replication_started = true; } else if (strncmp(query_string, "BASE_BACKUP ", 12) == 0) { /* Command is BASE_BACKUP ;