#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
+#include "utils/timeout.h"
#include "utils/timestamp.h"
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int replication_timeout = 60 * 1000; /* maximum time to send one
* WAL data message */
+/*
+ * State for WalSndWakeupRequest
+ */
+bool wake_wal_senders = false;
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
* How far have we sent WAL already? This is also advertised in
* MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
*/
-static XLogRecPtr sentPtr = {0, 0};
+static XLogRecPtr sentPtr = 0;
/*
* Buffer for processing reply messages.
/* Prototypes for private functions */
static bool HandleReplicationCommand(const char *cmd_string);
-static int WalSndLoop(void);
+static void WalSndLoop(void) __attribute__((noreturn));
static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
/* Main entry point for walsender process */
-int
+void
WalSenderMain(void)
{
MemoryContext walsnd_context;
SyncRepInitConfig();
/* Main loop of walsender */
- return WalSndLoop();
+ WalSndLoop();
}
/*
logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
- snprintf(xpos, sizeof(xpos), "%X/%X",
- logptr.xlogid, logptr.xrecoff);
+ snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
/* Send a RowDescription message */
pq_beginmessage(&buf, 'T');
pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
- reply.write.xlogid, reply.write.xrecoff,
- reply.flush.xlogid, reply.flush.xrecoff,
- reply.apply.xlogid, reply.apply.xrecoff);
+ (uint32) (reply.write >> 32), (uint32) reply.write,
+ (uint32) (reply.flush >> 32), (uint32) reply.flush,
+ (uint32) (reply.apply >> 32), (uint32) reply.apply);
/*
* Update shared state for this WalSender process based on reply data from
}
/* Main loop of walsender process */
-static int
+static void
WalSndLoop(void)
{
char *output_message;
whereToSendOutput = DestNone;
proc_exit(0);
- return 1; /* keep the compiler quiet */
+ abort(); /* keep the compiler quiet */
}
/* Initialize a per-walsender data structure for this walsender process */
int segbytes;
int readbytes;
- startoff = recptr.xrecoff % XLogSegSize;
+ startoff = recptr % XLogSegSize;
if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
{
startptr = sentPtr;
endptr = startptr;
XLByteAdvance(endptr, MAX_SEND_SIZE);
- if (endptr.xlogid != startptr.xlogid)
- {
- /* Don't cross a logfile boundary within one message */
- Assert(endptr.xlogid == startptr.xlogid + 1);
- endptr.xrecoff = 0;
- }
/* if we went beyond SendRqstPtr, back off */
if (XLByteLE(SendRqstPtr, endptr))
else
{
/* round down to page boundary. */
- endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
+ endptr -= (endptr % XLOG_BLCKSZ);
*caughtup = false;
}
- if (endptr.xrecoff == 0)
- nbytes = 0x100000000L - (uint64) startptr.xrecoff;
- else
- nbytes = endptr.xrecoff - startptr.xrecoff;
+ nbytes = endptr - startptr;
Assert(nbytes <= MAX_SEND_SIZE);
/*
char activitymsg[50];
snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
- sentPtr.xlogid, sentPtr.xrecoff);
+ (uint32) (sentPtr >> 32), (uint32) sentPtr);
set_ps_display(activitymsg, false);
}
pqsignal(SIGINT, SIG_IGN); /* not used */
pqsignal(SIGTERM, WalSndShutdownHandler); /* request shutdown */
pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */
- pqsignal(SIGALRM, SIG_IGN);
+ InitializeTimeouts(); /* establishes SIGALRM handler */
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
}
}
-/* Wake up all walsenders */
+/*
+ * Wake up all walsenders
+ *
+ * This will be called inside critical sections, so throwing an error is not
+ * adviseable.
+ */
void
WalSndWakeup(void)
{
if (walsnd->pid != 0)
{
- sync_priority[i] = walsnd->sync_standby_priority;
+ /*
+ * Treat a standby such as a pg_basebackup background process
+ * which always returns an invalid flush location, as an
+ * asynchronous standby.
+ */
+ sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
+ 0 : walsnd->sync_standby_priority;
if (walsnd->state == WALSNDSTATE_STREAMING &&
walsnd->sync_standby_priority > 0 &&
(priority == 0 ||
- priority > walsnd->sync_standby_priority))
+ priority > walsnd->sync_standby_priority) &&
+ !XLogRecPtrIsInvalid(walsnd->flush))
{
priority = walsnd->sync_standby_priority;
sync_standby = i;
values[1] = CStringGetTextDatum(WalSndGetStateString(state));
snprintf(location, sizeof(location), "%X/%X",
- sentPtr.xlogid, sentPtr.xrecoff);
+ (uint32) (sentPtr >> 32), (uint32) sentPtr);
values[2] = CStringGetTextDatum(location);
- if (write.xlogid == 0 && write.xrecoff == 0)
+ if (write == 0)
nulls[3] = true;
snprintf(location, sizeof(location), "%X/%X",
- write.xlogid, write.xrecoff);
+ (uint32) (write >> 32), (uint32) write);
values[3] = CStringGetTextDatum(location);
- if (flush.xlogid == 0 && flush.xrecoff == 0)
+ if (flush == 0)
nulls[4] = true;
snprintf(location, sizeof(location), "%X/%X",
- flush.xlogid, flush.xrecoff);
+ (uint32) (flush >> 32), (uint32) flush);
values[4] = CStringGetTextDatum(location);
- if (apply.xlogid == 0 && apply.xrecoff == 0)
+ if (apply == 0)
nulls[5] = true;
snprintf(location, sizeof(location), "%X/%X",
- apply.xlogid, apply.xrecoff);
+ (uint32) (apply >> 32), (uint32) apply);
values[5] = CStringGetTextDatum(location);
values[6] = Int32GetDatum(sync_priority[i]);