* shutdown checkpoint record, and then exit.
*
*
- * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/walsender.c
#include <signal.h>
#include <unistd.h>
-#include "funcapi.h"
-#include "access/xlog_internal.h"
#include "access/transam.h"
+#include "access/xlog_internal.h"
#include "catalog/pg_type.h"
+#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
+#include "nodes/replnodes.h"
#include "replication/basebackup.h"
-#include "replication/replnodes.h"
+#include "replication/syncrep.h"
#include "replication/walprotocol.h"
+#include "replication/walreceiver.h"
#include "replication/walsender.h"
+#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
+#include "utils/timeout.h"
+#include "utils/timestamp.h"
/* Array of WalSnds in shared memory */
WalSndCtlData *WalSndCtl = NULL;
/* My slot in the shared memory array */
-WalSnd *MyWalSnd = NULL;
+WalSnd *MyWalSnd = NULL;
/* Global state */
bool am_walsender = false; /* Am I a walsender process ? */
+bool am_cascading_walsender = false; /* Am I cascading WAL to
+ * another standby ? */
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
-int WalSndDelay = 1000; /* max sleep time between some actions */
-int replication_timeout = 60 * 1000; /* maximum time to send one WAL data message */
+int replication_timeout = 60 * 1000; /* maximum time to send one
+ * WAL data message */
+/*
+ * State for WalSndWakeupRequest
+ */
+bool wake_wal_senders = false;
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
* but for walsender to read the XLOG.
*/
static int sendFile = -1;
-static uint32 sendId = 0;
-static uint32 sendSeg = 0;
+static XLogSegNo sendSegNo = 0;
static uint32 sendOff = 0;
/*
* How far have we sent WAL already? This is also advertised in
* MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
*/
-static XLogRecPtr sentPtr = {0, 0};
+static XLogRecPtr sentPtr = 0;
/*
* Buffer for processing reply messages.
/* Prototypes for private functions */
static bool HandleReplicationCommand(const char *cmd_string);
-static int WalSndLoop(void);
+static void WalSndLoop(void) __attribute__((noreturn));
static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static void XLogSend(char *msgbuf, bool *caughtup);
static void IdentifySystem(void);
-static void StartReplication(StartReplicationCmd * cmd);
+static void StartReplication(StartReplicationCmd *cmd);
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
+static void WalSndKeepalive(char *msgbuf);
/* Main entry point for walsender process */
-int
+void
WalSenderMain(void)
{
MemoryContext walsnd_context;
- if (RecoveryInProgress())
- ereport(FATAL,
- (errcode(ERRCODE_CANNOT_CONNECT_NOW),
- errmsg("recovery is still in progress, can't accept WAL streaming connections")));
+ am_cascading_walsender = RecoveryInProgress();
/* Create a per-walsender data structure in shared memory */
InitWalSnd();
/* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig);
+ /*
+ * Use the recovery target timeline ID during recovery
+ */
+ if (am_cascading_walsender)
+ ThisTimeLineID = GetRecoveryTargetTLI();
+
/* Tell the standby that walsender is ready for receiving commands */
ReadyForQuery(DestRemote);
SyncRepInitConfig();
/* Main loop of walsender */
- return WalSndLoop();
+ WalSndLoop();
}
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
*/
- if (!PostmasterIsAlive(true))
+ if (!PostmasterIsAlive())
exit(1);
/*
XLogRecPtr logptr;
/*
- * Reply with a result set with one row, three columns. First col is system
- * ID, second is timeline ID, and third is current xlog location.
+ * Reply with a result set with one row, three columns. First col is
+ * system ID, second is timeline ID, and third is current xlog location.
*/
snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
GetSystemIdentifier());
snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
- logptr = GetInsertRecPtr();
+ logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
- snprintf(xpos, sizeof(xpos), "%X/%X",
- logptr.xlogid, logptr.xrecoff);
+ snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
/* Send a RowDescription message */
pq_beginmessage(&buf, 'T');
* START_REPLICATION
*/
static void
-StartReplication(StartReplicationCmd * cmd)
+StartReplication(StartReplicationCmd *cmd)
{
StringInfoData buf;
/*
- * Let postmaster know that we're streaming. Once we've declared us as
- * a WAL sender process, postmaster will let us outlive the bgwriter and
- * kill us last in the shutdown sequence, so we get a chance to stream
- * all remaining WAL at shutdown, including the shutdown checkpoint.
- * Note that there's no going back, and we mustn't write any WAL records
- * after this.
+ * Let postmaster know that we're streaming. Once we've declared us as a
+ * WAL sender process, postmaster will let us outlive the bgwriter and
+ * kill us last in the shutdown sequence, so we get a chance to stream all
+ * remaining WAL at shutdown, including the shutdown checkpoint. Note that
+ * there's no going back, and we mustn't write any WAL records after this.
*/
MarkPostmasterChildWalSender();
SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
/*
- * Check that we're logging enough information in the WAL for
- * log-shipping.
+ * When promoting a cascading standby, postmaster sends SIGUSR2 to any
+ * cascading walsenders to kill them. But there is a corner-case where
+ * such walsender fails to receive SIGUSR2 and survives a standby
+ * promotion unexpectedly. This happens when postmaster sends SIGUSR2
+ * before the walsender marks itself as a WAL sender, because postmaster
+ * sends SIGUSR2 to only the processes marked as a WAL sender.
*
- * NOTE: This only checks the current value of wal_level. Even if the
- * current setting is not 'minimal', there can be old WAL in the pg_xlog
- * directory that was created with 'minimal'. So this is not bulletproof,
- * the purpose is just to give a user-friendly error message that hints
- * how to configure the system correctly.
+ * To avoid this corner-case, if recovery is NOT in progress even though
+ * the walsender is cascading one, we do the same thing as SIGUSR2 signal
+ * handler does, i.e., set walsender_ready_to_stop to true. Which causes
+ * the walsender to end later.
+ *
+ * When terminating cascading walsenders, usually postmaster writes the
+ * log message announcing the terminations. But there is a race condition
+ * here. If there is no walsender except this process before reaching
+ * here, postmaster thinks that there is no walsender and suppresses that
+ * log message. To handle this case, we always emit that log message here.
+ * This might cause duplicate log messages, but which is less likely to
+ * happen, so it's not worth writing some code to suppress them.
+ */
+ if (am_cascading_walsender && !RecoveryInProgress())
+ {
+ ereport(LOG,
+ (errmsg("terminating walsender process to force cascaded standby "
+ "to update timeline and reconnect")));
+ walsender_ready_to_stop = true;
+ }
+
+ /*
+ * We assume here that we're logging enough information in the WAL for
+ * log-shipping, since this is checked in PostmasterMain().
+ *
+ * NOTE: wal_level can only change at shutdown, so in most cases it is
+ * difficult for there to be WAL data that we can still see that was
+ * written at wal_level='minimal'.
*/
- if (wal_level == WAL_LEVEL_MINIMAL)
- ereport(FATAL,
- (errcode(ERRCODE_CANNOT_CONNECT_NOW),
- errmsg("standby connections not allowed because wal_level=minimal")));
/*
* When we first start replication the standby will be behind the primary.
* For some applications, for example, synchronous replication, it is
* important to have a clear state for this initial catchup mode, so we
* can trigger actions when we change streaming state later. We may stay
- * in this state for a long time, which is exactly why we want to be
- * able to monitor whether or not we are still here.
+ * in this state for a long time, which is exactly why we want to be able
+ * to monitor whether or not we are still here.
*/
WalSndSetState(WALSNDSTATE_CATCHUP);
{
unsigned char firstchar;
int r;
- int received = false;
+ bool received = false;
for (;;)
{
default:
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid standby message type %d",
+ errmsg("invalid standby message type \"%c\"",
firstchar)));
}
}
+
/*
- * Save the last reply timestamp if we've received at least
- * one reply.
+ * Save the last reply timestamp if we've received at least one reply.
*/
if (received)
last_reply_timestamp = GetCurrentTimestamp();
static void
ProcessStandbyMessage(void)
{
- char msgtype;
+ char msgtype;
resetStringInfo(&reply_message);
default:
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("unexpected message type %c", msgtype)));
+ errmsg("unexpected message type \"%c\"", msgtype)));
proc_exit(0);
}
}
static void
ProcessStandbyReplyMessage(void)
{
- StandbyReplyMessage reply;
+ StandbyReplyMessage reply;
pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
- reply.write.xlogid, reply.write.xrecoff,
- reply.flush.xlogid, reply.flush.xrecoff,
- reply.apply.xlogid, reply.apply.xrecoff);
+ (uint32) (reply.write >> 32), (uint32) reply.write,
+ (uint32) (reply.flush >> 32), (uint32) reply.flush,
+ (uint32) (reply.apply >> 32), (uint32) reply.apply);
/*
- * Update shared state for this WalSender process
- * based on reply data from standby.
+ * Update shared state for this WalSender process based on reply data from
+ * standby.
*/
{
/* use volatile pointer to prevent code rearrangement */
SpinLockRelease(&walsnd->mutex);
}
- SyncRepReleaseWaiters();
+ if (!am_cascading_walsender)
+ SyncRepReleaseWaiters();
}
/*
static void
ProcessStandbyHSFeedbackMessage(void)
{
- StandbyHSFeedbackMessage reply;
- TransactionId newxmin = InvalidTransactionId;
+ StandbyHSFeedbackMessage reply;
+ TransactionId nextXid;
+ uint32 nextEpoch;
- pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage));
+ /* Decipher the reply message */
+ pq_copymsgbytes(&reply_message, (char *) &reply,
+ sizeof(StandbyHSFeedbackMessage));
elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
reply.xmin,
reply.epoch);
+ /* Ignore invalid xmin (can't actually happen with current walreceiver) */
+ if (!TransactionIdIsNormal(reply.xmin))
+ return;
+
/*
- * Update the WalSender's proc xmin to allow it to be visible
- * to snapshots. This will hold back the removal of dead rows
- * and thereby prevent the generation of cleanup conflicts
- * on the standby server.
+ * Check that the provided xmin/epoch are sane, that is, not in the future
+ * and not so far back as to be already wrapped around. Ignore if not.
+ *
+ * Epoch of nextXid should be same as standby, or if the counter has
+ * wrapped, then one greater than standby.
*/
- if (TransactionIdIsValid(reply.xmin))
- {
- TransactionId nextXid;
- uint32 nextEpoch;
- bool epochOK = false;
+ GetNextXidAndEpoch(&nextXid, &nextEpoch);
- GetNextXidAndEpoch(&nextXid, &nextEpoch);
-
- /*
- * Epoch of oldestXmin should be same as standby or
- * if the counter has wrapped, then one less than reply.
- */
- if (reply.xmin <= nextXid)
- {
- if (reply.epoch == nextEpoch)
- epochOK = true;
- }
- else
- {
- if (nextEpoch > 0 && reply.epoch == nextEpoch - 1)
- epochOK = true;
- }
-
- /*
- * Feedback from standby must not go backwards, nor should it go
- * forwards further than our most recent xid.
- */
- if (epochOK && TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
- {
- if (!TransactionIdIsValid(MyProc->xmin))
- {
- TransactionId oldestXmin = GetOldestXmin(true, true);
- if (TransactionIdPrecedes(oldestXmin, reply.xmin))
- newxmin = reply.xmin;
- else
- newxmin = oldestXmin;
- }
- else
- {
- if (TransactionIdPrecedes(MyProc->xmin, reply.xmin))
- newxmin = reply.xmin;
- else
- newxmin = MyProc->xmin; /* stay the same */
- }
- }
+ if (reply.xmin <= nextXid)
+ {
+ if (reply.epoch != nextEpoch)
+ return;
+ }
+ else
+ {
+ if (reply.epoch + 1 != nextEpoch)
+ return;
}
+ if (!TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
+ return; /* epoch OK, but it's wrapped around */
+
/*
- * Grab the ProcArrayLock to set xmin, or invalidate for bad reply
+ * Set the WalSender's xmin equal to the standby's requested xmin, so that
+ * the xmin will be taken into account by GetOldestXmin. This will hold
+ * back the removal of dead rows and thereby prevent the generation of
+ * cleanup conflicts on the standby server.
+ *
+ * There is a small window for a race condition here: although we just
+ * checked that reply.xmin precedes nextXid, the nextXid could have gotten
+ * advanced between our fetching it and applying the xmin below, perhaps
+ * far enough to make reply.xmin wrap around. In that case the xmin we
+ * set here would be "in the future" and have no effect. No point in
+ * worrying about this since it's too late to save the desired data
+ * anyway. Assuming that the standby sends us an increasing sequence of
+ * xmins, this could only happen during the first reply cycle, else our
+ * own xmin would prevent nextXid from advancing so far.
+ *
+ * We don't bother taking the ProcArrayLock here. Setting the xmin field
+ * is assumed atomic, and there's no real need to prevent a concurrent
+ * GetOldestXmin. (If we're moving our xmin forward, this is obviously
+ * safe, and if we're moving it backwards, well, the data is at risk
+ * already since a VACUUM could have just finished calling GetOldestXmin.)
*/
- if (MyProc->xmin != newxmin)
- {
- LWLockAcquire(ProcArrayLock, LW_SHARED);
- MyProc->xmin = newxmin;
- LWLockRelease(ProcArrayLock);
- }
+ MyPgXact->xmin = reply.xmin;
}
/* Main loop of walsender process */
-static int
+static void
WalSndLoop(void)
{
char *output_message;
/* Loop forever, unless we get an error */
for (;;)
{
+ /* Clear any already-pending wakeups */
+ ResetLatch(&MyWalSnd->latch);
+
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
*/
- if (!PostmasterIsAlive(true))
+ if (!PostmasterIsAlive())
exit(1);
/* Process any requests or signals received recently */
/* Normal exit from the walsender is here */
if (walsender_shutdown_requested)
{
- /* Inform the standby that XLOG streaming was done */
+ /* Inform the standby that XLOG streaming is done */
pq_puttextmessage('C', "COPY 0");
pq_flush();
proc_exit(0);
}
+ /* Check for input from the client */
+ ProcessRepliesIfAny();
+
/*
- * If we don't have any pending data in the output buffer, try to
- * send some more.
+ * If we don't have any pending data in the output buffer, try to send
+ * some more. If there is some, we don't bother to call XLogSend
+ * again until we've flushed it ... but we'd better assume we are not
+ * caught up.
*/
if (!pq_is_send_pending())
- {
XLogSend(output_message, &caughtup);
+ else
+ caughtup = false;
+ /* Try to flush pending output to the client */
+ if (pq_flush_if_writable() != 0)
+ break;
+
+ /* If nothing remains to be sent right now ... */
+ if (caughtup && !pq_is_send_pending())
+ {
/*
- * Even if we wrote all the WAL that was available when we started
- * sending, more might have arrived while we were sending this
- * batch. We had the latch set while sending, so we have not
- * received any signals from that time. Let's arm the latch
- * again, and after that check that we're still up-to-date.
+ * If we're in catchup state, move to streaming. This is an
+ * important state change for users to know about, since before
+ * this point data loss might occur if the primary dies and we
+ * need to failover to the standby. The state change is also
+ * important for synchronous replication, since commits that
+ * started to wait at that point might wait for some time.
*/
- if (caughtup && !pq_is_send_pending())
+ if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
{
- ResetLatch(&MyWalSnd->latch);
+ ereport(DEBUG1,
+ (errmsg("standby \"%s\" has now caught up with primary",
+ application_name)));
+ WalSndSetState(WALSNDSTATE_STREAMING);
+ }
+ /*
+ * When SIGUSR2 arrives, we send any outstanding logs up to the
+ * shutdown checkpoint record (i.e., the latest record) and exit.
+ * This may be a normal termination at shutdown, or a promotion,
+ * the walsender is not sure which.
+ */
+ if (walsender_ready_to_stop)
+ {
+ /* ... let's just be real sure we're caught up ... */
XLogSend(output_message, &caughtup);
+ if (caughtup && !pq_is_send_pending())
+ {
+ walsender_shutdown_requested = true;
+ continue; /* don't want to wait more */
+ }
}
}
- /* Flush pending output to the client */
- if (pq_flush_if_writable() != 0)
- break;
-
/*
- * When SIGUSR2 arrives, we send any outstanding logs up to the
- * shutdown checkpoint record (i.e., the latest record) and exit.
+ * We don't block if not caught up, unless there is unsent data
+ * pending in which case we'd better block until the socket is
+ * write-ready. This test is only needed for the case where XLogSend
+ * loaded a subset of the available data but then pq_flush_if_writable
+ * flushed it all --- we should immediately try to send more.
*/
- if (walsender_ready_to_stop && !pq_is_send_pending())
+ if (caughtup || pq_is_send_pending())
{
- XLogSend(output_message, &caughtup);
- ProcessRepliesIfAny();
- if (caughtup && !pq_is_send_pending())
- walsender_shutdown_requested = true;
- }
+ TimestampTz timeout = 0;
+ long sleeptime = 10000; /* 10 s */
+ int wakeEvents;
- if ((caughtup || pq_is_send_pending()) &&
- !got_SIGHUP &&
- !walsender_shutdown_requested)
- {
- TimestampTz finish_time = 0;
- long sleeptime;
+ wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+ WL_SOCKET_READABLE | WL_TIMEOUT;
- /* Reschedule replication timeout */
- if (replication_timeout > 0)
+ if (pq_is_send_pending())
+ wakeEvents |= WL_SOCKET_WRITEABLE;
+ else
{
- long secs;
- int usecs;
-
- finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
- replication_timeout);
- TimestampDifference(GetCurrentTimestamp(),
- finish_time, &secs, &usecs);
- sleeptime = secs * 1000 + usecs / 1000;
- if (WalSndDelay < sleeptime)
- sleeptime = WalSndDelay;
+ WalSndKeepalive(output_message);
+ /* Try to flush pending output to the client */
+ if (pq_flush_if_writable() != 0)
+ break;
}
- else
+
+ /* Determine time until replication timeout */
+ if (replication_timeout > 0)
{
- /*
- * XXX: Without timeout, we don't really need the periodic
- * wakeups anymore, WaitLatchOrSocket should reliably wake up
- * as soon as something interesting happens.
- */
- sleeptime = WalSndDelay;
+ timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
+ replication_timeout);
+ sleeptime = 1 + (replication_timeout / 10);
}
- /* Sleep */
- WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
- true, pq_is_send_pending(),
- sleeptime * 1000L);
+ /* Sleep until something happens or replication timeout */
+ WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+ MyProcPort->sock, sleeptime);
- /* Check for replication timeout */
+ /*
+ * Check for replication timeout. Note we ignore the corner case
+ * possibility that the client replied just as we reached the
+ * timeout ... he's supposed to reply *before* that.
+ */
if (replication_timeout > 0 &&
- GetCurrentTimestamp() >= finish_time)
+ GetCurrentTimestamp() >= timeout)
{
/*
* Since typically expiration of replication timeout means
- * communication problem, we don't send the error message
- * to the standby.
+ * communication problem, we don't send the error message to
+ * the standby.
*/
ereport(COMMERROR,
(errmsg("terminating walsender process due to replication timeout")));
break;
}
}
-
- /*
- * If we're in catchup state, see if its time to move to streaming.
- * This is an important state change for users, since before this
- * point data loss might occur if the primary dies and we need to
- * failover to the standby. The state change is also important for
- * synchronous replication, since commits that started to wait at
- * that point might wait for some time.
- */
- if (MyWalSnd->state == WALSNDSTATE_CATCHUP && caughtup)
- {
- ereport(DEBUG1,
- (errmsg("standby \"%s\" has now caught up with primary",
- application_name)));
- WalSndSetState(WALSNDSTATE_STREAMING);
- }
-
- ProcessRepliesIfAny();
}
/*
whereToSendOutput = DestNone;
proc_exit(0);
- return 1; /* keep the compiler quiet */
+ abort(); /* keep the compiler quiet */
}
/* Initialize a per-walsender data structure for this walsender process */
}
/*
- * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
+ * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
*
* XXX probably this should be improved to suck data directly from the
* WAL buffers when possible.
* more than one.
*/
void
-XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
+XLogRead(char *buf, XLogRecPtr startptr, Size count)
{
- XLogRecPtr startRecPtr = recptr;
- char path[MAXPGPATH];
- uint32 lastRemovedLog;
- uint32 lastRemovedSeg;
- uint32 log;
- uint32 seg;
+ char *p;
+ XLogRecPtr recptr;
+ Size nbytes;
+ XLogSegNo lastRemovedSegNo;
+ XLogSegNo segno;
+
+retry:
+ p = buf;
+ recptr = startptr;
+ nbytes = count;
while (nbytes > 0)
{
int segbytes;
int readbytes;
- startoff = recptr.xrecoff % XLogSegSize;
+ startoff = recptr % XLogSegSize;
- if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
+ if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
{
+ char path[MAXPGPATH];
+
/* Switch to another logfile segment */
if (sendFile >= 0)
close(sendFile);
- XLByteToSeg(recptr, sendId, sendSeg);
- XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
+ XLByteToSeg(recptr, sendSegNo);
+ XLogFilePath(path, ThisTimeLineID, sendSegNo);
sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (sendFile < 0)
* removed or recycled.
*/
if (errno == ENOENT)
- {
- char filename[MAXFNAMELEN];
-
- XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
- filename)));
- }
+ XLogFileNameP(ThisTimeLineID, sendSegNo))));
else
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
- path, sendId, sendSeg)));
+ errmsg("could not open file \"%s\": %m",
+ path)));
}
sendOff = 0;
}
if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not seek in log file %u, segment %u to offset %u: %m",
- sendId, sendSeg, startoff)));
+ errmsg("could not seek in log segment %s to offset %u: %m",
+ XLogFileNameP(ThisTimeLineID, sendSegNo),
+ startoff)));
sendOff = startoff;
}
else
segbytes = nbytes;
- readbytes = read(sendFile, buf, segbytes);
+ readbytes = read(sendFile, p, segbytes);
if (readbytes <= 0)
+ {
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not read from log file %u, segment %u, offset %u, "
- "length %lu: %m",
- sendId, sendSeg, sendOff, (unsigned long) segbytes)));
+ errmsg("could not read from log segment %s, offset %u, length %lu: %m",
+ XLogFileNameP(ThisTimeLineID, sendSegNo),
+ sendOff, (unsigned long) segbytes)));
+ }
/* Update state for read */
XLByteAdvance(recptr, readbytes);
sendOff += readbytes;
nbytes -= readbytes;
- buf += readbytes;
+ p += readbytes;
}
/*
* read() succeeds in that case, but the data we tried to read might
* already have been overwritten with new WAL records.
*/
- XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
- XLByteToSeg(startRecPtr, log, seg);
- if (log < lastRemovedLog ||
- (log == lastRemovedLog && seg <= lastRemovedSeg))
- {
- char filename[MAXFNAMELEN];
-
- XLogFileName(filename, ThisTimeLineID, log, seg);
+ XLogGetLastRemoved(&lastRemovedSegNo);
+ XLByteToSeg(startptr, segno);
+ if (segno <= lastRemovedSegNo)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
- filename)));
+ XLogFileNameP(ThisTimeLineID, segno))));
+
+ /*
+ * During recovery, the currently-open WAL file might be replaced with the
+ * file of the same name retrieved from archive. So we always need to
+ * check what we read was valid after reading into the buffer. If it's
+ * invalid, we try to open and read the file again.
+ */
+ if (am_cascading_walsender)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+ bool reload;
+
+ SpinLockAcquire(&walsnd->mutex);
+ reload = walsnd->needreload;
+ walsnd->needreload = false;
+ SpinLockRelease(&walsnd->mutex);
+
+ if (reload && sendFile >= 0)
+ {
+ close(sendFile);
+ sendFile = -1;
+
+ goto retry;
+ }
}
}
* subsequently crashes and restarts, slaves must not have applied any WAL
* that gets lost on the master.
*/
- SendRqstPtr = GetFlushRecPtr();
+ SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr();
/* Quick exit if nothing to do */
if (XLByteLE(SendRqstPtr, sentPtr))
* SendRqstPtr never points to the middle of a WAL record.
*/
startptr = sentPtr;
- if (startptr.xrecoff >= XLogFileSize)
- {
- /*
- * crossing a logid boundary, skip the non-existent last log segment
- * in previous logical log file.
- */
- startptr.xlogid += 1;
- startptr.xrecoff = 0;
- }
-
endptr = startptr;
XLByteAdvance(endptr, MAX_SEND_SIZE);
- if (endptr.xlogid != startptr.xlogid)
- {
- /* Don't cross a logfile boundary within one message */
- Assert(endptr.xlogid == startptr.xlogid + 1);
- endptr.xlogid = startptr.xlogid;
- endptr.xrecoff = XLogFileSize;
- }
/* if we went beyond SendRqstPtr, back off */
if (XLByteLE(SendRqstPtr, endptr))
else
{
/* round down to page boundary. */
- endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
+ endptr -= (endptr % XLOG_BLCKSZ);
*caughtup = false;
}
- nbytes = endptr.xrecoff - startptr.xrecoff;
+ nbytes = endptr - startptr;
Assert(nbytes <= MAX_SEND_SIZE);
/*
char activitymsg[50];
snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
- sentPtr.xlogid, sentPtr.xrecoff);
+ (uint32) (sentPtr >> 32), (uint32) sentPtr);
set_ps_display(activitymsg, false);
}
return;
}
+/*
+ * Request walsenders to reload the currently-open WAL file
+ */
+void
+WalSndRqstFileReload(void)
+{
+ int i;
+
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+ if (walsnd->pid == 0)
+ continue;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->needreload = true;
+ SpinLockRelease(&walsnd->mutex);
+ }
+}
+
/* SIGHUP: set flag to re-read config file at next convenient time */
static void
WalSndSigHupHandler(SIGNAL_ARGS)
{
+ int save_errno = errno;
+
got_SIGHUP = true;
if (MyWalSnd)
SetLatch(&MyWalSnd->latch);
+
+ errno = save_errno;
}
/* SIGTERM: set flag to shut down */
static void
WalSndShutdownHandler(SIGNAL_ARGS)
{
+ int save_errno = errno;
+
walsender_shutdown_requested = true;
if (MyWalSnd)
SetLatch(&MyWalSnd->latch);
+
+ /*
+ * Set the standard (non-walsender) state as well, so that we can abort
+ * things like do_pg_stop_backup().
+ */
+ InterruptPending = true;
+ ProcDiePending = true;
+
+ errno = save_errno;
}
/*
static void
WalSndXLogSendHandler(SIGNAL_ARGS)
{
+ int save_errno = errno;
+
latch_sigusr1_handler();
+
+ errno = save_errno;
}
/* SIGUSR2: set flag to do a last cycle and shut down afterwards */
static void
WalSndLastCycleHandler(SIGNAL_ARGS)
{
+ int save_errno = errno;
+
walsender_ready_to_stop = true;
if (MyWalSnd)
SetLatch(&MyWalSnd->latch);
+
+ errno = save_errno;
}
/* Set up signal handlers */
pqsignal(SIGINT, SIG_IGN); /* not used */
pqsignal(SIGTERM, WalSndShutdownHandler); /* request shutdown */
pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */
- pqsignal(SIGALRM, SIG_IGN);
+ InitializeTimeouts(); /* establishes SIGALRM handler */
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
/* First time through, so initialize */
MemSet(WalSndCtl, 0, WalSndShmemSize());
- SHMQueueInit(&(WalSndCtl->SyncRepQueue));
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
for (i = 0; i < max_wal_senders; i++)
{
}
}
-/* Wake up all walsenders */
+/*
+ * Wake up all walsenders
+ *
+ * This will be called inside critical sections, so throwing an error is not
+ * adviseable.
+ */
void
WalSndWakeup(void)
{
- int i;
+ int i;
for (i = 0; i < max_wal_senders; i++)
SetLatch(&WalSndCtl->walsnds[i].latch);
switch (state)
{
case WALSNDSTATE_STARTUP:
- return "STARTUP";
+ return "startup";
case WALSNDSTATE_BACKUP:
- return "BACKUP";
+ return "backup";
case WALSNDSTATE_CATCHUP:
- return "CATCHUP";
+ return "catchup";
case WALSNDSTATE_STREAMING:
- return "STREAMING";
+ return "streaming";
}
return "UNKNOWN";
}
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_WAL_SENDERS_COLS 8
- ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
- TupleDesc tupdesc;
- Tuplestorestate *tupstore;
- MemoryContext per_query_ctx;
- MemoryContext oldcontext;
- int *sync_priority;
- int priority = 0;
- int sync_standby = -1;
- int i;
+#define PG_STAT_GET_WAL_SENDERS_COLS 8
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ TupleDesc tupdesc;
+ Tuplestorestate *tupstore;
+ MemoryContext per_query_ctx;
+ MemoryContext oldcontext;
+ int *sync_priority;
+ int priority = 0;
+ int sync_standby = -1;
+ int i;
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
MemoryContextSwitchTo(oldcontext);
/*
- * Get the priorities of sync standbys all in one go, to minimise
- * lock acquisitions and to allow us to evaluate who is the current
- * sync standby. This code must match the code in SyncRepReleaseWaiters().
+ * Get the priorities of sync standbys all in one go, to minimise lock
+ * acquisitions and to allow us to evaluate who is the current sync
+ * standby. This code must match the code in SyncRepReleaseWaiters().
*/
sync_priority = palloc(sizeof(int) * max_wal_senders);
LWLockAcquire(SyncRepLock, LW_SHARED);
if (walsnd->pid != 0)
{
- sync_priority[i] = walsnd->sync_standby_priority;
+ /*
+ * Treat a standby such as a pg_basebackup background process
+ * which always returns an invalid flush location, as an
+ * asynchronous standby.
+ */
+ sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
+ 0 : walsnd->sync_standby_priority;
if (walsnd->state == WALSNDSTATE_STREAMING &&
walsnd->sync_standby_priority > 0 &&
(priority == 0 ||
- priority > walsnd->sync_standby_priority))
+ priority > walsnd->sync_standby_priority) &&
+ !XLogRecPtrIsInvalid(walsnd->flush))
{
priority = walsnd->sync_standby_priority;
sync_standby = i;
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
- WalSndState state;
+ WalSndState state;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
if (!superuser())
{
/*
- * Only superusers can see details. Other users only get
- * the pid value to know it's a walsender, but no details.
+ * Only superusers can see details. Other users only get the pid
+ * value to know it's a walsender, but no details.
*/
MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
}
values[1] = CStringGetTextDatum(WalSndGetStateString(state));
snprintf(location, sizeof(location), "%X/%X",
- sentPtr.xlogid, sentPtr.xrecoff);
+ (uint32) (sentPtr >> 32), (uint32) sentPtr);
values[2] = CStringGetTextDatum(location);
- if (write.xlogid == 0 && write.xrecoff == 0)
+ if (write == 0)
nulls[3] = true;
snprintf(location, sizeof(location), "%X/%X",
- write.xlogid, write.xrecoff);
+ (uint32) (write >> 32), (uint32) write);
values[3] = CStringGetTextDatum(location);
- if (flush.xlogid == 0 && flush.xrecoff == 0)
+ if (flush == 0)
nulls[4] = true;
snprintf(location, sizeof(location), "%X/%X",
- flush.xlogid, flush.xrecoff);
+ (uint32) (flush >> 32), (uint32) flush);
values[4] = CStringGetTextDatum(location);
- if (apply.xlogid == 0 && apply.xrecoff == 0)
+ if (apply == 0)
nulls[5] = true;
snprintf(location, sizeof(location), "%X/%X",
- apply.xlogid, apply.xrecoff);
+ (uint32) (apply >> 32), (uint32) apply);
values[5] = CStringGetTextDatum(location);
values[6] = Int32GetDatum(sync_priority[i]);
/*
- * More easily understood version of standby state.
- * This is purely informational, not different from priority.
+ * More easily understood version of standby state. This is purely
+ * informational, not different from priority.
*/
if (sync_priority[i] == 0)
- values[7] = CStringGetTextDatum("ASYNC");
+ values[7] = CStringGetTextDatum("async");
else if (i == sync_standby)
- values[7] = CStringGetTextDatum("SYNC");
+ values[7] = CStringGetTextDatum("sync");
else
- values[7] = CStringGetTextDatum("POTENTIAL");
+ values[7] = CStringGetTextDatum("potential");
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
return (Datum) 0;
}
+static void
+WalSndKeepalive(char *msgbuf)
+{
+ PrimaryKeepaliveMessage keepalive_message;
+
+ /* Construct a new message */
+ keepalive_message.walEnd = sentPtr;
+ keepalive_message.sendTime = GetCurrentTimestamp();
+
+ elog(DEBUG2, "sending replication keepalive");
+
+ /* Prepend with the message type and send it. */
+ msgbuf[0] = 'k';
+ memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
+ pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1);
+}
+
/*
* This isn't currently used for anything. Monitoring tools might be
* interested in the future, and we'll need something like this in the