* shutdown checkpoint record, and then exit.
*
*
- * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/walsender.c
#include <signal.h>
#include <unistd.h>
-#include "funcapi.h"
-#include "access/xlog_internal.h"
#include "access/transam.h"
+#include "access/xlog_internal.h"
#include "catalog/pg_type.h"
+#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
+#include "nodes/replnodes.h"
#include "replication/basebackup.h"
-#include "replication/replnodes.h"
+#include "replication/syncrep.h"
#include "replication/walprotocol.h"
+#include "replication/walreceiver.h"
#include "replication/walsender.h"
+#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
+#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
+#include "utils/timeout.h"
+#include "utils/timestamp.h"
/* Array of WalSnds in shared memory */
WalSndCtlData *WalSndCtl = NULL;
/* My slot in the shared memory array */
-static WalSnd *MyWalSnd = NULL;
+WalSnd *MyWalSnd = NULL;
/* Global state */
bool am_walsender = false; /* Am I a walsender process ? */
+bool am_cascading_walsender = false; /* Am I cascading WAL to
+ * another standby ? */
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
-int WalSndDelay = 200; /* max sleep time between some actions */
+int replication_timeout = 60 * 1000; /* maximum time to send one
+ * WAL data message */
+/*
+ * State for WalSndWakeupRequest
+ */
+bool wake_wal_senders = false;
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
* but for walsender to read the XLOG.
*/
static int sendFile = -1;
-static uint32 sendId = 0;
-static uint32 sendSeg = 0;
+static XLogSegNo sendSegNo = 0;
static uint32 sendOff = 0;
/*
* How far have we sent WAL already? This is also advertised in
* MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
*/
-static XLogRecPtr sentPtr = {0, 0};
+static XLogRecPtr sentPtr = 0;
+
+/*
+ * Buffer for processing reply messages.
+ */
+static StringInfoData reply_message;
+
+/*
+ * Timestamp of the last receipt of the reply from the standby.
+ */
+static TimestampTz last_reply_timestamp;
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
/* Prototypes for private functions */
static bool HandleReplicationCommand(const char *cmd_string);
-static int WalSndLoop(void);
+static void WalSndLoop(void) __attribute__((noreturn));
static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
-static bool XLogSend(char *msgbuf, bool *caughtup);
+static void XLogSend(char *msgbuf, bool *caughtup);
static void IdentifySystem(void);
-static void StartReplication(StartReplicationCmd * cmd);
+static void StartReplication(StartReplicationCmd *cmd);
+static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
+static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
+static void WalSndKeepalive(char *msgbuf);
/* Main entry point for walsender process */
-int
+void
WalSenderMain(void)
{
MemoryContext walsnd_context;
- if (RecoveryInProgress())
- ereport(FATAL,
- (errcode(ERRCODE_CANNOT_CONNECT_NOW),
- errmsg("recovery is still in progress, can't accept WAL streaming connections")));
+ am_cascading_walsender = RecoveryInProgress();
/* Create a per-walsender data structure in shared memory */
InitWalSnd();
/* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig);
+ /*
+ * Use the recovery target timeline ID during recovery
+ */
+ if (am_cascading_walsender)
+ ThisTimeLineID = GetRecoveryTargetTLI();
+
/* Tell the standby that walsender is ready for receiving commands */
ReadyForQuery(DestRemote);
SpinLockRelease(&walsnd->mutex);
}
+ SyncRepInitConfig();
+
/* Main loop of walsender */
- return WalSndLoop();
+ WalSndLoop();
}
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
*/
- if (!PostmasterIsAlive(true))
+ if (!PostmasterIsAlive())
exit(1);
/*
XLogRecPtr logptr;
/*
- * Reply with a result set with one row, three columns. First col is system
- * ID, second is timeline ID, and third is current xlog location.
+ * Reply with a result set with one row, three columns. First col is
+ * system ID, second is timeline ID, and third is current xlog location.
*/
snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
GetSystemIdentifier());
snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
- logptr = GetInsertRecPtr();
+ logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
- snprintf(xpos, sizeof(xpos), "%X/%X",
- logptr.xlogid, logptr.xrecoff);
+ snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
/* Send a RowDescription message */
pq_beginmessage(&buf, 'T');
* START_REPLICATION
*/
static void
-StartReplication(StartReplicationCmd * cmd)
+StartReplication(StartReplicationCmd *cmd)
{
StringInfoData buf;
/*
- * Let postmaster know that we're streaming. Once we've declared us as
- * a WAL sender process, postmaster will let us outlive the bgwriter and
- * kill us last in the shutdown sequence, so we get a chance to stream
- * all remaining WAL at shutdown, including the shutdown checkpoint.
- * Note that there's no going back, and we mustn't write any WAL records
- * after this.
+ * Let postmaster know that we're streaming. Once we've declared us as a
+ * WAL sender process, postmaster will let us outlive the bgwriter and
+ * kill us last in the shutdown sequence, so we get a chance to stream all
+ * remaining WAL at shutdown, including the shutdown checkpoint. Note that
+ * there's no going back, and we mustn't write any WAL records after this.
*/
MarkPostmasterChildWalSender();
+ SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
/*
- * Check that we're logging enough information in the WAL for
- * log-shipping.
+ * When promoting a cascading standby, postmaster sends SIGUSR2 to any
+ * cascading walsenders to kill them. But there is a corner-case where
+ * such walsender fails to receive SIGUSR2 and survives a standby
+ * promotion unexpectedly. This happens when postmaster sends SIGUSR2
+ * before the walsender marks itself as a WAL sender, because postmaster
+ * sends SIGUSR2 to only the processes marked as a WAL sender.
+ *
+ * To avoid this corner-case, if recovery is NOT in progress even though
+ * the walsender is cascading one, we do the same thing as SIGUSR2 signal
+ * handler does, i.e., set walsender_ready_to_stop to true. Which causes
+ * the walsender to end later.
*
- * NOTE: This only checks the current value of wal_level. Even if the
- * current setting is not 'minimal', there can be old WAL in the pg_xlog
- * directory that was created with 'minimal'. So this is not bulletproof,
- * the purpose is just to give a user-friendly error message that hints
- * how to configure the system correctly.
+ * When terminating cascading walsenders, usually postmaster writes the
+ * log message announcing the terminations. But there is a race condition
+ * here. If there is no walsender except this process before reaching
+ * here, postmaster thinks that there is no walsender and suppresses that
+ * log message. To handle this case, we always emit that log message here.
+ * This might cause duplicate log messages, but which is less likely to
+ * happen, so it's not worth writing some code to suppress them.
+ */
+ if (am_cascading_walsender && !RecoveryInProgress())
+ {
+ ereport(LOG,
+ (errmsg("terminating walsender process to force cascaded standby "
+ "to update timeline and reconnect")));
+ walsender_ready_to_stop = true;
+ }
+
+ /*
+ * We assume here that we're logging enough information in the WAL for
+ * log-shipping, since this is checked in PostmasterMain().
+ *
+ * NOTE: wal_level can only change at shutdown, so in most cases it is
+ * difficult for there to be WAL data that we can still see that was
+ * written at wal_level='minimal'.
*/
- if (wal_level == WAL_LEVEL_MINIMAL)
- ereport(FATAL,
- (errcode(ERRCODE_CANNOT_CONNECT_NOW),
- errmsg("standby connections not allowed because wal_level=minimal")));
+
+ /*
+ * When we first start replication the standby will be behind the primary.
+ * For some applications, for example, synchronous replication, it is
+ * important to have a clear state for this initial catchup mode, so we
+ * can trigger actions when we change streaming state later. We may stay
+ * in this state for a long time, which is exactly why we want to be able
+ * to monitor whether or not we are still here.
+ */
+ WalSndSetState(WALSNDSTATE_CATCHUP);
/* Send a CopyBothResponse message, and start streaming */
pq_beginmessage(&buf, 'W');
{
unsigned char firstchar;
int r;
+ bool received = false;
- r = pq_getbyte_if_available(&firstchar);
- if (r < 0)
- {
- /* unexpected error or EOF */
- ereport(COMMERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("unexpected EOF on standby connection")));
- proc_exit(0);
- }
- if (r == 0)
- {
- /* no data available without blocking */
- return;
- }
-
- /* Handle the very limited subset of commands expected in this phase */
- switch (firstchar)
+ for (;;)
{
- /*
- * 'd' means a standby reply wrapped in a COPY BOTH packet.
- */
- case 'd':
- ProcessStandbyReplyMessage();
+ r = pq_getbyte_if_available(&firstchar);
+ if (r < 0)
+ {
+ /* unexpected error or EOF */
+ ereport(COMMERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected EOF on standby connection")));
+ proc_exit(0);
+ }
+ if (r == 0)
+ {
+ /* no data available without blocking */
break;
+ }
- /*
- * 'X' means that the standby is closing down the socket.
- */
- case 'X':
- proc_exit(0);
+ /* Handle the very limited subset of commands expected in this phase */
+ switch (firstchar)
+ {
+ /*
+ * 'd' means a standby reply wrapped in a CopyData packet.
+ */
+ case 'd':
+ ProcessStandbyMessage();
+ received = true;
+ break;
- default:
- ereport(FATAL,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid standby closing message type %d",
- firstchar)));
+ /*
+ * 'X' means that the standby is closing down the socket.
+ */
+ case 'X':
+ proc_exit(0);
+
+ default:
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid standby message type \"%c\"",
+ firstchar)));
+ }
}
+
+ /*
+ * Save the last reply timestamp if we've received at least one reply.
+ */
+ if (received)
+ last_reply_timestamp = GetCurrentTimestamp();
}
/*
* Process a status update message received from standby.
*/
static void
-ProcessStandbyReplyMessage(void)
+ProcessStandbyMessage(void)
{
- static StringInfoData input_message;
- StandbyReplyMessage reply;
- char msgtype;
+ char msgtype;
- initStringInfo(&input_message);
+ resetStringInfo(&reply_message);
/*
* Read the message contents.
*/
- if (pq_getmessage(&input_message, 0))
+ if (pq_getmessage(&reply_message, 0))
{
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
}
/*
- * Check message type from the first byte. At the moment, there is only
- * one type.
+ * Check message type from the first byte.
*/
- msgtype = pq_getmsgbyte(&input_message);
- if (msgtype != 'r')
- ereport(COMMERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("unexpected message type %c", msgtype)));
+ msgtype = pq_getmsgbyte(&reply_message);
+
+ switch (msgtype)
+ {
+ case 'r':
+ ProcessStandbyReplyMessage();
+ break;
+
+ case 'h':
+ ProcessStandbyHSFeedbackMessage();
+ break;
+
+ default:
+ ereport(COMMERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected message type \"%c\"", msgtype)));
+ proc_exit(0);
+ }
+}
- pq_copymsgbytes(&input_message, (char *) &reply, sizeof(StandbyReplyMessage));
+/*
+ * Regular reply from standby advising of WAL positions on standby server.
+ */
+static void
+ProcessStandbyReplyMessage(void)
+{
+ StandbyReplyMessage reply;
- elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X ",
- reply.write.xlogid, reply.write.xrecoff,
- reply.flush.xlogid, reply.flush.xrecoff,
- reply.apply.xlogid, reply.apply.xrecoff);
+ pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
+
+ elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
+ (uint32) (reply.write >> 32), (uint32) reply.write,
+ (uint32) (reply.flush >> 32), (uint32) reply.flush,
+ (uint32) (reply.apply >> 32), (uint32) reply.apply);
/*
- * Update shared state for this WalSender process
- * based on reply data from standby.
+ * Update shared state for this WalSender process based on reply data from
+ * standby.
*/
{
/* use volatile pointer to prevent code rearrangement */
walsnd->apply = reply.apply;
SpinLockRelease(&walsnd->mutex);
}
+
+ if (!am_cascading_walsender)
+ SyncRepReleaseWaiters();
+}
+
+/*
+ * Hot Standby feedback
+ */
+static void
+ProcessStandbyHSFeedbackMessage(void)
+{
+ StandbyHSFeedbackMessage reply;
+ TransactionId nextXid;
+ uint32 nextEpoch;
+
+ /* Decipher the reply message */
+ pq_copymsgbytes(&reply_message, (char *) &reply,
+ sizeof(StandbyHSFeedbackMessage));
+
+ elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+ reply.xmin,
+ reply.epoch);
+
+ /* Ignore invalid xmin (can't actually happen with current walreceiver) */
+ if (!TransactionIdIsNormal(reply.xmin))
+ return;
+
+ /*
+ * Check that the provided xmin/epoch are sane, that is, not in the future
+ * and not so far back as to be already wrapped around. Ignore if not.
+ *
+ * Epoch of nextXid should be same as standby, or if the counter has
+ * wrapped, then one greater than standby.
+ */
+ GetNextXidAndEpoch(&nextXid, &nextEpoch);
+
+ if (reply.xmin <= nextXid)
+ {
+ if (reply.epoch != nextEpoch)
+ return;
+ }
+ else
+ {
+ if (reply.epoch + 1 != nextEpoch)
+ return;
+ }
+
+ if (!TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
+ return; /* epoch OK, but it's wrapped around */
+
+ /*
+ * Set the WalSender's xmin equal to the standby's requested xmin, so that
+ * the xmin will be taken into account by GetOldestXmin. This will hold
+ * back the removal of dead rows and thereby prevent the generation of
+ * cleanup conflicts on the standby server.
+ *
+ * There is a small window for a race condition here: although we just
+ * checked that reply.xmin precedes nextXid, the nextXid could have gotten
+ * advanced between our fetching it and applying the xmin below, perhaps
+ * far enough to make reply.xmin wrap around. In that case the xmin we
+ * set here would be "in the future" and have no effect. No point in
+ * worrying about this since it's too late to save the desired data
+ * anyway. Assuming that the standby sends us an increasing sequence of
+ * xmins, this could only happen during the first reply cycle, else our
+ * own xmin would prevent nextXid from advancing so far.
+ *
+ * We don't bother taking the ProcArrayLock here. Setting the xmin field
+ * is assumed atomic, and there's no real need to prevent a concurrent
+ * GetOldestXmin. (If we're moving our xmin forward, this is obviously
+ * safe, and if we're moving it backwards, well, the data is at risk
+ * already since a VACUUM could have just finished calling GetOldestXmin.)
+ */
+ MyPgXact->xmin = reply.xmin;
}
/* Main loop of walsender process */
-static int
+static void
WalSndLoop(void)
{
char *output_message;
*/
output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
+ /*
+ * Allocate buffer that will be used for processing reply messages. As
+ * above, do this just once to reduce palloc overhead.
+ */
+ initStringInfo(&reply_message);
+
+ /* Initialize the last reply timestamp */
+ last_reply_timestamp = GetCurrentTimestamp();
+
/* Loop forever, unless we get an error */
for (;;)
{
+ /* Clear any already-pending wakeups */
+ ResetLatch(&MyWalSnd->latch);
+
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
*/
- if (!PostmasterIsAlive(true))
+ if (!PostmasterIsAlive())
exit(1);
/* Process any requests or signals received recently */
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
- }
-
- /*
- * When SIGUSR2 arrives, we send all outstanding logs up to the
- * shutdown checkpoint record (i.e., the latest record) and exit.
- */
- if (walsender_ready_to_stop)
- {
- if (!XLogSend(output_message, &caughtup))
- break;
- ProcessRepliesIfAny();
- if (caughtup)
- walsender_shutdown_requested = true;
+ SyncRepInitConfig();
}
/* Normal exit from the walsender is here */
if (walsender_shutdown_requested)
{
- /* Inform the standby that XLOG streaming was done */
+ /* Inform the standby that XLOG streaming is done */
pq_puttextmessage('C', "COPY 0");
pq_flush();
proc_exit(0);
}
+ /* Check for input from the client */
+ ProcessRepliesIfAny();
+
/*
- * If we had sent all accumulated WAL in last round, nap for the
- * configured time before retrying.
+ * If we don't have any pending data in the output buffer, try to send
+ * some more. If there is some, we don't bother to call XLogSend
+ * again until we've flushed it ... but we'd better assume we are not
+ * caught up.
*/
- if (caughtup)
+ if (!pq_is_send_pending())
+ XLogSend(output_message, &caughtup);
+ else
+ caughtup = false;
+
+ /* Try to flush pending output to the client */
+ if (pq_flush_if_writable() != 0)
+ break;
+
+ /* If nothing remains to be sent right now ... */
+ if (caughtup && !pq_is_send_pending())
{
/*
- * Even if we wrote all the WAL that was available when we started
- * sending, more might have arrived while we were sending this
- * batch. We had the latch set while sending, so we have not
- * received any signals from that time. Let's arm the latch
- * again, and after that check that we're still up-to-date.
+ * If we're in catchup state, move to streaming. This is an
+ * important state change for users to know about, since before
+ * this point data loss might occur if the primary dies and we
+ * need to failover to the standby. The state change is also
+ * important for synchronous replication, since commits that
+ * started to wait at that point might wait for some time.
*/
- ResetLatch(&MyWalSnd->latch);
-
- if (!XLogSend(output_message, &caughtup))
- break;
- if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested)
+ if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
{
- /*
- * XXX: We don't really need the periodic wakeups anymore,
- * WaitLatchOrSocket should reliably wake up as soon as
- * something interesting happens.
- */
+ ereport(DEBUG1,
+ (errmsg("standby \"%s\" has now caught up with primary",
+ application_name)));
+ WalSndSetState(WALSNDSTATE_STREAMING);
+ }
- /* Sleep */
- WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
- WalSndDelay * 1000L);
+ /*
+ * When SIGUSR2 arrives, we send any outstanding logs up to the
+ * shutdown checkpoint record (i.e., the latest record) and exit.
+ * This may be a normal termination at shutdown, or a promotion,
+ * the walsender is not sure which.
+ */
+ if (walsender_ready_to_stop)
+ {
+ /* ... let's just be real sure we're caught up ... */
+ XLogSend(output_message, &caughtup);
+ if (caughtup && !pq_is_send_pending())
+ {
+ walsender_shutdown_requested = true;
+ continue; /* don't want to wait more */
+ }
}
}
- else
+
+ /*
+ * We don't block if not caught up, unless there is unsent data
+ * pending in which case we'd better block until the socket is
+ * write-ready. This test is only needed for the case where XLogSend
+ * loaded a subset of the available data but then pq_flush_if_writable
+ * flushed it all --- we should immediately try to send more.
+ */
+ if (caughtup || pq_is_send_pending())
{
- /* Attempt to send the log once every loop */
- if (!XLogSend(output_message, &caughtup))
+ TimestampTz timeout = 0;
+ long sleeptime = 10000; /* 10 s */
+ int wakeEvents;
+
+ wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+ WL_SOCKET_READABLE | WL_TIMEOUT;
+
+ if (pq_is_send_pending())
+ wakeEvents |= WL_SOCKET_WRITEABLE;
+ else
+ {
+ WalSndKeepalive(output_message);
+ /* Try to flush pending output to the client */
+ if (pq_flush_if_writable() != 0)
+ break;
+ }
+
+ /* Determine time until replication timeout */
+ if (replication_timeout > 0)
+ {
+ timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
+ replication_timeout);
+ sleeptime = 1 + (replication_timeout / 10);
+ }
+
+ /* Sleep until something happens or replication timeout */
+ WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+ MyProcPort->sock, sleeptime);
+
+ /*
+ * Check for replication timeout. Note we ignore the corner case
+ * possibility that the client replied just as we reached the
+ * timeout ... he's supposed to reply *before* that.
+ */
+ if (replication_timeout > 0 &&
+ GetCurrentTimestamp() >= timeout)
+ {
+ /*
+ * Since typically expiration of replication timeout means
+ * communication problem, we don't send the error message to
+ * the standby.
+ */
+ ereport(COMMERROR,
+ (errmsg("terminating walsender process due to replication timeout")));
break;
+ }
}
-
- /* Update our state to indicate if we're behind or not */
- WalSndSetState(caughtup ? WALSNDSTATE_STREAMING : WALSNDSTATE_CATCHUP);
- ProcessRepliesIfAny();
}
/*
whereToSendOutput = DestNone;
proc_exit(0);
- return 1; /* keep the compiler quiet */
+ abort(); /* keep the compiler quiet */
}
/* Initialize a per-walsender data structure for this walsender process */
}
/*
- * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
+ * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
*
* XXX probably this should be improved to suck data directly from the
* WAL buffers when possible.
* more than one.
*/
void
-XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
+XLogRead(char *buf, XLogRecPtr startptr, Size count)
{
- XLogRecPtr startRecPtr = recptr;
- char path[MAXPGPATH];
- uint32 lastRemovedLog;
- uint32 lastRemovedSeg;
- uint32 log;
- uint32 seg;
+ char *p;
+ XLogRecPtr recptr;
+ Size nbytes;
+ XLogSegNo lastRemovedSegNo;
+ XLogSegNo segno;
+
+retry:
+ p = buf;
+ recptr = startptr;
+ nbytes = count;
while (nbytes > 0)
{
int segbytes;
int readbytes;
- startoff = recptr.xrecoff % XLogSegSize;
+ startoff = recptr % XLogSegSize;
- if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
+ if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
{
+ char path[MAXPGPATH];
+
/* Switch to another logfile segment */
if (sendFile >= 0)
close(sendFile);
- XLByteToSeg(recptr, sendId, sendSeg);
- XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
+ XLByteToSeg(recptr, sendSegNo);
+ XLogFilePath(path, ThisTimeLineID, sendSegNo);
sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (sendFile < 0)
* removed or recycled.
*/
if (errno == ENOENT)
- {
- char filename[MAXFNAMELEN];
-
- XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
- filename)));
- }
+ XLogFileNameP(ThisTimeLineID, sendSegNo))));
else
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
- path, sendId, sendSeg)));
+ errmsg("could not open file \"%s\": %m",
+ path)));
}
sendOff = 0;
}
if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not seek in log file %u, segment %u to offset %u: %m",
- sendId, sendSeg, startoff)));
+ errmsg("could not seek in log segment %s to offset %u: %m",
+ XLogFileNameP(ThisTimeLineID, sendSegNo),
+ startoff)));
sendOff = startoff;
}
else
segbytes = nbytes;
- readbytes = read(sendFile, buf, segbytes);
+ readbytes = read(sendFile, p, segbytes);
if (readbytes <= 0)
+ {
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not read from log file %u, segment %u, offset %u, "
- "length %lu: %m",
- sendId, sendSeg, sendOff, (unsigned long) segbytes)));
+ errmsg("could not read from log segment %s, offset %u, length %lu: %m",
+ XLogFileNameP(ThisTimeLineID, sendSegNo),
+ sendOff, (unsigned long) segbytes)));
+ }
/* Update state for read */
XLByteAdvance(recptr, readbytes);
sendOff += readbytes;
nbytes -= readbytes;
- buf += readbytes;
+ p += readbytes;
}
/*
* read() succeeds in that case, but the data we tried to read might
* already have been overwritten with new WAL records.
*/
- XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
- XLByteToSeg(startRecPtr, log, seg);
- if (log < lastRemovedLog ||
- (log == lastRemovedLog && seg <= lastRemovedSeg))
- {
- char filename[MAXFNAMELEN];
-
- XLogFileName(filename, ThisTimeLineID, log, seg);
+ XLogGetLastRemoved(&lastRemovedSegNo);
+ XLByteToSeg(startptr, segno);
+ if (segno <= lastRemovedSegNo)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
- filename)));
+ XLogFileNameP(ThisTimeLineID, segno))));
+
+ /*
+ * During recovery, the currently-open WAL file might be replaced with the
+ * file of the same name retrieved from archive. So we always need to
+ * check what we read was valid after reading into the buffer. If it's
+ * invalid, we try to open and read the file again.
+ */
+ if (am_cascading_walsender)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+ bool reload;
+
+ SpinLockAcquire(&walsnd->mutex);
+ reload = walsnd->needreload;
+ walsnd->needreload = false;
+ SpinLockRelease(&walsnd->mutex);
+
+ if (reload && sendFile >= 0)
+ {
+ close(sendFile);
+ sendFile = -1;
+
+ goto retry;
+ }
}
}
/*
* Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
- * but not yet sent to the client, and send it.
+ * but not yet sent to the client, and buffer it in the libpq output
+ * buffer.
*
* msgbuf is a work area in which the output message is constructed. It's
* passed in just so we can avoid re-palloc'ing the buffer on each cycle.
*
* If there is no unsent WAL remaining, *caughtup is set to true, otherwise
* *caughtup is set to false.
- *
- * Returns true if OK, false if trouble.
+
*/
-static bool
+static void
XLogSend(char *msgbuf, bool *caughtup)
{
XLogRecPtr SendRqstPtr;
* subsequently crashes and restarts, slaves must not have applied any WAL
* that gets lost on the master.
*/
- SendRqstPtr = GetFlushRecPtr();
+ SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr();
/* Quick exit if nothing to do */
if (XLByteLE(SendRqstPtr, sentPtr))
{
*caughtup = true;
- return true;
+ return;
}
/*
* SendRqstPtr never points to the middle of a WAL record.
*/
startptr = sentPtr;
- if (startptr.xrecoff >= XLogFileSize)
- {
- /*
- * crossing a logid boundary, skip the non-existent last log segment
- * in previous logical log file.
- */
- startptr.xlogid += 1;
- startptr.xrecoff = 0;
- }
-
endptr = startptr;
XLByteAdvance(endptr, MAX_SEND_SIZE);
- if (endptr.xlogid != startptr.xlogid)
- {
- /* Don't cross a logfile boundary within one message */
- Assert(endptr.xlogid == startptr.xlogid + 1);
- endptr.xlogid = startptr.xlogid;
- endptr.xrecoff = XLogFileSize;
- }
/* if we went beyond SendRqstPtr, back off */
if (XLByteLE(SendRqstPtr, endptr))
else
{
/* round down to page boundary. */
- endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
+ endptr -= (endptr % XLOG_BLCKSZ);
*caughtup = false;
}
- nbytes = endptr.xrecoff - startptr.xrecoff;
+ nbytes = endptr - startptr;
Assert(nbytes <= MAX_SEND_SIZE);
/*
memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
- pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
-
- /* Flush pending output to the client */
- if (pq_flush())
- return false;
+ pq_putmessage_noblock('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
sentPtr = endptr;
char activitymsg[50];
snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
- sentPtr.xlogid, sentPtr.xrecoff);
+ (uint32) (sentPtr >> 32), (uint32) sentPtr);
set_ps_display(activitymsg, false);
}
- return true;
+ return;
+}
+
+/*
+ * Request walsenders to reload the currently-open WAL file
+ */
+void
+WalSndRqstFileReload(void)
+{
+ int i;
+
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+ if (walsnd->pid == 0)
+ continue;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->needreload = true;
+ SpinLockRelease(&walsnd->mutex);
+ }
}
/* SIGHUP: set flag to re-read config file at next convenient time */
static void
WalSndSigHupHandler(SIGNAL_ARGS)
{
+ int save_errno = errno;
+
got_SIGHUP = true;
if (MyWalSnd)
SetLatch(&MyWalSnd->latch);
+
+ errno = save_errno;
}
/* SIGTERM: set flag to shut down */
static void
WalSndShutdownHandler(SIGNAL_ARGS)
{
+ int save_errno = errno;
+
walsender_shutdown_requested = true;
if (MyWalSnd)
SetLatch(&MyWalSnd->latch);
+
+ /*
+ * Set the standard (non-walsender) state as well, so that we can abort
+ * things like do_pg_stop_backup().
+ */
+ InterruptPending = true;
+ ProcDiePending = true;
+
+ errno = save_errno;
}
/*
static void
WalSndXLogSendHandler(SIGNAL_ARGS)
{
+ int save_errno = errno;
+
latch_sigusr1_handler();
+
+ errno = save_errno;
}
/* SIGUSR2: set flag to do a last cycle and shut down afterwards */
static void
WalSndLastCycleHandler(SIGNAL_ARGS)
{
+ int save_errno = errno;
+
walsender_ready_to_stop = true;
if (MyWalSnd)
SetLatch(&MyWalSnd->latch);
+
+ errno = save_errno;
}
/* Set up signal handlers */
pqsignal(SIGINT, SIG_IGN); /* not used */
pqsignal(SIGTERM, WalSndShutdownHandler); /* request shutdown */
pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */
- pqsignal(SIGALRM, SIG_IGN);
+ InitializeTimeouts(); /* establishes SIGALRM handler */
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
/* First time through, so initialize */
MemSet(WalSndCtl, 0, WalSndShmemSize());
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
+
for (i = 0; i < max_wal_senders; i++)
{
WalSnd *walsnd = &WalSndCtl->walsnds[i];
}
}
-/* Wake up all walsenders */
+/*
+ * Wake up all walsenders
+ *
+ * This will be called inside critical sections, so throwing an error is not
+ * adviseable.
+ */
void
WalSndWakeup(void)
{
- int i;
+ int i;
for (i = 0; i < max_wal_senders; i++)
SetLatch(&WalSndCtl->walsnds[i].latch);
switch (state)
{
case WALSNDSTATE_STARTUP:
- return "STARTUP";
+ return "startup";
case WALSNDSTATE_BACKUP:
- return "BACKUP";
+ return "backup";
case WALSNDSTATE_CATCHUP:
- return "CATCHUP";
+ return "catchup";
case WALSNDSTATE_STREAMING:
- return "STREAMING";
+ return "streaming";
}
return "UNKNOWN";
}
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_WAL_SENDERS_COLS 6
- ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
- TupleDesc tupdesc;
- Tuplestorestate *tupstore;
- MemoryContext per_query_ctx;
- MemoryContext oldcontext;
- int i;
+#define PG_STAT_GET_WAL_SENDERS_COLS 8
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ TupleDesc tupdesc;
+ Tuplestorestate *tupstore;
+ MemoryContext per_query_ctx;
+ MemoryContext oldcontext;
+ int *sync_priority;
+ int priority = 0;
+ int sync_standby = -1;
+ int i;
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
MemoryContextSwitchTo(oldcontext);
+ /*
+ * Get the priorities of sync standbys all in one go, to minimise lock
+ * acquisitions and to allow us to evaluate who is the current sync
+ * standby. This code must match the code in SyncRepReleaseWaiters().
+ */
+ sync_priority = palloc(sizeof(int) * max_wal_senders);
+ LWLockAcquire(SyncRepLock, LW_SHARED);
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+ if (walsnd->pid != 0)
+ {
+ /*
+ * Treat a standby such as a pg_basebackup background process
+ * which always returns an invalid flush location, as an
+ * asynchronous standby.
+ */
+ sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
+ 0 : walsnd->sync_standby_priority;
+
+ if (walsnd->state == WALSNDSTATE_STREAMING &&
+ walsnd->sync_standby_priority > 0 &&
+ (priority == 0 ||
+ priority > walsnd->sync_standby_priority) &&
+ !XLogRecPtrIsInvalid(walsnd->flush))
+ {
+ priority = walsnd->sync_standby_priority;
+ sync_standby = i;
+ }
+ }
+ }
+ LWLockRelease(SyncRepLock);
+
for (i = 0; i < max_wal_senders; i++)
{
/* use volatile pointer to prevent code rearrangement */
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
- WalSndState state;
+ WalSndState state;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
if (!superuser())
{
/*
- * Only superusers can see details. Other users only get
- * the pid value to know it's a walsender, but no details.
+ * Only superusers can see details. Other users only get the pid
+ * value to know it's a walsender, but no details.
*/
- nulls[1] = true;
- nulls[2] = true;
- nulls[3] = true;
- nulls[4] = true;
- nulls[5] = true;
+ MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
}
else
{
values[1] = CStringGetTextDatum(WalSndGetStateString(state));
snprintf(location, sizeof(location), "%X/%X",
- sentPtr.xlogid, sentPtr.xrecoff);
+ (uint32) (sentPtr >> 32), (uint32) sentPtr);
values[2] = CStringGetTextDatum(location);
- if (write.xlogid == 0 && write.xrecoff == 0)
+ if (write == 0)
nulls[3] = true;
snprintf(location, sizeof(location), "%X/%X",
- write.xlogid, write.xrecoff);
+ (uint32) (write >> 32), (uint32) write);
values[3] = CStringGetTextDatum(location);
- if (flush.xlogid == 0 && flush.xrecoff == 0)
+ if (flush == 0)
nulls[4] = true;
snprintf(location, sizeof(location), "%X/%X",
- flush.xlogid, flush.xrecoff);
+ (uint32) (flush >> 32), (uint32) flush);
values[4] = CStringGetTextDatum(location);
- if (apply.xlogid == 0 && apply.xrecoff == 0)
+ if (apply == 0)
nulls[5] = true;
snprintf(location, sizeof(location), "%X/%X",
- apply.xlogid, apply.xrecoff);
+ (uint32) (apply >> 32), (uint32) apply);
values[5] = CStringGetTextDatum(location);
+
+ values[6] = Int32GetDatum(sync_priority[i]);
+
+ /*
+ * More easily understood version of standby state. This is purely
+ * informational, not different from priority.
+ */
+ if (sync_priority[i] == 0)
+ values[7] = CStringGetTextDatum("async");
+ else if (i == sync_standby)
+ values[7] = CStringGetTextDatum("sync");
+ else
+ values[7] = CStringGetTextDatum("potential");
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
+ pfree(sync_priority);
/* clean up and return the tuplestore */
tuplestore_donestoring(tupstore);
return (Datum) 0;
}
+static void
+WalSndKeepalive(char *msgbuf)
+{
+ PrimaryKeepaliveMessage keepalive_message;
+
+ /* Construct a new message */
+ keepalive_message.walEnd = sentPtr;
+ keepalive_message.sendTime = GetCurrentTimestamp();
+
+ elog(DEBUG2, "sending replication keepalive");
+
+ /* Prepend with the message type and send it. */
+ msgbuf[0] = 'k';
+ memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
+ pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1);
+}
+
/*
* This isn't currently used for anything. Monitoring tools might be
* interested in the future, and we'll need something like this in the