]> granicus.if.org Git - postgresql/blobdiff - src/backend/replication/walsender.c
Introduce timeout handling framework
[postgresql] / src / backend / replication / walsender.c
index 2c04df08ed1874499543bdb410b8d45dfa357774..37a030b5f5e4e1536c44227672be05ffcff88537 100644 (file)
@@ -63,6 +63,7 @@
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/resowner.h"
+#include "utils/timeout.h"
 #include "utils/timestamp.h"
 
 
@@ -81,6 +82,10 @@ bool         am_cascading_walsender = false;         /* Am I cascading WAL to
 int                    max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
 int                    replication_timeout = 60 * 1000;        /* maximum time to send one
                                                                                                 * WAL data message */
+/*
+ * State for WalSndWakeupRequest
+ */
+bool wake_wal_senders = false;
 
 /*
  * These variables are used similarly to openLogFile/Id/Seg/Off,
@@ -94,7 +99,7 @@ static uint32 sendOff = 0;
  * How far have we sent WAL already? This is also advertised in
  * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
  */
-static XLogRecPtr sentPtr = {0, 0};
+static XLogRecPtr sentPtr = 0;
 
 /*
  * Buffer for processing reply messages.
@@ -120,7 +125,7 @@ static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
 static bool HandleReplicationCommand(const char *cmd_string);
-static int     WalSndLoop(void);
+static void WalSndLoop(void) __attribute__((noreturn));
 static void InitWalSnd(void);
 static void WalSndHandshake(void);
 static void WalSndKill(int code, Datum arg);
@@ -135,7 +140,7 @@ static void WalSndKeepalive(char *msgbuf);
 
 
 /* Main entry point for walsender process */
-int
+void
 WalSenderMain(void)
 {
        MemoryContext walsnd_context;
@@ -192,7 +197,7 @@ WalSenderMain(void)
        SyncRepInitConfig();
 
        /* Main loop of walsender */
-       return WalSndLoop();
+       WalSndLoop();
 }
 
 /*
@@ -300,8 +305,7 @@ IdentifySystem(void)
 
        logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
 
-       snprintf(xpos, sizeof(xpos), "%X/%X",
-                        logptr.xlogid, logptr.xrecoff);
+       snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
 
        /* Send a RowDescription message */
        pq_beginmessage(&buf, 'T');
@@ -613,9 +617,9 @@ ProcessStandbyReplyMessage(void)
        pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
 
        elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
-                reply.write.xlogid, reply.write.xrecoff,
-                reply.flush.xlogid, reply.flush.xrecoff,
-                reply.apply.xlogid, reply.apply.xrecoff);
+                (uint32) (reply.write >> 32), (uint32) reply.write,
+                (uint32) (reply.flush >> 32), (uint32) reply.flush,
+                (uint32) (reply.apply >> 32), (uint32) reply.apply);
 
        /*
         * Update shared state for this WalSender process based on reply data from
@@ -707,7 +711,7 @@ ProcessStandbyHSFeedbackMessage(void)
 }
 
 /* Main loop of walsender process */
-static int
+static void
 WalSndLoop(void)
 {
        char       *output_message;
@@ -883,7 +887,7 @@ WalSndLoop(void)
                whereToSendOutput = DestNone;
 
        proc_exit(0);
-       return 1;                                       /* keep the compiler quiet */
+       abort();                                        /* keep the compiler quiet */
 }
 
 /* Initialize a per-walsender data structure for this walsender process */
@@ -990,7 +994,7 @@ retry:
                int                     segbytes;
                int                     readbytes;
 
-               startoff = recptr.xrecoff % XLogSegSize;
+               startoff = recptr % XLogSegSize;
 
                if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
                {
@@ -1156,12 +1160,6 @@ XLogSend(char *msgbuf, bool *caughtup)
        startptr = sentPtr;
        endptr = startptr;
        XLByteAdvance(endptr, MAX_SEND_SIZE);
-       if (endptr.xlogid != startptr.xlogid)
-       {
-               /* Don't cross a logfile boundary within one message */
-               Assert(endptr.xlogid == startptr.xlogid + 1);
-               endptr.xrecoff = 0;
-       }
 
        /* if we went beyond SendRqstPtr, back off */
        if (XLByteLE(SendRqstPtr, endptr))
@@ -1172,14 +1170,11 @@ XLogSend(char *msgbuf, bool *caughtup)
        else
        {
                /* round down to page boundary. */
-               endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
+               endptr -= (endptr % XLOG_BLCKSZ);
                *caughtup = false;
        }
 
-       if (endptr.xrecoff == 0)
-               nbytes = 0x100000000L - (uint64) startptr.xrecoff;
-       else
-               nbytes = endptr.xrecoff - startptr.xrecoff;
+       nbytes = endptr - startptr;
        Assert(nbytes <= MAX_SEND_SIZE);
 
        /*
@@ -1223,7 +1218,7 @@ XLogSend(char *msgbuf, bool *caughtup)
                char            activitymsg[50];
 
                snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
-                                sentPtr.xlogid, sentPtr.xrecoff);
+                                (uint32) (sentPtr >> 32), (uint32) sentPtr);
                set_ps_display(activitymsg, false);
        }
 
@@ -1351,7 +1346,7 @@ WalSndSignals(void)
        pqsignal(SIGINT, SIG_IGN);      /* not used */
        pqsignal(SIGTERM, WalSndShutdownHandler);       /* request shutdown */
        pqsignal(SIGQUIT, WalSndQuickDieHandler);       /* hard crash time */
-       pqsignal(SIGALRM, SIG_IGN);
+       InitializeTimeouts();           /* establishes SIGALRM handler */
        pqsignal(SIGPIPE, SIG_IGN);
        pqsignal(SIGUSR1, WalSndXLogSendHandler);       /* request WAL sending */
        pqsignal(SIGUSR2, WalSndLastCycleHandler);      /* request a last cycle and
@@ -1405,7 +1400,12 @@ WalSndShmemInit(void)
        }
 }
 
-/* Wake up all walsenders */
+/*
+ * Wake up all walsenders
+ *
+ * This will be called inside critical sections, so throwing an error is not
+ * adviseable.
+ */
 void
 WalSndWakeup(void)
 {
@@ -1511,12 +1511,19 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 
                if (walsnd->pid != 0)
                {
-                       sync_priority[i] = walsnd->sync_standby_priority;
+                       /*
+                        * Treat a standby such as a pg_basebackup background process
+                        * which always returns an invalid flush location, as an
+                        * asynchronous standby.
+                        */
+                       sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
+                               0 : walsnd->sync_standby_priority;
 
                        if (walsnd->state == WALSNDSTATE_STREAMING &&
                                walsnd->sync_standby_priority > 0 &&
                                (priority == 0 ||
-                                priority > walsnd->sync_standby_priority))
+                                priority > walsnd->sync_standby_priority) &&
+                               !XLogRecPtrIsInvalid(walsnd->flush))
                        {
                                priority = walsnd->sync_standby_priority;
                                sync_standby = i;
@@ -1565,25 +1572,25 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                        values[1] = CStringGetTextDatum(WalSndGetStateString(state));
 
                        snprintf(location, sizeof(location), "%X/%X",
-                                        sentPtr.xlogid, sentPtr.xrecoff);
+                                        (uint32) (sentPtr >> 32), (uint32) sentPtr);
                        values[2] = CStringGetTextDatum(location);
 
-                       if (write.xlogid == 0 && write.xrecoff == 0)
+                       if (write == 0)
                                nulls[3] = true;
                        snprintf(location, sizeof(location), "%X/%X",
-                                        write.xlogid, write.xrecoff);
+                                        (uint32) (write >> 32), (uint32) write);
                        values[3] = CStringGetTextDatum(location);
 
-                       if (flush.xlogid == 0 && flush.xrecoff == 0)
+                       if (flush == 0)
                                nulls[4] = true;
                        snprintf(location, sizeof(location), "%X/%X",
-                                        flush.xlogid, flush.xrecoff);
+                                        (uint32) (flush >> 32), (uint32) flush);
                        values[4] = CStringGetTextDatum(location);
 
-                       if (apply.xlogid == 0 && apply.xrecoff == 0)
+                       if (apply == 0)
                                nulls[5] = true;
                        snprintf(location, sizeof(location), "%X/%X",
-                                        apply.xlogid, apply.xrecoff);
+                                        (uint32) (apply >> 32), (uint32) apply);
                        values[5] = CStringGetTextDatum(location);
 
                        values[6] = Int32GetDatum(sync_priority[i]);