* shutdown checkpoint record, and then exit.
*
*
- * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/walsender.c
#include "replication/walprotocol.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
+#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
+#include "utils/timeout.h"
#include "utils/timestamp.h"
/* Global state */
bool am_walsender = false; /* Am I a walsender process ? */
-bool am_cascading_walsender = false; /* Am I cascading WAL to another standby ? */
+bool am_cascading_walsender = false; /* Am I cascading WAL to
+ * another standby ? */
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int replication_timeout = 60 * 1000; /* maximum time to send one
* WAL data message */
+/*
+ * State for WalSndWakeupRequest
+ */
+bool wake_wal_senders = false;
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
* but for walsender to read the XLOG.
*/
static int sendFile = -1;
-static uint32 sendId = 0;
-static uint32 sendSeg = 0;
+static XLogSegNo sendSegNo = 0;
static uint32 sendOff = 0;
/*
* How far have we sent WAL already? This is also advertised in
* MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
*/
-static XLogRecPtr sentPtr = {0, 0};
+static XLogRecPtr sentPtr = 0;
/*
* Buffer for processing reply messages.
/* Prototypes for private functions */
static bool HandleReplicationCommand(const char *cmd_string);
-static int WalSndLoop(void);
+static void WalSndLoop(void) __attribute__((noreturn));
static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
+static void WalSndKeepalive(char *msgbuf);
/* Main entry point for walsender process */
-int
+void
WalSenderMain(void)
{
MemoryContext walsnd_context;
SyncRepInitConfig();
/* Main loop of walsender */
- return WalSndLoop();
+ WalSndLoop();
}
/*
logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
- snprintf(xpos, sizeof(xpos), "%X/%X",
- logptr.xlogid, logptr.xrecoff);
+ snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
/* Send a RowDescription message */
pq_beginmessage(&buf, 'T');
SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
/*
- * When promoting a cascading standby, postmaster sends SIGUSR2 to
- * any cascading walsenders to kill them. But there is a corner-case where
- * such walsender fails to receive SIGUSR2 and survives a standby promotion
- * unexpectedly. This happens when postmaster sends SIGUSR2 before
- * the walsender marks itself as a WAL sender, because postmaster sends
- * SIGUSR2 to only the processes marked as a WAL sender.
+ * When promoting a cascading standby, postmaster sends SIGUSR2 to any
+ * cascading walsenders to kill them. But there is a corner-case where
+ * such walsender fails to receive SIGUSR2 and survives a standby
+ * promotion unexpectedly. This happens when postmaster sends SIGUSR2
+ * before the walsender marks itself as a WAL sender, because postmaster
+ * sends SIGUSR2 to only the processes marked as a WAL sender.
*
* To avoid this corner-case, if recovery is NOT in progress even though
* the walsender is cascading one, we do the same thing as SIGUSR2 signal
* handler does, i.e., set walsender_ready_to_stop to true. Which causes
* the walsender to end later.
*
- * When terminating cascading walsenders, usually postmaster writes
- * the log message announcing the terminations. But there is a race condition
- * here. If there is no walsender except this process before reaching here,
- * postmaster thinks that there is no walsender and suppresses that
+ * When terminating cascading walsenders, usually postmaster writes the
+ * log message announcing the terminations. But there is a race condition
+ * here. If there is no walsender except this process before reaching
+ * here, postmaster thinks that there is no walsender and suppresses that
* log message. To handle this case, we always emit that log message here.
- * This might cause duplicate log messages, but which is less likely to happen,
- * so it's not worth writing some code to suppress them.
+ * This might cause duplicate log messages, but which is less likely to
+ * happen, so it's not worth writing some code to suppress them.
*/
if (am_cascading_walsender && !RecoveryInProgress())
{
ereport(LOG,
- (errmsg("terminating walsender process to force cascaded standby "
- "to update timeline and reconnect")));
+ (errmsg("terminating walsender process to force cascaded standby "
+ "to update timeline and reconnect")));
walsender_ready_to_stop = true;
}
* log-shipping, since this is checked in PostmasterMain().
*
* NOTE: wal_level can only change at shutdown, so in most cases it is
- * difficult for there to be WAL data that we can still see that was written
- * at wal_level='minimal'.
+ * difficult for there to be WAL data that we can still see that was
+ * written at wal_level='minimal'.
*/
/*
pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
- reply.write.xlogid, reply.write.xrecoff,
- reply.flush.xlogid, reply.flush.xrecoff,
- reply.apply.xlogid, reply.apply.xrecoff);
+ (uint32) (reply.write >> 32), (uint32) reply.write,
+ (uint32) (reply.flush >> 32), (uint32) reply.flush,
+ (uint32) (reply.apply >> 32), (uint32) reply.apply);
/*
* Update shared state for this WalSender process based on reply data from
ProcessStandbyHSFeedbackMessage(void)
{
StandbyHSFeedbackMessage reply;
- TransactionId newxmin = InvalidTransactionId;
+ TransactionId nextXid;
+ uint32 nextEpoch;
- pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage));
+ /* Decipher the reply message */
+ pq_copymsgbytes(&reply_message, (char *) &reply,
+ sizeof(StandbyHSFeedbackMessage));
elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
reply.xmin,
reply.epoch);
+ /* Ignore invalid xmin (can't actually happen with current walreceiver) */
+ if (!TransactionIdIsNormal(reply.xmin))
+ return;
+
/*
- * Update the WalSender's proc xmin to allow it to be visible to
- * snapshots. This will hold back the removal of dead rows and thereby
- * prevent the generation of cleanup conflicts on the standby server.
+ * Check that the provided xmin/epoch are sane, that is, not in the future
+ * and not so far back as to be already wrapped around. Ignore if not.
+ *
+ * Epoch of nextXid should be same as standby, or if the counter has
+ * wrapped, then one greater than standby.
*/
- if (TransactionIdIsValid(reply.xmin))
- {
- TransactionId nextXid;
- uint32 nextEpoch;
- bool epochOK = false;
-
- GetNextXidAndEpoch(&nextXid, &nextEpoch);
+ GetNextXidAndEpoch(&nextXid, &nextEpoch);
- /*
- * Epoch of oldestXmin should be same as standby or if the counter has
- * wrapped, then one less than reply.
- */
- if (reply.xmin <= nextXid)
- {
- if (reply.epoch == nextEpoch)
- epochOK = true;
- }
- else
- {
- if (nextEpoch > 0 && reply.epoch == nextEpoch - 1)
- epochOK = true;
- }
-
- /*
- * Feedback from standby must not go backwards, nor should it go
- * forwards further than our most recent xid.
- */
- if (epochOK && TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
- {
- if (!TransactionIdIsValid(MyProc->xmin))
- {
- TransactionId oldestXmin = GetOldestXmin(true, true);
-
- if (TransactionIdPrecedes(oldestXmin, reply.xmin))
- newxmin = reply.xmin;
- else
- newxmin = oldestXmin;
- }
- else
- {
- if (TransactionIdPrecedes(MyProc->xmin, reply.xmin))
- newxmin = reply.xmin;
- else
- newxmin = MyProc->xmin; /* stay the same */
- }
- }
+ if (reply.xmin <= nextXid)
+ {
+ if (reply.epoch != nextEpoch)
+ return;
}
+ else
+ {
+ if (reply.epoch + 1 != nextEpoch)
+ return;
+ }
+
+ if (!TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
+ return; /* epoch OK, but it's wrapped around */
/*
- * Grab the ProcArrayLock to set xmin, or invalidate for bad reply
+ * Set the WalSender's xmin equal to the standby's requested xmin, so that
+ * the xmin will be taken into account by GetOldestXmin. This will hold
+ * back the removal of dead rows and thereby prevent the generation of
+ * cleanup conflicts on the standby server.
+ *
+ * There is a small window for a race condition here: although we just
+ * checked that reply.xmin precedes nextXid, the nextXid could have gotten
+ * advanced between our fetching it and applying the xmin below, perhaps
+ * far enough to make reply.xmin wrap around. In that case the xmin we
+ * set here would be "in the future" and have no effect. No point in
+ * worrying about this since it's too late to save the desired data
+ * anyway. Assuming that the standby sends us an increasing sequence of
+ * xmins, this could only happen during the first reply cycle, else our
+ * own xmin would prevent nextXid from advancing so far.
+ *
+ * We don't bother taking the ProcArrayLock here. Setting the xmin field
+ * is assumed atomic, and there's no real need to prevent a concurrent
+ * GetOldestXmin. (If we're moving our xmin forward, this is obviously
+ * safe, and if we're moving it backwards, well, the data is at risk
+ * already since a VACUUM could have just finished calling GetOldestXmin.)
*/
- if (MyProc->xmin != newxmin)
- {
- LWLockAcquire(ProcArrayLock, LW_SHARED);
- MyProc->xmin = newxmin;
- LWLockRelease(ProcArrayLock);
- }
+ MyPgXact->xmin = reply.xmin;
}
/* Main loop of walsender process */
-static int
+static void
WalSndLoop(void)
{
char *output_message;
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
{
ereport(DEBUG1,
- (errmsg("standby \"%s\" has now caught up with primary",
- application_name)));
+ (errmsg("standby \"%s\" has now caught up with primary",
+ application_name)));
WalSndSetState(WALSNDSTATE_STREAMING);
}
if (caughtup && !pq_is_send_pending())
{
walsender_shutdown_requested = true;
- continue; /* don't want to wait more */
+ continue; /* don't want to wait more */
}
}
}
*/
if (caughtup || pq_is_send_pending())
{
- TimestampTz finish_time = 0;
- long sleeptime = -1;
+ TimestampTz timeout = 0;
+ long sleeptime = 10000; /* 10 s */
int wakeEvents;
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
- WL_SOCKET_READABLE;
+ WL_SOCKET_READABLE | WL_TIMEOUT;
+
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
+ else
+ {
+ WalSndKeepalive(output_message);
+ /* Try to flush pending output to the client */
+ if (pq_flush_if_writable() != 0)
+ break;
+ }
/* Determine time until replication timeout */
if (replication_timeout > 0)
{
- long secs;
- int usecs;
-
- finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
- replication_timeout);
- TimestampDifference(GetCurrentTimestamp(),
- finish_time, &secs, &usecs);
- sleeptime = secs * 1000 + usecs / 1000;
- /* Avoid Assert in WaitLatchOrSocket if timeout is past */
- if (sleeptime < 0)
- sleeptime = 0;
- wakeEvents |= WL_TIMEOUT;
+ timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
+ replication_timeout);
+ sleeptime = 1 + (replication_timeout / 10);
}
/* Sleep until something happens or replication timeout */
* timeout ... he's supposed to reply *before* that.
*/
if (replication_timeout > 0 &&
- GetCurrentTimestamp() >= finish_time)
+ GetCurrentTimestamp() >= timeout)
{
/*
* Since typically expiration of replication timeout means
whereToSendOutput = DestNone;
proc_exit(0);
- return 1; /* keep the compiler quiet */
+ abort(); /* keep the compiler quiet */
}
/* Initialize a per-walsender data structure for this walsender process */
void
XLogRead(char *buf, XLogRecPtr startptr, Size count)
{
- char *p;
+ char *p;
XLogRecPtr recptr;
- Size nbytes;
- uint32 lastRemovedLog;
- uint32 lastRemovedSeg;
- uint32 log;
- uint32 seg;
+ Size nbytes;
+ XLogSegNo lastRemovedSegNo;
+ XLogSegNo segno;
retry:
p = buf;
int segbytes;
int readbytes;
- startoff = recptr.xrecoff % XLogSegSize;
+ startoff = recptr % XLogSegSize;
- if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
+ if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
{
char path[MAXPGPATH];
if (sendFile >= 0)
close(sendFile);
- XLByteToSeg(recptr, sendId, sendSeg);
- XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
+ XLByteToSeg(recptr, sendSegNo);
+ XLogFilePath(path, ThisTimeLineID, sendSegNo);
sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (sendFile < 0)
* removed or recycled.
*/
if (errno == ENOENT)
- {
- char filename[MAXFNAMELEN];
-
- XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
- filename)));
- }
+ XLogFileNameP(ThisTimeLineID, sendSegNo))));
else
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
- path, sendId, sendSeg)));
+ errmsg("could not open file \"%s\": %m",
+ path)));
}
sendOff = 0;
}
if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not seek in log file %u, segment %u to offset %u: %m",
- sendId, sendSeg, startoff)));
+ errmsg("could not seek in log segment %s to offset %u: %m",
+ XLogFileNameP(ThisTimeLineID, sendSegNo),
+ startoff)));
sendOff = startoff;
}
readbytes = read(sendFile, p, segbytes);
if (readbytes <= 0)
+ {
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not read from log file %u, segment %u, offset %u, "
- "length %lu: %m",
- sendId, sendSeg, sendOff, (unsigned long) segbytes)));
+ errmsg("could not read from log segment %s, offset %u, length %lu: %m",
+ XLogFileNameP(ThisTimeLineID, sendSegNo),
+ sendOff, (unsigned long) segbytes)));
+ }
/* Update state for read */
XLByteAdvance(recptr, readbytes);
* read() succeeds in that case, but the data we tried to read might
* already have been overwritten with new WAL records.
*/
- XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
- XLByteToSeg(startptr, log, seg);
- if (log < lastRemovedLog ||
- (log == lastRemovedLog && seg <= lastRemovedSeg))
- {
- char filename[MAXFNAMELEN];
-
- XLogFileName(filename, ThisTimeLineID, log, seg);
+ XLogGetLastRemoved(&lastRemovedSegNo);
+ XLByteToSeg(startptr, segno);
+ if (segno <= lastRemovedSegNo)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
- filename)));
- }
+ XLogFileNameP(ThisTimeLineID, segno))));
/*
- * During recovery, the currently-open WAL file might be replaced with
- * the file of the same name retrieved from archive. So we always need
- * to check what we read was valid after reading into the buffer. If it's
+ * During recovery, the currently-open WAL file might be replaced with the
+ * file of the same name retrieved from archive. So we always need to
+ * check what we read was valid after reading into the buffer. If it's
* invalid, we try to open and read the file again.
*/
if (am_cascading_walsender)
* SendRqstPtr never points to the middle of a WAL record.
*/
startptr = sentPtr;
- if (startptr.xrecoff >= XLogFileSize)
- {
- /*
- * crossing a logid boundary, skip the non-existent last log segment
- * in previous logical log file.
- */
- startptr.xlogid += 1;
- startptr.xrecoff = 0;
- }
-
endptr = startptr;
XLByteAdvance(endptr, MAX_SEND_SIZE);
- if (endptr.xlogid != startptr.xlogid)
- {
- /* Don't cross a logfile boundary within one message */
- Assert(endptr.xlogid == startptr.xlogid + 1);
- endptr.xlogid = startptr.xlogid;
- endptr.xrecoff = XLogFileSize;
- }
/* if we went beyond SendRqstPtr, back off */
if (XLByteLE(SendRqstPtr, endptr))
else
{
/* round down to page boundary. */
- endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
+ endptr -= (endptr % XLOG_BLCKSZ);
*caughtup = false;
}
- nbytes = endptr.xrecoff - startptr.xrecoff;
+ nbytes = endptr - startptr;
Assert(nbytes <= MAX_SEND_SIZE);
/*
char activitymsg[50];
snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
- sentPtr.xlogid, sentPtr.xrecoff);
+ (uint32) (sentPtr >> 32), (uint32) sentPtr);
set_ps_display(activitymsg, false);
}
if (MyWalSnd)
SetLatch(&MyWalSnd->latch);
+ /*
+ * Set the standard (non-walsender) state as well, so that we can abort
+ * things like do_pg_stop_backup().
+ */
+ InterruptPending = true;
+ ProcDiePending = true;
+
errno = save_errno;
}
pqsignal(SIGINT, SIG_IGN); /* not used */
pqsignal(SIGTERM, WalSndShutdownHandler); /* request shutdown */
pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */
- pqsignal(SIGALRM, SIG_IGN);
+ InitializeTimeouts(); /* establishes SIGALRM handler */
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
/* First time through, so initialize */
MemSet(WalSndCtl, 0, WalSndShmemSize());
- SHMQueueInit(&(WalSndCtl->SyncRepQueue));
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
for (i = 0; i < max_wal_senders; i++)
{
}
}
-/* Wake up all walsenders */
+/*
+ * Wake up all walsenders
+ *
+ * This will be called inside critical sections, so throwing an error is not
+ * adviseable.
+ */
void
WalSndWakeup(void)
{
if (walsnd->pid != 0)
{
- sync_priority[i] = walsnd->sync_standby_priority;
+ /*
+ * Treat a standby such as a pg_basebackup background process
+ * which always returns an invalid flush location, as an
+ * asynchronous standby.
+ */
+ sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
+ 0 : walsnd->sync_standby_priority;
if (walsnd->state == WALSNDSTATE_STREAMING &&
walsnd->sync_standby_priority > 0 &&
(priority == 0 ||
- priority > walsnd->sync_standby_priority))
+ priority > walsnd->sync_standby_priority) &&
+ !XLogRecPtrIsInvalid(walsnd->flush))
{
priority = walsnd->sync_standby_priority;
sync_standby = i;
values[1] = CStringGetTextDatum(WalSndGetStateString(state));
snprintf(location, sizeof(location), "%X/%X",
- sentPtr.xlogid, sentPtr.xrecoff);
+ (uint32) (sentPtr >> 32), (uint32) sentPtr);
values[2] = CStringGetTextDatum(location);
- if (write.xlogid == 0 && write.xrecoff == 0)
+ if (write == 0)
nulls[3] = true;
snprintf(location, sizeof(location), "%X/%X",
- write.xlogid, write.xrecoff);
+ (uint32) (write >> 32), (uint32) write);
values[3] = CStringGetTextDatum(location);
- if (flush.xlogid == 0 && flush.xrecoff == 0)
+ if (flush == 0)
nulls[4] = true;
snprintf(location, sizeof(location), "%X/%X",
- flush.xlogid, flush.xrecoff);
+ (uint32) (flush >> 32), (uint32) flush);
values[4] = CStringGetTextDatum(location);
- if (apply.xlogid == 0 && apply.xrecoff == 0)
+ if (apply == 0)
nulls[5] = true;
snprintf(location, sizeof(location), "%X/%X",
- apply.xlogid, apply.xrecoff);
+ (uint32) (apply >> 32), (uint32) apply);
values[5] = CStringGetTextDatum(location);
values[6] = Int32GetDatum(sync_priority[i]);
return (Datum) 0;
}
+static void
+WalSndKeepalive(char *msgbuf)
+{
+ PrimaryKeepaliveMessage keepalive_message;
+
+ /* Construct a new message */
+ keepalive_message.walEnd = sentPtr;
+ keepalive_message.sendTime = GetCurrentTimestamp();
+
+ elog(DEBUG2, "sending replication keepalive");
+
+ /* Prepend with the message type and send it. */
+ msgbuf[0] = 'k';
+ memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
+ pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1);
+}
+
/*
* This isn't currently used for anything. Monitoring tools might be
* interested in the future, and we'll need something like this in the