WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
bool last_write)
{
+ TimestampTz now;
+ int64 now_int;
+
/* output previously gathered data in a CopyData packet */
pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
* several releases by streaming physical replication.
*/
resetStringInfo(&tmpbuf);
- pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
+ now_int = GetCurrentIntegerTimestamp();
+ now = IntegerTimestampToTimestampTz(now_int);
+ pq_sendint64(&tmpbuf, now_int);
memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
tmpbuf.data, sizeof(int64));
- /* fast path */
+ CHECK_FOR_INTERRUPTS();
+
/* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0)
WalSndShutdown();
- if (!pq_is_send_pending())
+ /* Try taking fast path unless we get too close to walsender timeout. */
+ if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
+ wal_sender_timeout / 2) &&
+ !pq_is_send_pending())
+ {
return;
+ }
+ /* If we have pending write here, go to slow path */
for (;;)
{
int wakeEvents;
long sleeptime;
- TimestampTz now;
+
+ /* Check for input from the client */
+ ProcessRepliesIfAny();
+
+ now = GetCurrentTimestamp();
+
+ /* die if timeout was reached */
+ WalSndCheckTimeOut(now);
+
+ /* Send keepalive if the time has come */
+ WalSndKeepaliveIfNecessary(now);
+
+ if (!pq_is_send_pending())
+ break;
+
+ sleeptime = WalSndComputeSleeptime(now);
+
+ wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+ WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
+
+ /* Sleep until something happens or we time out */
+ WaitLatchOrSocket(MyLatch, wakeEvents,
+ MyProcPort->sock, sleeptime);
/*
* Emergency bailout if postmaster has died. This is to avoid the
SyncRepInitConfig();
}
- /* Check for input from the client */
- ProcessRepliesIfAny();
-
/* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0)
WalSndShutdown();
-
- /* If we finished clearing the buffered data, we're done here. */
- if (!pq_is_send_pending())
- break;
-
- now = GetCurrentTimestamp();
-
- /* die if timeout was reached */
- WalSndCheckTimeOut(now);
-
- /* Send keepalive if the time has come */
- WalSndKeepaliveIfNecessary(now);
-
- sleeptime = WalSndComputeSleeptime(now);
-
- wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
- WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
-
- /* Sleep until something happens or we time out */
- WaitLatchOrSocket(MyLatch, wakeEvents,
- MyProcPort->sock, sleeptime);
}
/* reactivate latch so WalSndLoop knows to continue */