#include "access/timeline.h"
#include "access/transam.h"
+#include "access/xact.h"
#include "access/xlog_internal.h"
+
#include "catalog/pg_type.h"
+#include "commands/dbcommands.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "replication/basebackup.h"
+#include "replication/decode.h"
+#include "replication/logical.h"
+#include "replication/logicalfuncs.h"
#include "replication/slot.h"
+#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
WalSnd *MyWalSnd = NULL;
/* Global state */
-bool am_walsender = false; /* Am I a walsender process ? */
+bool am_walsender = false; /* Am I a walsender process? */
bool am_cascading_walsender = false; /* Am I cascading WAL to
- * another standby ? */
+ * another standby? */
+bool am_db_walsender = false; /* Connected to a database? */
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
static TimestampTz last_reply_timestamp;
/* Have we sent a heartbeat message asking for reply, since last reply? */
-static bool ping_sent = false;
+static bool waiting_for_ping_response = false;
/*
* While streaming WAL in Copy mode, streamingDoneSending is set to true
static bool streamingDoneSending;
static bool streamingDoneReceiving;
+/* Have we caught up, i.e. is there no unsent WAL remaining right now? */
+static bool WalSndCaughtUp = false;
+
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t walsender_ready_to_stop = false;
*/
static volatile sig_atomic_t replication_active = false;
+static LogicalDecodingContext *logical_decoding_ctx = NULL;
+static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
+
/* Signal handlers */
static void WalSndSigHupHandler(SIGNAL_ARGS);
static void WalSndXLogSendHandler(SIGNAL_ARGS);
static void WalSndLastCycleHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
-static void WalSndLoop(void);
+typedef void (*WalSndSendDataCallback)(void);
+static void WalSndLoop(WalSndSendDataCallback send_data);
static void InitWalSenderSlot(void);
static void WalSndKill(int code, Datum arg);
-static void XLogSend(bool *caughtup);
+static void WalSndShutdown(void) __attribute__((noreturn));
+static void XLogSendPhysical(void);
+static void XLogSendLogical(void);
+static void WalSndDone(WalSndSendDataCallback send_data);
static XLogRecPtr GetStandbyFlushRecPtr(void);
static void IdentifySystem(void);
+static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
+static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
static void StartReplication(StartReplicationCmd *cmd);
+static void StartLogicalReplication(StartReplicationCmd *cmd);
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
static void WalSndKeepalive(bool requestReply);
+static void WalSndKeepaliveIfNecessary(TimestampTz now);
+static void WalSndCheckTimeOut(TimestampTz now);
+static long WalSndComputeSleeptime(TimestampTz now);
+static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
+static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
+static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+
+static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
/* Initialize walsender process before entering the main command loop */
WalSndSetState(WALSNDSTATE_STARTUP);
}
+/*
+ * WalSndShutdown
+ *
+ * Terminate this walsender in an orderly manner after the connection to the
+ * client has been aborted or given up on.  Does not return.
+ */
+static void
+WalSndShutdown(void)
+{
+    /*
+     * Nothing more may be sent to the standby from here on.  Point remote
+     * output at nowhere so that ereport doesn't attempt to send messages
+     * either.
+     */
+    if (whereToSendOutput == DestRemote)
+    {
+        whereToSendOutput = DestNone;
+    }
+
+    proc_exit(0);
+    abort();                    /* keep the compiler quiet */
+}
+
/*
* Handle the IDENTIFY_SYSTEM command.
*/
char tli[11];
char xpos[MAXFNAMELEN];
XLogRecPtr logptr;
+ char *dbname = NULL;
/*
- * Reply with a result set with one row, three columns. First col is
- * system ID, second is timeline ID, and third is current xlog location.
+ * Reply with a result set with one row, four columns. First col is system
+ * ID, second is timeline ID, third is current xlog location and the fourth
+ * contains the database name if we are connected to one.
*/
snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
+ if (MyDatabaseId != InvalidOid)
+ {
+ MemoryContext cur = CurrentMemoryContext;
+
+ /* syscache access needs a transaction env. */
+ StartTransactionCommand();
+ /* make dbname live outside TX context */
+ MemoryContextSwitchTo(cur);
+ dbname = get_database_name(MyDatabaseId);
+ CommitTransactionCommand();
+ /* CommitTransactionCommand switches to TopMemoryContext */
+ MemoryContextSwitchTo(cur);
+ }
+
/* Send a RowDescription message */
pq_beginmessage(&buf, 'T');
- pq_sendint(&buf, 3, 2); /* 3 fields */
+ pq_sendint(&buf, 4, 2); /* 4 fields */
/* first field */
pq_sendstring(&buf, "systemid"); /* col name */
pq_sendint(&buf, 0, 2); /* format code */
/* third field */
- pq_sendstring(&buf, "xlogpos");
- pq_sendint(&buf, 0, 4);
- pq_sendint(&buf, 0, 2);
- pq_sendint(&buf, TEXTOID, 4);
- pq_sendint(&buf, -1, 2);
- pq_sendint(&buf, 0, 4);
- pq_sendint(&buf, 0, 2);
+ pq_sendstring(&buf, "xlogpos"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ /* fourth field */
+ pq_sendstring(&buf, "dbname"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
pq_endmessage(&buf);
/* Send a DataRow message */
pq_beginmessage(&buf, 'D');
- pq_sendint(&buf, 3, 2); /* # of columns */
+ pq_sendint(&buf, 4, 2); /* # of columns */
pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
pq_sendint(&buf, strlen(tli), 4); /* col2 len */
pq_sendbytes(&buf, (char *) tli, strlen(tli));
pq_sendint(&buf, strlen(xpos), 4); /* col3 len */
pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
+ /* send NULL if not connected to a database */
+ if (dbname)
+ {
+ pq_sendint(&buf, strlen(dbname), 4); /* col4 len */
+ pq_sendbytes(&buf, (char *) dbname, strlen(dbname));
+ }
+ else
+ {
+ pq_sendint(&buf, -1, 4); /* col4 len, NULL */
+ }
pq_endmessage(&buf);
}
/* Main loop of walsender */
replication_active = true;
- WalSndLoop();
+ WalSndLoop(XLogSendPhysical);
replication_active = false;
if (walsender_ready_to_stop)
pq_puttextmessage('C', "START_STREAMING");
}
+/*
+ * logical_read_xlog_page
+ *
+ * read_page callback for logical decoding contexts, as a walsender process.
+ *
+ * logical_read_local_xlog_page has to do a plain sleep/busy loop.  Inside the
+ * walsender we can do better and wait in WalSndWaitForWal, because the
+ * walsender's latch gets set every time WAL is flushed.
+ *
+ * The page starting at targetPagePtr is read into cur_page.  The return value
+ * is the number of bytes of that page known to be flushed, or -1 if fewer
+ * than reqLen bytes of it are available.
+ */
+static int
+logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
+                       XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+{
+    XLogRecPtr  flushed_upto;
+    int         valid_bytes;
+
+    /* make sure we have enough WAL available */
+    flushed_upto = WalSndWaitForWal(targetPagePtr + reqLen);
+
+    if (targetPagePtr + XLOG_BLCKSZ <= flushed_upto)
+    {
+        /* the whole block is available */
+        valid_bytes = XLOG_BLCKSZ;
+    }
+    else if (targetPagePtr + reqLen <= flushed_upto)
+    {
+        /* only part of the page is available, but enough for the request */
+        valid_bytes = flushed_upto - targetPagePtr;
+    }
+    else
+    {
+        /* not enough WAL synced, that can happen during shutdown */
+        return -1;
+    }
+
+    /* now actually read the data, we know it's there */
+    XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+
+    return valid_bytes;
+}
+
/*
* Create a new replication slot.
*/
CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
{
const char *slot_name;
+ const char *snapshot_name = NULL;
+ char xpos[MAXFNAMELEN];
StringInfoData buf;
Assert(!MyReplicationSlot);
sendTimeLineIsHistoric = false;
sendTimeLine = ThisTimeLineID;
- ReplicationSlotCreate(cmd->slotname,
- cmd->kind == REPLICATION_KIND_LOGICAL,
- RS_PERSISTENT);
+ if (cmd->kind == REPLICATION_KIND_PHYSICAL)
+ {
+ ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT);
+ }
+ else
+ {
+ CheckLogicalDecodingRequirements();
+ ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL);
+ }
initStringInfo(&output_message);
slot_name = NameStr(MyReplicationSlot->data.name);
- /*
- * It may seem somewhat pointless to send back the same slot name the
- * client just requested and nothing else, but logical replication
- * will add more fields here. (We could consider removing the slot
- * name from what's sent back, though, since the client has specified
- * that.)
- */
+ if (cmd->kind == REPLICATION_KIND_LOGICAL)
+ {
+ LogicalDecodingContext *ctx;
+
+ ctx = CreateInitDecodingContext(
+ cmd->plugin, NIL,
+ logical_read_xlog_page,
+ WalSndPrepareWrite, WalSndWriteData);
+
+ /* build initial snapshot, might take a while */
+ DecodingContextFindStartpoint(ctx);
+
+ /*
+ * Export a plain (not of the snapbuild.c type) snapshot to the user
+ * that can be imported into another session.
+ */
+ snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
+
+ /* don't need the decoding context anymore */
+ FreeDecodingContext(ctx);
+
+ ReplicationSlotPersist();
+ }
+
+ slot_name = NameStr(MyReplicationSlot->data.name);
+ snprintf(xpos, sizeof(xpos), "%X/%X",
+ (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
+ (uint32) MyReplicationSlot->data.confirmed_flush);
pq_beginmessage(&buf, 'T');
- pq_sendint(&buf, 1, 2); /* 1 field */
+ pq_sendint(&buf, 4, 2); /* 4 fields */
/* first field: slot name */
pq_sendstring(&buf, "slot_name"); /* col name */
pq_sendint(&buf, 0, 4); /* typmod */
pq_sendint(&buf, 0, 2); /* format code */
+ /* second field: LSN at which we became consistent */
+ pq_sendstring(&buf, "consistent_point"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ /* third field: exported snapshot's name */
+ pq_sendstring(&buf, "snapshot_name"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ /* fourth field: output plugin */
+ pq_sendstring(&buf, "output_plugin"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
pq_endmessage(&buf);
/* Send a DataRow message */
pq_beginmessage(&buf, 'D');
- pq_sendint(&buf, 1, 2); /* # of columns */
+ pq_sendint(&buf, 4, 2); /* # of columns */
/* slot_name */
pq_sendint(&buf, strlen(slot_name), 4); /* col1 len */
pq_sendbytes(&buf, slot_name, strlen(slot_name));
+ /* consistent wal location */
+ pq_sendint(&buf, strlen(xpos), 4); /* col2 len */
+ pq_sendbytes(&buf, xpos, strlen(xpos));
+
+ /* snapshot name */
+ if (snapshot_name != NULL)
+ {
+ pq_sendint(&buf, strlen(snapshot_name), 4); /* col3 len */
+ pq_sendbytes(&buf, snapshot_name, strlen(snapshot_name));
+ }
+ else
+ pq_sendint(&buf, -1, 4); /* col3 len, NULL */
+
+ /* plugin */
+ if (cmd->plugin != NULL)
+ {
+ pq_sendint(&buf, strlen(cmd->plugin), 4); /* col4 len */
+ pq_sendbytes(&buf, cmd->plugin, strlen(cmd->plugin));
+ }
+ else
+ pq_sendint(&buf, -1, 4); /* col4 len, NULL */
+
pq_endmessage(&buf);
/*
EndCommand("DROP_REPLICATION_SLOT", DestRemote);
}
+/*
+ * Load previously initiated logical slot and prepare for sending data (via
+ * WalSndLoop).
+ *
+ * Acquires the slot named by cmd->slotname, answers with a CopyBothResponse,
+ * creates the decoding context and then runs WalSndLoop with XLogSendLogical
+ * as the send callback. Once the loop returns, the decoding context is freed
+ * and the slot released. If walsender_ready_to_stop is set at that point the
+ * process exits; otherwise the walsender goes back to WALSNDSTATE_STARTUP
+ * and completes the command with "COPY 0".
+ */
+static void
+StartLogicalReplication(StartReplicationCmd *cmd)
+{
+ StringInfoData buf;
+
+ /* make sure that our requirements are still fulfilled */
+ CheckLogicalDecodingRequirements();
+
+ Assert(!MyReplicationSlot);
+
+ ReplicationSlotAcquire(cmd->slotname);
+
+ /*
+ * Force a disconnect, so that the decoding code doesn't need to care
+ * about an eventual switch from running in recovery, to running in a
+ * normal environment. Client code is expected to handle reconnects.
+ */
+ if (am_cascading_walsender && !RecoveryInProgress())
+ {
+ ereport(LOG,
+ (errmsg("terminating walsender process after promotion")));
+ walsender_ready_to_stop = true;
+ }
+
+ WalSndSetState(WALSNDSTATE_CATCHUP);
+
+ /* Send a CopyBothResponse message, and start streaming */
+ pq_beginmessage(&buf, 'W');
+ pq_sendbyte(&buf, 0); /* overall copy format */
+ pq_sendint(&buf, 0, 2); /* number of columns */
+ pq_endmessage(&buf);
+ pq_flush();
+
+ /* setup state for XLogReadPage */
+ sendTimeLineIsHistoric = false;
+ sendTimeLine = ThisTimeLineID;
+
+ /*
+ * Initialize position to the last ack'ed one, then the xlog records begin
+ * to be shipped from that position.
+ */
+ logical_decoding_ctx = CreateDecodingContext(
+ cmd->startpoint, cmd->options,
+ logical_read_xlog_page,
+ WalSndPrepareWrite, WalSndWriteData);
+
+ /* Start reading WAL from the oldest required WAL. */
+ logical_startptr = MyReplicationSlot->data.restart_lsn;
+
+ /*
+ * Report the location after which we'll send out further commits as the
+ * current sentPtr.
+ */
+ sentPtr = MyReplicationSlot->data.confirmed_flush;
+
+ /*
+ * Also update the sent position status in shared memory.
+ *
+ * NOTE(review): the shared value is initialized from restart_lsn, while
+ * the local sentPtr just above is initialized from confirmed_flush --
+ * confirm that the difference is intended.
+ */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->sentPtr = MyReplicationSlot->data.restart_lsn;
+ SpinLockRelease(&walsnd->mutex);
+ }
+
+ replication_active = true;
+
+ SyncRepInitConfig();
+
+ /* Main loop of walsender */
+ WalSndLoop(XLogSendLogical);
+
+ /* streaming is over: give up the decoding context and the slot */
+ FreeDecodingContext(logical_decoding_ctx);
+ ReplicationSlotRelease();
+
+ replication_active = false;
+ if (walsender_ready_to_stop)
+ proc_exit(0);
+ WalSndSetState(WALSNDSTATE_STARTUP);
+
+ /* Get out of COPY mode (CommandComplete). */
+ EndCommand("COPY 0", DestRemote);
+}
+
+/*
+ * WalSndPrepareWrite
+ *
+ * LogicalDecodingContext 'prepare_write' callback.
+ *
+ * Starts a new message in ctx->out (a StringInfo) by writing its header:
+ * the message type 'w', dataStart, walEnd and eight bytes reserved for the
+ * send time.
+ *
+ * Don't do anything lasting in here, it's quite possible that nothing will
+ * be done with the data.
+ */
+static void
+WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
+{
+    /*
+     * Only the last write reports the real LSN; sync rep can't be confused
+     * by being sent the same LSN several times.
+     */
+    XLogRecPtr  reported_lsn = last_write ? lsn : InvalidXLogRecPtr;
+
+    resetStringInfo(ctx->out);
+
+    pq_sendbyte(ctx->out, 'w');
+    pq_sendint64(ctx->out, reported_lsn);   /* dataStart */
+    pq_sendint64(ctx->out, reported_lsn);   /* walEnd */
+
+    /*
+     * The sendtime is filled in later, just as it's done in
+     * XLogSendPhysical; here we only reserve the space for it.
+     */
+    pq_sendint64(ctx->out, 0);              /* sendtime */
+}
+
+/*
+ * LogicalDecodingContext 'write' callback.
+ *
+ * Actually write out data previously prepared by WalSndPrepareWrite out to
+ * the network. Take as long as needed, but process replies from the other
+ * side and check timeouts during that.
+ *
+ * ctx->out holds the message whose header was started by WalSndPrepareWrite;
+ * the send time placeholder in that header is filled in here.  The process
+ * is shut down via WalSndShutdown if flushing to the client fails.
+ */
+static void
+WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
+                bool last_write)
+{
+    /*
+     * Fill the send timestamp last, so that it is taken as late as
+     * possible. This is somewhat ugly, but the protocol's set as it's already
+     * used for several releases by streaming physical replication.
+     *
+     * This has to happen before the message is queued below; a timestamp
+     * patched into ctx->out after queueing would not be part of what is
+     * sent, and the client would see the zero placeholder.  The offset skips
+     * the type byte, dataStart and walEnd written by WalSndPrepareWrite.
+     */
+    resetStringInfo(&tmpbuf);
+    pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
+    memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
+           tmpbuf.data, sizeof(int64));
+
+    /* output previously gathered data in a CopyData packet */
+    pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
+
+    /* fast path */
+    /* Try to flush pending output to the client */
+    if (pq_flush_if_writable() != 0)
+        WalSndShutdown();
+
+    if (!pq_is_send_pending())
+        return;
+
+    /* slow path: wait until everything buffered has been sent */
+    for (;;)
+    {
+        int         wakeEvents;
+        long        sleeptime;
+        TimestampTz now;
+
+        /*
+         * Emergency bailout if postmaster has died.  This is to avoid the
+         * necessity for manual cleanup of all postmaster children.
+         */
+        if (!PostmasterIsAlive())
+            exit(1);
+
+        /* Process any requests or signals received recently */
+        if (got_SIGHUP)
+        {
+            got_SIGHUP = false;
+            ProcessConfigFile(PGC_SIGHUP);
+            SyncRepInitConfig();
+        }
+
+        /* Check for input from the client */
+        ProcessRepliesIfAny();
+
+        /* Clear any already-pending wakeups */
+        ResetLatch(&MyWalSnd->latch);
+
+        /* Try to flush pending output to the client */
+        if (pq_flush_if_writable() != 0)
+            WalSndShutdown();
+
+        /* If we finished clearing the buffered data, we're done here. */
+        if (!pq_is_send_pending())
+            break;
+
+        now = GetCurrentTimestamp();
+
+        /* die if timeout was reached */
+        WalSndCheckTimeOut(now);
+
+        /* Send keepalive if the time has come */
+        WalSndKeepaliveIfNecessary(now);
+
+        sleeptime = WalSndComputeSleeptime(now);
+
+        wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+            WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
+
+        /* Sleep until something happens or we time out */
+        ImmediateInterruptOK = true;
+        CHECK_FOR_INTERRUPTS();
+        WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+                          MyProcPort->sock, sleeptime);
+        ImmediateInterruptOK = false;
+    }
+
+    /* reactivate latch so WalSndLoop knows to continue */
+    SetLatch(&MyWalSnd->latch);
+}
+
+/*
+ * Wait till WAL < loc is flushed to disk so it can be safely read.
+ *
+ * Returns the most recently determined flush position (the replay position
+ * while in recovery). The returned value can be smaller than loc: the wait
+ * is given up as soon as walsender_ready_to_stop is set.
+ *
+ * While waiting, client replies are processed, pending output is flushed,
+ * keepalives are sent and the replication timeout is enforced.
+ */
+static XLogRecPtr
+WalSndWaitForWal(XLogRecPtr loc)
+{
+ int wakeEvents;
+ /* static: the position found by one call serves the next call's fast path */
+ static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+
+
+ /*
+ * Fast path to avoid acquiring the spinlock in case we already know we
+ * have enough WAL available. This is particularly interesting if we're
+ * far behind.
+ */
+ if (RecentFlushPtr != InvalidXLogRecPtr &&
+ loc <= RecentFlushPtr)
+ return RecentFlushPtr;
+
+ /* Get a more recent flush pointer. */
+ if (!RecoveryInProgress())
+ RecentFlushPtr = GetFlushRecPtr();
+ else
+ RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+
+ for (;;)
+ {
+ long sleeptime;
+ TimestampTz now;
+
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (!PostmasterIsAlive())
+ exit(1);
+
+ /* Process any requests or signals received recently */
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ SyncRepInitConfig();
+ }
+
+ /* Check for input from the client */
+ ProcessRepliesIfAny();
+
+ /* Clear any already-pending wakeups */
+ ResetLatch(&MyWalSnd->latch);
+
+ /* Update our idea of the currently flushed position. */
+ if (!RecoveryInProgress())
+ RecentFlushPtr = GetFlushRecPtr();
+ else
+ RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+
+ /*
+ * If postmaster asked us to stop, don't wait here anymore. This will
+ * cause the xlogreader to return without reading a full record, which
+ * is the fastest way to reach the mainloop which then can quit.
+ *
+ * It's important to do this check after the recomputation of
+ * RecentFlushPtr, so we can send all remaining data before shutting
+ * down.
+ */
+ if (walsender_ready_to_stop)
+ break;
+
+ /*
+ * We only send regular messages to the client for full decoded
+ * transactions, but a synchronous replication and walsender shutdown
+ * possibly are waiting for a later location. So we send pings
+ * containing the flush location every now and then.
+ */
+ if (MyWalSnd->flush < sentPtr && !waiting_for_ping_response)
+ {
+ WalSndKeepalive(true);
+ waiting_for_ping_response = true;
+ }
+
+ /* check whether we're done */
+ if (loc <= RecentFlushPtr)
+ break;
+
+ /* Waiting for new WAL. Since we need to wait, we're now caught up. */
+ WalSndCaughtUp = true;
+
+ /*
+ * Try to flush pending output to the client. Also wait for the socket
+ * becoming writable, if there's still pending output after an attempt
+ * to flush. Otherwise we might just sit on output data while waiting
+ * for new WAL being generated.
+ */
+ if (pq_flush_if_writable() != 0)
+ WalSndShutdown();
+
+ now = GetCurrentTimestamp();
+
+ /* die if timeout was reached */
+ WalSndCheckTimeOut(now);
+
+ /* Send keepalive if the time has come */
+ WalSndKeepaliveIfNecessary(now);
+
+ sleeptime = WalSndComputeSleeptime(now);
+
+ wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+ WL_SOCKET_READABLE | WL_TIMEOUT;
+
+ /* only wait for writability while output is actually pending */
+ if (pq_is_send_pending())
+ wakeEvents |= WL_SOCKET_WRITEABLE;
+
+ /* Sleep until something happens or we time out */
+ ImmediateInterruptOK = true;
+ CHECK_FOR_INTERRUPTS();
+ WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+ MyProcPort->sock, sleeptime);
+ ImmediateInterruptOK = false;
+ }
+
+ /* reactivate latch so WalSndLoop knows to continue */
+ SetLatch(&MyWalSnd->latch);
+ return RecentFlushPtr;
+}
+
/*
* Execute an incoming replication command.
*/
MemoryContext cmd_context;
MemoryContext old_context;
+ /*
+ * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
+ * command arrives. Clean up the old stuff if there's anything.
+ */
+ SnapBuildClearExportedSnapshot();
+
elog(DEBUG1, "received replication command: %s", cmd_string);
CHECK_FOR_INTERRUPTS();
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
StartReplication(cmd);
else
- elog(ERROR, "cannot handle logical decoding yet");
+ StartLogicalReplication(cmd);
break;
}
if (received)
{
last_reply_timestamp = GetCurrentTimestamp();
- ping_sent = false;
+ waiting_for_ping_response = false;
}
}
if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
{
if (MyReplicationSlot->data.database != InvalidOid)
- elog(ERROR, "cannot handle logical decoding yet");
+ LogicalConfirmReceivedLocation(flushPtr);
else
PhysicalConfirmReceivedLocation(flushPtr);
}
MyPgXact->xmin = feedbackXmin;
}
-/* Main loop of walsender process that streams the WAL over Copy messages. */
+/*
+ * Compute how long send/receive loops should sleep.
+ *
+ * If wal_sender_timeout is enabled we want to wake up in time to send
+ * keepalives and to abort the connection if wal_sender_timeout has been
+ * reached.
+ *
+ * 'now' is the current time.  The result is a duration in milliseconds; it
+ * is a fixed 10 seconds when wal_sender_timeout is disabled.
+ */
+static long
+WalSndComputeSleeptime(TimestampTz now)
+{
+    long        sleeptime = 10000;  /* 10 s */
+
+    if (wal_sender_timeout > 0)
+    {
+        TimestampTz wakeup_time;
+        long        sec_to_timeout;
+        int         microsec_to_timeout;
+
+        /*
+         * At the latest stop sleeping once wal_sender_timeout has been
+         * reached.
+         */
+        wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+                                                  wal_sender_timeout);
+
+        /*
+         * If no ping has been sent yet, wakeup when it's time to do
+         * so. WalSndKeepaliveIfNecessary() wants to send a keepalive once
+         * half of the timeout passed without a response.
+         */
+        if (!waiting_for_ping_response)
+            wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+                                                      wal_sender_timeout / 2);
+
+        /* Compute relative time until wakeup. */
+        TimestampDifference(now, wakeup_time,
+                            &sec_to_timeout, &microsec_to_timeout);
+
+        /* convert the seconds/microseconds pair to milliseconds */
+        sleeptime = sec_to_timeout * 1000 +
+            microsec_to_timeout / 1000;
+    }
+
+    return sleeptime;
+}
+
+/*
+ * Check whether there have been responses by the client within
+ * wal_sender_timeout and shut down if not.
+ *
+ * 'now' is the current time, as determined by the caller. Nothing happens
+ * if wal_sender_timeout is not positive.
+ */
static void
-WalSndLoop(void)
+WalSndCheckTimeOut(TimestampTz now)
{
- bool caughtup = false;
+ TimestampTz timeout;
+
+ /* point in time at which the client has been silent for too long */
+ timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
+ wal_sender_timeout);
+ if (wal_sender_timeout > 0 && now >= timeout)
+ {
+ /*
+ * Since typically expiration of replication timeout means
+ * communication problem, we don't send the error message to
+ * the standby.
+ */
+ ereport(COMMERROR,
+ (errmsg("terminating walsender process due to replication timeout")));
+
+ WalSndShutdown();
+ }
+}
+
+/* Main loop of walsender process that streams the WAL over Copy messages. */
+static void
+WalSndLoop(WalSndSendDataCallback send_data)
+{
/*
* Allocate buffers that will be used for each outgoing and incoming
* message. We do this just once to reduce palloc overhead.
/* Initialize the last reply timestamp */
last_reply_timestamp = GetCurrentTimestamp();
- ping_sent = false;
+ waiting_for_ping_response = false;
/*
* Loop until we reach the end of this timeline or the client requests to
*/
for (;;)
{
- /* Clear any already-pending wakeups */
- ResetLatch(&MyWalSnd->latch);
+ TimestampTz now;
/*
* Emergency bailout if postmaster has died. This is to avoid the
/* Check for input from the client */
ProcessRepliesIfAny();
+ /* Clear any already-pending wakeups */
+ ResetLatch(&MyWalSnd->latch);
+
/*
* If we have received CopyDone from the client, sent CopyDone
* ourselves, and the output buffer is empty, it's time to exit
/*
* If we don't have any pending data in the output buffer, try to send
- * some more. If there is some, we don't bother to call XLogSend
+ * some more. If there is some, we don't bother to call send_data
* again until we've flushed it ... but we'd better assume we are not
* caught up.
*/
if (!pq_is_send_pending())
- XLogSend(&caughtup);
+ send_data();
else
- caughtup = false;
+ WalSndCaughtUp = false;
/* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0)
- goto send_failure;
+ WalSndShutdown();
/* If nothing remains to be sent right now ... */
- if (caughtup && !pq_is_send_pending())
+ if (WalSndCaughtUp && !pq_is_send_pending())
{
/*
* If we're in catchup state, move to streaming. This is an
* the walsender is not sure which.
*/
if (walsender_ready_to_stop)
- {
- /* ... let's just be real sure we're caught up ... */
- XLogSend(&caughtup);
- if (caughtup && sentPtr == MyWalSnd->flush &&
- !pq_is_send_pending())
- {
- /* Inform the standby that XLOG streaming is done */
- EndCommand("COPY 0", DestRemote);
- pq_flush();
-
- proc_exit(0);
- }
- }
+ WalSndDone(send_data);
}
- /*
- * If half of wal_sender_timeout has elapsed without receiving any
- * reply from standby, send a keep-alive message requesting an
- * immediate reply.
- */
- if (wal_sender_timeout > 0 && !ping_sent)
- {
- TimestampTz timeout;
+ now = GetCurrentTimestamp();
- timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
- wal_sender_timeout / 2);
- if (GetCurrentTimestamp() >= timeout)
- {
- WalSndKeepalive(true);
- ping_sent = true;
- /* Try to flush pending output to the client */
- if (pq_flush_if_writable() != 0)
- goto send_failure;
- }
- }
+ /* Check for replication timeout. */
+ WalSndCheckTimeOut(now);
+
+ /* Send keepalive if the time has come */
+ WalSndKeepaliveIfNecessary(now);
/*
* We don't block if not caught up, unless there is unsent data
* pending in which case we'd better block until the socket is
- * write-ready. This test is only needed for the case where XLogSend
- * loaded a subset of the available data but then pq_flush_if_writable
- * flushed it all --- we should immediately try to send more.
+ * write-ready. This test is only needed for the case where the
+ * send_data callback handled a subset of the available data but then
+ * pq_flush_if_writable flushed it all --- we should immediately try
+ * to send more.
*/
- if ((caughtup && !streamingDoneSending) || pq_is_send_pending())
+ if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
{
- TimestampTz timeout;
- long sleeptime = 10000; /* 10 s */
+ long sleeptime;
int wakeEvents;
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
WL_SOCKET_READABLE;
+ sleeptime = WalSndComputeSleeptime(now);
+
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
- /*
- * If wal_sender_timeout is active, sleep in smaller increments
- * to not go over the timeout too much. XXX: Why not just sleep
- * until the timeout has elapsed?
- */
- if (wal_sender_timeout > 0)
- sleeptime = 1 + (wal_sender_timeout / 10);
-
/* Sleep until something happens or we time out */
ImmediateInterruptOK = true;
CHECK_FOR_INTERRUPTS();
WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
MyProcPort->sock, sleeptime);
ImmediateInterruptOK = false;
-
- /*
- * Check for replication timeout. Note we ignore the corner case
- * possibility that the client replied just as we reached the
- * timeout ... he's supposed to reply *before* that.
- */
- timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
- wal_sender_timeout);
- if (wal_sender_timeout > 0 && GetCurrentTimestamp() >= timeout)
- {
- /*
- * Since typically expiration of replication timeout means
- * communication problem, we don't send the error message to
- * the standby.
- */
- ereport(COMMERROR,
- (errmsg("terminating walsender process due to replication timeout")));
- goto send_failure;
- }
}
}
return;
-
-send_failure:
-
- /*
- * Get here on send failure. Clean up and exit.
- *
- * Reset whereToSendOutput to prevent ereport from attempting to send any
- * more messages to the standby.
- */
- if (whereToSendOutput == DestRemote)
- whereToSendOutput = DestNone;
-
- proc_exit(0);
- abort(); /* keep the compiler quiet */
}
/* Initialize a per-walsender data structure for this walsender process */
}
/*
+ * Send out the WAL in its normal physical/stored form.
+ *
* Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
* but not yet sent to the client, and buffer it in the libpq output
* buffer.
*
- * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
- * *caughtup is set to false.
+ * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
+ * otherwise WalSndCaughtUp is set to false.
*/
static void
-XLogSend(bool *caughtup)
+XLogSendPhysical(void)
{
XLogRecPtr SendRqstPtr;
XLogRecPtr startptr;
if (streamingDoneSending)
{
- *caughtup = true;
+ WalSndCaughtUp = true;
return;
}
pq_putmessage_noblock('c', NULL, 0);
streamingDoneSending = true;
- *caughtup = true;
+ WalSndCaughtUp = true;
elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
(uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
Assert(sentPtr <= SendRqstPtr);
if (SendRqstPtr <= sentPtr)
{
- *caughtup = true;
+ WalSndCaughtUp = true;
return;
}
{
endptr = SendRqstPtr;
if (sendTimeLineIsHistoric)
- *caughtup = false;
+ WalSndCaughtUp = false;
else
- *caughtup = true;
+ WalSndCaughtUp = true;
}
else
{
/* round down to page boundary. */
endptr -= (endptr % XLOG_BLCKSZ);
- *caughtup = false;
+ WalSndCaughtUp = false;
}
nbytes = endptr - startptr;
return;
}
+/*
+ * XLogSendLogical
+ *
+ * Stream out logically decoded data.
+ *
+ * Reads one WAL record through the decoding context's reader and passes it
+ * to LogicalDecodingProcessRecord.  Afterwards sentPtr is published in
+ * shared memory.
+ */
+static void
+XLogSendLogical(void)
+{
+    XLogReaderState *reader = logical_decoding_ctx->reader;
+    XLogRecord *next_record;
+    char       *read_error;
+
+    /*
+     * Whether we've caught up is unknown at this point.  WalSndWaitForWal
+     * sets the flag to true if it actually has to wait.  It is also set to
+     * true below if XLogReadRecord() had to stop reading without
+     * WalSndWaitForWal having waited - i.e. when we're shutting down.
+     */
+    WalSndCaughtUp = false;
+
+    next_record = XLogReadRecord(reader, logical_startptr, &read_error);
+
+    /* every later call passes InvalidXLogRecPtr as the start position */
+    logical_startptr = InvalidXLogRecPtr;
+
+    /* xlog record was invalid */
+    if (read_error != NULL)
+        elog(ERROR, "%s", read_error);
+
+    if (next_record == NULL)
+    {
+        /*
+         * If the record we just wanted to read is at or beyond the flushed
+         * point, then we're caught up.
+         */
+        if (reader->EndRecPtr >= GetFlushRecPtr())
+            WalSndCaughtUp = true;
+    }
+    else
+    {
+        LogicalDecodingProcessRecord(logical_decoding_ctx, next_record);
+
+        sentPtr = reader->EndRecPtr;
+    }
+
+    /* Update shared memory status */
+    {
+        /* use volatile pointer to prevent code rearrangement */
+        volatile WalSnd *walsnd = MyWalSnd;
+
+        SpinLockAcquire(&walsnd->mutex);
+        walsnd->sentPtr = sentPtr;
+        SpinLockRelease(&walsnd->mutex);
+    }
+}
+
+/*
+ * Shutdown if the sender is caught up.
+ *
+ * NB: This should only be called when the shutdown signal has been received
+ * from postmaster.
+ *
+ * send_data is the callback used to push out any remaining data before the
+ * caught-up check is made.
+ *
+ * Note that if we determine that there's still more data to send, this
+ * function will return control to the caller.
+ */
+static void
+WalSndDone(WalSndSendDataCallback send_data)
+{
+    /* ... let's just be real sure we're caught up ... */
+    send_data();
+
+    if (WalSndCaughtUp && sentPtr == MyWalSnd->flush &&
+        !pq_is_send_pending())
+    {
+        /* Inform the standby that XLOG streaming is done */
+        EndCommand("COPY 0", DestRemote);
+        pq_flush();
+
+        proc_exit(0);
+    }
+
+    /*
+     * Not done yet: ask the client to report its position.  Remember that a
+     * reply is outstanding, as the other keepalive senders in this file do,
+     * so that we don't send another request on every pass through the main
+     * loop while waiting for the answer.
+     */
+    if (!waiting_for_ping_response)
+    {
+        WalSndKeepalive(true);
+        waiting_for_ping_response = true;
+    }
+}
+
/*
* Returns the latest point in WAL that has been safely flushed to disk, and
* can be sent to the standby. This should only be called when in recovery,
pq_putmessage_noblock('d', output_message.data, output_message.len);
}
+/*
+ * WalSndKeepaliveIfNecessary
+ *
+ * Send a keepalive message, requesting a reply, if too much time has elapsed
+ * since the last reply from the standby.  'now' is the current time.
+ */
+static void
+WalSndKeepaliveIfNecessary(TimestampTz now)
+{
+    TimestampTz ping_due;
+
+    /* nothing to do without a timeout, or while a reply is outstanding */
+    if (wal_sender_timeout <= 0 || waiting_for_ping_response)
+        return;
+
+    /*
+     * The keep-alive becomes due once half of wal_sender_timeout has lapsed
+     * without receiving any reply from the standby.
+     */
+    ping_due = TimestampTzPlusMilliseconds(last_reply_timestamp,
+                                           wal_sender_timeout / 2);
+    if (now < ping_due)
+        return;
+
+    /* ask the standby for an immediate reply */
+    WalSndKeepalive(true);
+    waiting_for_ping_response = true;
+
+    /* Try to flush pending output to the client */
+    if (pq_flush_if_writable() != 0)
+        WalSndShutdown();
+}
+
/*
* This isn't currently used for anything. Monitoring tools might be
* interested in the future, and we'll need something like this in the