From: Tom Lane Date: Thu, 3 Jun 2010 22:17:32 +0000 (+0000) Subject: Add current WAL end (as seen by walsender, ie, GetWriteRecPtr() result) X-Git-Tag: REL9_0_BETA2~2 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=0cc59cc1f38d91587d52b14789e20bdd4c1af70a;p=postgresql Add current WAL end (as seen by walsender, ie, GetWriteRecPtr() result) and current server clock time to SR data messages. These are not currently used on the slave side but seem likely to be useful in future, and it'd be better not to change the SR protocol after release. Per discussion. Also do some minor code review and cleanup on walsender.c, and improve the protocol documentation. --- diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 7076267161..b88833c8ee 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1,4 +1,4 @@ - + Frontend/Backend Protocol @@ -1284,6 +1284,173 @@ + +Streaming Replication Protocol + + +To initiate streaming replication, the frontend sends the +replication parameter in the startup message. This tells the +backend to go into walsender mode, wherein a small set of replication commands +can be issued instead of SQL statements. Only the simple query protocol can be +used in walsender mode. + +The commands accepted in walsender mode are: + + + + IDENTIFY_SYSTEM + + + Requests the server to identify itself. Server replies with a result + set of a single row, containing two fields: + + + + + + + systemid + + + + The unique system identifier identifying the cluster. This + can be used to check that the base backup used to initialize the + slave came from the same cluster. + + + + + + + timeline + + + + Current TimelineID. Also useful to check that the slave is + consistent with the master. + + + + + + + + + + START_REPLICATION XXX/XXX + + + Instructs server to start streaming WAL, starting at + WAL position XXX/XXX. + The server can reply with an error, e.g. if the requested section of WAL + has already been recycled. On success, server responds with a + CopyOutResponse message, and then starts to stream WAL to the frontend. + WAL will continue to be streamed until the connection is broken; + no further commands will be accepted. + + + + WAL data is sent as a series of CopyData messages. (This allows + other information to be intermixed; in particular the server can send + an ErrorResponse message if it encounters a failure after beginning + to stream.) The payload in each CopyData message follows this format: + + + + + + + XLogData (B) + + + + + + + Byte1('w') + + + + Identifies the message as WAL data. + + + + + + Byte8 + + + + The starting point of the WAL data in this message, given in + XLogRecPtr format. + + + + + + Byte8 + + + + The current end of WAL on the server, given in + XLogRecPtr format. + + + + + + Byte8 + + + + The server's system clock at the time of transmission, + given in TimestampTz format. + + + + + + Byten + + + + A section of the WAL data stream. + + + + + + + + + + + A single WAL record is never split across two CopyData messages. + When a WAL record crosses a WAL page boundary, and is therefore + already split using continuation records, it can be split at the page + boundary. In other words, the first main WAL record and its + continuation records can be sent in different CopyData messages. + + + Note that all fields within the WAL data and the above-described header + will be in the sending server's native format. Endianness, and the + format for the timestamp, are unpredictable unless the receiver has + verified that the sender's system identifier matches its own + pg_control contents. + + + If the WAL sender process is terminated normally (during postmaster + shutdown), it will send a CommandComplete message before exiting. + This might not happen during an abnormal shutdown, of course. + + + + + + + + + Message Data Types @@ -4137,120 +4304,6 @@ not line breaks. - -Streaming Replication Protocol - - -To initiate streaming replication, the frontend sends the "replication" -parameter in the startup message. This tells the backend to go into -walsender mode, where a small set of replication commands can be issued -instead of SQL statements. Only the simple query protocol can be used in -walsender mode. - -The commands accepted in walsender mode are: - - - - IDENTIFY_SYSTEM - - - Requests the server to identify itself. Server replies with a result - set of a single row, and two fields: - - systemid: The unique system identifier identifying the cluster. This - can be used to check that the base backup used to initialize the - slave came from the same cluster. - - timeline: Current TimelineID. Also used to check that the slave is - consistent with the master. - - - - - - START_REPLICATION XXX/XXX - - - Instructs backend to start streaming WAL, starting at point XXX/XXX. - Server can reply with an error e.g if the requested piece of WAL has - already been recycled. On success, server responds with a - CopyOutResponse message, and backend starts to stream WAL as CopyData - messages. - The payload in CopyData message consists of the following format. - - - - - - - XLogData (B) - - - - - - - Byte1('w') - - - - Identifies the message as WAL data. - - - - - - Int32 - - - - The log file number of the LSN, indicating the starting point of - the WAL in the message. - - - - - - Int32 - - - - The byte offset of the LSN, indicating the starting point of - the WAL in the message. - - - - - - Byten - - - - Data that forms part of WAL data stream. - - - - - - - - - - - A single WAL record is never split across two CopyData messages. When - a WAL record crosses a WAL page boundary, however, and is therefore - already split using continuation records, it can be split at the page - boundary. In other words, the first main WAL record and its - continuation records can be split across different CopyData messages. - - - - - - - - - Summary of Changes since Protocol 2.0 diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index ecb2c3a6d3..b31cfb4147 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -29,7 +29,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.10 2010/04/20 22:55:03 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.11 2010/06/03 22:17:32 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -41,6 +41,7 @@ #include "access/xlog_internal.h" #include "libpq/pqsignal.h" #include "miscadmin.h" +#include "replication/walprotocol.h" #include "replication/walreceiver.h" #include "storage/ipc.h" #include "storage/pmsignal.h" @@ -393,18 +394,18 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) { case 'w': /* WAL records */ { - XLogRecPtr recptr; + WalDataMessageHeader msghdr; - if (len < sizeof(XLogRecPtr)) + if (len < sizeof(WalDataMessageHeader)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid WAL message received from primary"))); + /* memcpy is required here for alignment reasons */ + memcpy(&msghdr, buf, sizeof(WalDataMessageHeader)); + buf += sizeof(WalDataMessageHeader); + len -= sizeof(WalDataMessageHeader); - memcpy(&recptr, buf, sizeof(XLogRecPtr)); - buf += sizeof(XLogRecPtr); - len -= sizeof(XLogRecPtr); - - XLogWalRcvWrite(buf, len, recptr); + XLogWalRcvWrite(buf, len, msghdr.dataStart); break; } default: diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d2e37fd008..e337e7e5a6 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -5,11 +5,10 @@ * The WAL sender process (walsender) is new as of Postgres 9.0. It takes * charge of XLOG streaming sender in the primary server. At first, it is * started by the postmaster when the walreceiver in the standby server - * connects to the primary server and requests XLOG streaming replication, - * i.e., unlike any auxiliary process, it is not an always-running process. + * connects to the primary server and requests XLOG streaming replication. * It attempts to keep reading XLOG records from the disk and sending them * to the standby server, as long as the connection is alive (i.e., like - * any backend, there is an one to one relationship between a connection + * any backend, there is a one-to-one relationship between a connection * and a walsender process). * * Normal termination is by SIGTERM, which instructs the walsender to @@ -30,7 +29,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.24 2010/06/03 21:02:12 petere Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.25 2010/06/03 22:17:32 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -44,6 +43,7 @@ #include "libpq/pqformat.h" #include "libpq/pqsignal.h" #include "miscadmin.h" +#include "replication/walprotocol.h" #include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -80,7 +80,7 @@ static uint32 sendOff = 0; /* * How far have we sent WAL already? This is also advertised in - * MyWalSnd->sentPtr. + * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.) */ static XLogRecPtr sentPtr = {0, 0}; @@ -100,19 +100,9 @@ static void InitWalSnd(void); static void WalSndHandshake(void); static void WalSndKill(int code, Datum arg); static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes); -static bool XLogSend(StringInfo outMsg, bool *caughtup); +static bool XLogSend(char *msgbuf, bool *caughtup); static void CheckClosedConnection(void); -/* - * How much WAL to send in one message? Must be >= XLOG_BLCKSZ. - * - * We don't have a good idea of what a good value would be; there's some - * overhead per message in both walsender and walreceiver, but on the other - * hand sending large batches makes walsender less responsive to signals - * because signals are checked only between messages. 128kB (with - * default 8k blocks) seems like a reasonable guess for now. - */ -#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* Main entry point for walsender process */ int @@ -157,6 +147,9 @@ WalSenderMain(void) return WalSndLoop(); } +/* + * Execute commands from walreceiver, until we enter streaming mode. + */ static void WalSndHandshake(void) { @@ -172,6 +165,13 @@ WalSndHandshake(void) /* Wait for a command to arrive */ firstchar = pq_getbyte(); + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (!PostmasterIsAlive(true)) + exit(1); + /* * Check for any other interesting events that happened while we * slept. @@ -211,7 +211,7 @@ WalSndHandshake(void) /* * Reply with a result set with one row, two columns. - * First col is system ID, and second if timeline ID + * First col is system ID, and second is timeline ID */ snprintf(sysid, sizeof(sysid), UINT64_FORMAT, @@ -253,6 +253,7 @@ WalSndHandshake(void) /* Send CommandComplete and ReadyForQuery messages */ EndCommand("SELECT", DestRemote); ReadyForQuery(DestRemote); + /* ReadyForQuery did pq_flush for us */ } else if (sscanf(query_string, "START_REPLICATION %X/%X", &recptr.xlogid, &recptr.xrecoff) == 2) @@ -365,12 +366,17 @@ CheckClosedConnection(void) static int WalSndLoop(void) { - StringInfoData output_message; + char *output_message; bool caughtup = false; - initStringInfo(&output_message); + /* + * Allocate buffer that will be used for each output message. We do this + * just once to reduce palloc overhead. The buffer must be made large + * enough for maximum-sized messages. + */ + output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE); - /* Loop forever */ + /* Loop forever, unless we get an error */ for (;;) { long remain; /* remaining time (us) */ @@ -381,6 +387,7 @@ WalSndLoop(void) */ if (!PostmasterIsAlive(true)) exit(1); + /* Process any requests or signals received recently */ if (got_SIGHUP) { @@ -394,8 +401,8 @@ WalSndLoop(void) */ if (ready_to_stop) { - if (!XLogSend(&output_message, &caughtup)) - goto eof; + if (!XLogSend(output_message, &caughtup)) + break; if (caughtup) shutdown_requested = true; } @@ -435,17 +442,15 @@ WalSndLoop(void) remain -= NAPTIME_PER_CYCLE; } } + /* Attempt to send the log once every loop */ - if (!XLogSend(&output_message, &caughtup)) - goto eof; + if (!XLogSend(output_message, &caughtup)) + break; } - /* can't get here because the above loop never exits */ - return 1; - -eof: - /* + * Get here on send failure. Clean up and exit. + * * Reset whereToSendOutput to prevent ereport from attempting to send any * more messages to the standby. */ @@ -524,6 +529,9 @@ WalSndKill(int code, Datum arg) /* * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr' + * + * XXX probably this should be improved to suck data directly from the + * WAL buffers when possible. */ static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) @@ -634,51 +642,46 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) /* * Read up to MAX_SEND_SIZE bytes of WAL that's been written (and flushed), - * but not yet sent to the client, and send it. If there is no unsent WAL, - * *caughtup is set to true and nothing is sent, otherwise *caughtup is set - * to false. + * but not yet sent to the client, and send it. + * + * msgbuf is a work area in which the output message is constructed. It's + * passed in just so we can avoid re-palloc'ing the buffer on each cycle. + * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE. + * + * If there is no unsent WAL remaining, *caughtup is set to true, otherwise + * *caughtup is set to false. * * Returns true if OK, false if trouble. */ static bool -XLogSend(StringInfo outMsg, bool *caughtup) +XLogSend(char *msgbuf, bool *caughtup) { XLogRecPtr SendRqstPtr; XLogRecPtr startptr; XLogRecPtr endptr; Size nbytes; - char activitymsg[50]; - - /* use volatile pointer to prevent code rearrangement */ - volatile WalSnd *walsnd = MyWalSnd; + WalDataMessageHeader msghdr; /* Attempt to send all records flushed to the disk already */ SendRqstPtr = GetWriteRecPtr(); /* Quick exit if nothing to do */ - if (!XLByteLT(sentPtr, SendRqstPtr)) + if (XLByteLE(SendRqstPtr, sentPtr)) { *caughtup = true; return true; } - /* - * Otherwise let the caller know that we're not fully caught up. Unless - * there's a huge backlog, we'll be caught up to the current WriteRecPtr - * after we've sent everything below, but more WAL could accumulate while - * we're busy sending. - */ - *caughtup = false; /* - * Figure out how much to send in one message. If there's less than + * Figure out how much to send in one message. If there's no more than * MAX_SEND_SIZE bytes to send, send everything. Otherwise send - * MAX_SEND_SIZE bytes, but round to page boundary. + * MAX_SEND_SIZE bytes, but round to logfile or page boundary. * * The rounding is not only for performance reasons. Walreceiver * relies on the fact that we never split a WAL record across two * messages. Since a long WAL record is split at page boundary into * continuation records, page boundary is always a safe cut-off point. - * We also assume that SendRqstPtr never points in the middle of a WAL + * We also assume that SendRqstPtr never points to the middle of a WAL * record. */ startptr = sentPtr; @@ -694,59 +697,78 @@ XLogSend(StringInfo outMsg, bool *caughtup) endptr = startptr; XLByteAdvance(endptr, MAX_SEND_SIZE); - /* round down to page boundary. */ - endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ); - /* if we went beyond SendRqstPtr, back off */ - if (XLByteLT(SendRqstPtr, endptr)) - endptr = SendRqstPtr; - - /* - * OK to read and send the slice. - * - * We don't need to convert the xlogid/xrecoff from host byte order to - * network byte order because the both server can be expected to have - * the same byte order. If they have different byte order, we don't - * reach here. - */ - pq_sendbyte(outMsg, 'w'); - pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr)); - if (endptr.xlogid != startptr.xlogid) { + /* Don't cross a logfile boundary within one message */ Assert(endptr.xlogid == startptr.xlogid + 1); - nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff; + endptr.xlogid = startptr.xlogid; + endptr.xrecoff = XLogFileSize; + } + + /* if we went beyond SendRqstPtr, back off */ + if (XLByteLE(SendRqstPtr, endptr)) + { + endptr = SendRqstPtr; + *caughtup = true; } else - nbytes = endptr.xrecoff - startptr.xrecoff; + { + /* round down to page boundary. */ + endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ); + *caughtup = false; + } - sentPtr = endptr; + nbytes = endptr.xrecoff - startptr.xrecoff; + Assert(nbytes <= MAX_SEND_SIZE); /* - * Read the log directly into the output buffer to prevent extra - * memcpy calls. + * OK to read and send the slice. */ - enlargeStringInfo(outMsg, nbytes); + msgbuf[0] = 'w'; - XLogRead(&outMsg->data[outMsg->len], startptr, nbytes); - outMsg->len += nbytes; - outMsg->data[outMsg->len] = '\0'; + /* + * Read the log directly into the output buffer to avoid extra memcpy + * calls. + */ + XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes); - pq_putmessage('d', outMsg->data, outMsg->len); - resetStringInfo(outMsg); + /* + * We fill the message header last so that the send timestamp is taken + * as late as possible. + */ + msghdr.dataStart = startptr; + msghdr.walEnd = SendRqstPtr; + msghdr.sendTime = GetCurrentTimestamp(); - /* Update shared memory status */ - SpinLockAcquire(&walsnd->mutex); - walsnd->sentPtr = sentPtr; - SpinLockRelease(&walsnd->mutex); + memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader)); + + pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes); /* Flush pending output */ if (pq_flush()) return false; + sentPtr = endptr; + + /* Update shared memory status */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sentPtr = sentPtr; + SpinLockRelease(&walsnd->mutex); + } + /* Report progress of XLOG streaming in PS display */ - snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", - sentPtr.xlogid, sentPtr.xrecoff); - set_ps_display(activitymsg, false); + if (update_process_title) + { + char activitymsg[50]; + + snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", + sentPtr.xlogid, sentPtr.xrecoff); + set_ps_display(activitymsg, false); + } return true; } diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h new file mode 100644 index 0000000000..15025a277c --- /dev/null +++ b/src/include/replication/walprotocol.h @@ -0,0 +1,53 @@ +/*------------------------------------------------------------------------- + * + * walprotocol.h + * Definitions relevant to the streaming WAL transmission protocol. + * + * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group + * + * $PostgreSQL: pgsql/src/include/replication/walprotocol.h,v 1.1 2010/06/03 22:17:32 tgl Exp $ + * + *------------------------------------------------------------------------- + */ +#ifndef _WALPROTOCOL_H +#define _WALPROTOCOL_H + +#include "access/xlogdefs.h" +#include "utils/timestamp.h" + + +/* + * Header for a WAL data message (message type 'w'). This is wrapped within + * a CopyData message at the FE/BE protocol level. + * + * The header is followed by actual WAL data. Note that the data length is + * not specified in the header --- it's just whatever remains in the message. + * + * walEnd and sendTime are not essential data, but are provided in case + * the receiver wants to adjust its behavior depending on how far behind + * it is. + */ +typedef struct +{ + /* WAL start location of the data included in this message */ + XLogRecPtr dataStart; + + /* Current end of WAL on the sender */ + XLogRecPtr walEnd; + + /* Sender's system clock at the time of transmission */ + TimestampTz sendTime; +} WalDataMessageHeader; + +/* + * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ. + * + * We don't have a good idea of what a good value would be; there's some + * overhead per message in both walsender and walreceiver, but on the other + * hand sending large batches makes walsender less responsive to signals + * because signals are checked only between messages. 128kB (with + * default 8k blocks) seems like a reasonable guess for now. + */ +#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) + +#endif /* _WALPROTOCOL_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 4300b80b27..5dcaeba3f3 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -5,7 +5,7 @@ * * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group * - * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.8 2010/02/26 02:01:27 momjian Exp $ + * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.9 2010/06/03 22:17:32 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -14,6 +14,7 @@ #include "access/xlogdefs.h" #include "storage/spin.h" +#include "pgtime.h" extern bool am_walreceiver;