*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.20 2010/05/09 18:11:55 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.21 2010/05/26 22:21:33 heikki Exp $
*
*-------------------------------------------------------------------------
*/
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
-static bool XLogSend(StringInfo outMsg);
+static bool XLogSend(StringInfo outMsg, bool *caughtup);
static void CheckClosedConnection(void);
/*
* How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
+ *
+ * We don't have a good idea of what a good value would be; there's some
+ * overhead per message in both walsender and walreceiver, but on the other
+ * hand sending large batches makes walsender less responsive to signals
+ * because signals are checked only between messages. 128kB seems like
+ * a reasonable guess for now.
*/
-#define MAX_SEND_SIZE (XLOG_SEG_SIZE / 2)
+#define MAX_SEND_SIZE (128 * 1024)
/* Main entry point for walsender process */
int
WalSndLoop(void)
{
StringInfoData output_message;
+ bool caughtup = false;
initStringInfo(&output_message);
*/
if (ready_to_stop)
{
- XLogSend(&output_message);
+ XLogSend(&output_message, &caughtup);
shutdown_requested = true;
}
}
/*
- * Nap for the configured time or until a message arrives.
+ * If we had sent all accumulated WAL in last round, nap for the
+ * configured time before retrying.
*
* On some platforms, signals won't interrupt the sleep. To ensure we
* respond reasonably promptly when someone signals us, break down the
* sleep into NAPTIME_PER_CYCLE increments, and check for
* interrupts after each nap.
*/
- remain = WalSndDelay * 1000L;
- while (remain > 0)
+ if (caughtup)
{
- if (got_SIGHUP || shutdown_requested || ready_to_stop)
- break;
+ remain = WalSndDelay * 1000L;
+ while (remain > 0)
+ {
+ /* Check for interrupts */
+ if (got_SIGHUP || shutdown_requested || ready_to_stop)
+ break;
- /*
- * Check to see whether a message from the standby or an interrupt
- * from other processes has arrived.
- */
- pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
- CheckClosedConnection();
+ /* Sleep and check that the connection is still alive */
+ pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
+ CheckClosedConnection();
- remain -= NAPTIME_PER_CYCLE;
+ remain -= NAPTIME_PER_CYCLE;
+ }
}
-
/* Attempt to send the log once every loop */
- if (!XLogSend(&output_message))
+ if (!XLogSend(&output_message, &caughtup))
goto eof;
}
}
/*
- * Read all WAL that's been written (and flushed) since last cycle, and send
- * it to client.
+ * Read up to MAX_SEND_SIZE bytes of WAL that's been written (and flushed),
+ * but not yet sent to the client, and send it. If there is no unsent WAL,
+ * *caughtup is set to true and nothing is sent, otherwise *caughtup is set
+ * to false.
*
* Returns true if OK, false if trouble.
*/
static bool
-XLogSend(StringInfo outMsg)
+XLogSend(StringInfo outMsg, bool *caughtup)
{
XLogRecPtr SendRqstPtr;
+ XLogRecPtr startptr;
+ XLogRecPtr endptr;
+ Size nbytes;
char activitymsg[50];
/* use volatile pointer to prevent code rearrangement */
/* Quick exit if nothing to do */
if (!XLByteLT(sentPtr, SendRqstPtr))
+ {
+ *caughtup = true;
return true;
+ }
+ /*
+ * Otherwise let the caller know that we're not fully caught up. Unless
+ * there's a huge backlog, we'll be caught up to the current WriteRecPtr
+ * after we've sent everything below, but more WAL could accumulate while
+ * we're busy sending.
+ */
+ *caughtup = false;
/*
- * We gather multiple records together by issuing just one XLogRead() of a
- * suitable size, and send them as one CopyData message. Repeat until
- * we've sent everything we can.
+ * Figure out how much to send in one message. If there's less than
+ * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
+ * MAX_SEND_SIZE bytes, but round to page boundary.
+ *
+ * The rounding is not only for performance reasons. Walreceiver
+ * relies on the fact that we never split a WAL record across two
+ * messages. Since a long WAL record is split at page boundary into
+ * continuation records, page boundary is always a safe cut-off point.
+ * We also assume that SendRqstPtr never points in the middle of a WAL
+ * record.
*/
- while (XLByteLT(sentPtr, SendRqstPtr))
+ startptr = sentPtr;
+ if (startptr.xrecoff >= XLogFileSize)
{
- XLogRecPtr startptr;
- XLogRecPtr endptr;
- Size nbytes;
-
/*
- * Figure out how much to send in one message. If there's less than
- * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
- * MAX_SEND_SIZE bytes, but round to page boundary.
- *
- * The rounding is not only for performance reasons. Walreceiver
- * relies on the fact that we never split a WAL record across two
- * messages. Since a long WAL record is split at page boundary into
- * continuation records, page boundary is always a safe cut-off point.
- * We also assume that SendRqstPtr never points in the middle of a WAL
- * record.
+ * crossing a logid boundary, skip the non-existent last log
+ * segment in previous logical log file.
*/
- startptr = sentPtr;
- if (startptr.xrecoff >= XLogFileSize)
- {
- /*
- * crossing a logid boundary, skip the non-existent last log
- * segment in previous logical log file.
- */
- startptr.xlogid += 1;
- startptr.xrecoff = 0;
- }
+ startptr.xlogid += 1;
+ startptr.xrecoff = 0;
+ }
- endptr = startptr;
- XLByteAdvance(endptr, MAX_SEND_SIZE);
- /* round down to page boundary. */
- endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
- /* if we went beyond SendRqstPtr, back off */
- if (XLByteLT(SendRqstPtr, endptr))
- endptr = SendRqstPtr;
+ endptr = startptr;
+ XLByteAdvance(endptr, MAX_SEND_SIZE);
+ /* round down to page boundary. */
+ endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
+ /* if we went beyond SendRqstPtr, back off */
+ if (XLByteLT(SendRqstPtr, endptr))
+ endptr = SendRqstPtr;
- /*
- * OK to read and send the slice.
- *
- * We don't need to convert the xlogid/xrecoff from host byte order to
- * network byte order because the both server can be expected to have
- * the same byte order. If they have different byte order, we don't
- * reach here.
- */
- pq_sendbyte(outMsg, 'w');
- pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
+ /*
+ * OK to read and send the slice.
+ *
+ * We don't need to convert the xlogid/xrecoff from host byte order to
+ * network byte order because the both server can be expected to have
+ * the same byte order. If they have different byte order, we don't
+ * reach here.
+ */
+ pq_sendbyte(outMsg, 'w');
+ pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
- if (endptr.xlogid != startptr.xlogid)
- {
- Assert(endptr.xlogid == startptr.xlogid + 1);
- nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff;
- }
- else
- nbytes = endptr.xrecoff - startptr.xrecoff;
+ if (endptr.xlogid != startptr.xlogid)
+ {
+ Assert(endptr.xlogid == startptr.xlogid + 1);
+ nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff;
+ }
+ else
+ nbytes = endptr.xrecoff - startptr.xrecoff;
- sentPtr = endptr;
+ sentPtr = endptr;
- /*
- * Read the log directly into the output buffer to prevent extra
- * memcpy calls.
- */
- enlargeStringInfo(outMsg, nbytes);
+ /*
+ * Read the log directly into the output buffer to prevent extra
+ * memcpy calls.
+ */
+ enlargeStringInfo(outMsg, nbytes);
- XLogRead(&outMsg->data[outMsg->len], startptr, nbytes);
- outMsg->len += nbytes;
- outMsg->data[outMsg->len] = '\0';
+ XLogRead(&outMsg->data[outMsg->len], startptr, nbytes);
+ outMsg->len += nbytes;
+ outMsg->data[outMsg->len] = '\0';
- pq_putmessage('d', outMsg->data, outMsg->len);
- resetStringInfo(outMsg);
- }
+ pq_putmessage('d', outMsg->data, outMsg->len);
+ resetStringInfo(outMsg);
/* Update shared memory status */
SpinLockAcquire(&walsnd->mutex);