]> granicus.if.org Git - postgresql/blobdiff - src/backend/replication/walsender.c
Introduce timeout handling framework
[postgresql] / src / backend / replication / walsender.c
index 3611713434a439bfb7d3d3481d561274498944f5..37a030b5f5e4e1536c44227672be05ffcff88537 100644 (file)
@@ -63,6 +63,7 @@
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/resowner.h"
+#include "utils/timeout.h"
 #include "utils/timestamp.h"
 
 
@@ -74,27 +75,31 @@ WalSnd         *MyWalSnd = NULL;
 
 /* Global state */
 bool           am_walsender = false;           /* Am I a walsender process ? */
-bool           am_cascading_walsender = false; /* Am I cascading WAL to another standby ? */
+bool           am_cascading_walsender = false;         /* Am I cascading WAL to
+                                                                                                * another standby ? */
 
 /* User-settable parameters for walsender */
 int                    max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
 int                    replication_timeout = 60 * 1000;        /* maximum time to send one
                                                                                                 * WAL data message */
+/*
+ * State for WalSndWakeupRequest
+ */
+bool wake_wal_senders = false;
 
 /*
  * These variables are used similarly to openLogFile/Id/Seg/Off,
  * but for walsender to read the XLOG.
  */
 static int     sendFile = -1;
-static uint32 sendId = 0;
-static uint32 sendSeg = 0;
+static XLogSegNo sendSegNo = 0;
 static uint32 sendOff = 0;
 
 /*
  * How far have we sent WAL already? This is also advertised in
  * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
  */
-static XLogRecPtr sentPtr = {0, 0};
+static XLogRecPtr sentPtr = 0;
 
 /*
  * Buffer for processing reply messages.
@@ -120,7 +125,7 @@ static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
 static bool HandleReplicationCommand(const char *cmd_string);
-static int     WalSndLoop(void);
+static void WalSndLoop(void) __attribute__((noreturn));
 static void InitWalSnd(void);
 static void WalSndHandshake(void);
 static void WalSndKill(int code, Datum arg);
@@ -135,7 +140,7 @@ static void WalSndKeepalive(char *msgbuf);
 
 
 /* Main entry point for walsender process */
-int
+void
 WalSenderMain(void)
 {
        MemoryContext walsnd_context;
@@ -192,7 +197,7 @@ WalSenderMain(void)
        SyncRepInitConfig();
 
        /* Main loop of walsender */
-       return WalSndLoop();
+       WalSndLoop();
 }
 
 /*
@@ -300,8 +305,7 @@ IdentifySystem(void)
 
        logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
 
-       snprintf(xpos, sizeof(xpos), "%X/%X",
-                        logptr.xlogid, logptr.xrecoff);
+       snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
 
        /* Send a RowDescription message */
        pq_beginmessage(&buf, 'T');
@@ -372,31 +376,31 @@ StartReplication(StartReplicationCmd *cmd)
        SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
 
        /*
-        * When promoting a cascading standby, postmaster sends SIGUSR2 to
-        * any cascading walsenders to kill them. But there is a corner-case where
-        * such walsender fails to receive SIGUSR2 and survives a standby promotion
-        * unexpectedly. This happens when postmaster sends SIGUSR2 before
-        * the walsender marks itself as a WAL sender, because postmaster sends
-        * SIGUSR2 to only the processes marked as a WAL sender.
+        * When promoting a cascading standby, postmaster sends SIGUSR2 to any
+        * cascading walsenders to kill them. But there is a corner-case where
+        * such walsender fails to receive SIGUSR2 and survives a standby
+        * promotion unexpectedly. This happens when postmaster sends SIGUSR2
+        * before the walsender marks itself as a WAL sender, because postmaster
+        * sends SIGUSR2 to only the processes marked as a WAL sender.
         *
         * To avoid this corner-case, if recovery is NOT in progress even though
         * the walsender is cascading one, we do the same thing as SIGUSR2 signal
         * handler does, i.e., set walsender_ready_to_stop to true. Which causes
         * the walsender to end later.
         *
-        * When terminating cascading walsenders, usually postmaster writes
-        * the log message announcing the terminations. But there is a race condition
-        * here. If there is no walsender except this process before reaching here,
-        * postmaster thinks that there is no walsender and suppresses that
+        * When terminating cascading walsenders, usually postmaster writes the
+        * log message announcing the terminations. But there is a race condition
+        * here. If there is no walsender except this process before reaching
+        * here, postmaster thinks that there is no walsender and suppresses that
         * log message. To handle this case, we always emit that log message here.
-        * This might cause duplicate log messages, but which is less likely to happen,
-        * so it's not worth writing some code to suppress them.
+        * This might cause duplicate log messages, but which is less likely to
+        * happen, so it's not worth writing some code to suppress them.
         */
        if (am_cascading_walsender && !RecoveryInProgress())
        {
                ereport(LOG,
-                               (errmsg("terminating walsender process to force cascaded standby "
-                                               "to update timeline and reconnect")));
+                  (errmsg("terminating walsender process to force cascaded standby "
+                                  "to update timeline and reconnect")));
                walsender_ready_to_stop = true;
        }
 
@@ -405,8 +409,8 @@ StartReplication(StartReplicationCmd *cmd)
         * log-shipping, since this is checked in PostmasterMain().
         *
         * NOTE: wal_level can only change at shutdown, so in most cases it is
-        * difficult for there to be WAL data that we can still see that was written
-        * at wal_level='minimal'.
+        * difficult for there to be WAL data that we can still see that was
+        * written at wal_level='minimal'.
         */
 
        /*
@@ -613,9 +617,9 @@ ProcessStandbyReplyMessage(void)
        pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
 
        elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
-                reply.write.xlogid, reply.write.xrecoff,
-                reply.flush.xlogid, reply.flush.xrecoff,
-                reply.apply.xlogid, reply.apply.xrecoff);
+                (uint32) (reply.write >> 32), (uint32) reply.write,
+                (uint32) (reply.flush >> 32), (uint32) reply.flush,
+                (uint32) (reply.apply >> 32), (uint32) reply.apply);
 
        /*
         * Update shared state for this WalSender process based on reply data from
@@ -693,7 +697,7 @@ ProcessStandbyHSFeedbackMessage(void)
         * far enough to make reply.xmin wrap around.  In that case the xmin we
         * set here would be "in the future" and have no effect.  No point in
         * worrying about this since it's too late to save the desired data
-        * anyway.  Assuming that the standby sends us an increasing sequence of
+        * anyway.      Assuming that the standby sends us an increasing sequence of
         * xmins, this could only happen during the first reply cycle, else our
         * own xmin would prevent nextXid from advancing so far.
         *
@@ -707,7 +711,7 @@ ProcessStandbyHSFeedbackMessage(void)
 }
 
 /* Main loop of walsender process */
-static int
+static void
 WalSndLoop(void)
 {
        char       *output_message;
@@ -792,8 +796,8 @@ WalSndLoop(void)
                        if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
                        {
                                ereport(DEBUG1,
-                                               (errmsg("standby \"%s\" has now caught up with primary",
-                                                               application_name)));
+                                        (errmsg("standby \"%s\" has now caught up with primary",
+                                                        application_name)));
                                WalSndSetState(WALSNDSTATE_STREAMING);
                        }
 
@@ -810,7 +814,7 @@ WalSndLoop(void)
                                if (caughtup && !pq_is_send_pending())
                                {
                                        walsender_shutdown_requested = true;
-                                       continue;               /* don't want to wait more */
+                                       continue;       /* don't want to wait more */
                                }
                        }
                }
@@ -825,7 +829,7 @@ WalSndLoop(void)
                if (caughtup || pq_is_send_pending())
                {
                        TimestampTz timeout = 0;
-                       long            sleeptime = 10000; /* 10 s */
+                       long            sleeptime = 10000;              /* 10 s */
                        int                     wakeEvents;
 
                        wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
@@ -845,7 +849,7 @@ WalSndLoop(void)
                        if (replication_timeout > 0)
                        {
                                timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
-                                                                                                                 replication_timeout);
+                                                                                                         replication_timeout);
                                sleeptime = 1 + (replication_timeout / 10);
                        }
 
@@ -883,7 +887,7 @@ WalSndLoop(void)
                whereToSendOutput = DestNone;
 
        proc_exit(0);
-       return 1;                                       /* keep the compiler quiet */
+       abort();                                        /* keep the compiler quiet */
 }
 
 /* Initialize a per-walsender data structure for this walsender process */
@@ -973,13 +977,11 @@ WalSndKill(int code, Datum arg)
 void
 XLogRead(char *buf, XLogRecPtr startptr, Size count)
 {
-       char               *p;
+       char       *p;
        XLogRecPtr      recptr;
-       Size                    nbytes;
-       uint32          lastRemovedLog;
-       uint32          lastRemovedSeg;
-       uint32          log;
-       uint32          seg;
+       Size            nbytes;
+       XLogSegNo       lastRemovedSegNo;
+       XLogSegNo       segno;
 
 retry:
        p = buf;
@@ -992,9 +994,9 @@ retry:
                int                     segbytes;
                int                     readbytes;
 
-               startoff = recptr.xrecoff % XLogSegSize;
+               startoff = recptr % XLogSegSize;
 
-               if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
+               if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
                {
                        char            path[MAXPGPATH];
 
@@ -1002,8 +1004,8 @@ retry:
                        if (sendFile >= 0)
                                close(sendFile);
 
-                       XLByteToSeg(recptr, sendId, sendSeg);
-                       XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
+                       XLByteToSeg(recptr, sendSegNo);
+                       XLogFilePath(path, ThisTimeLineID, sendSegNo);
 
                        sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
                        if (sendFile < 0)
@@ -1014,20 +1016,15 @@ retry:
                                 * removed or recycled.
                                 */
                                if (errno == ENOENT)
-                               {
-                                       char            filename[MAXFNAMELEN];
-
-                                       XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
                                        ereport(ERROR,
                                                        (errcode_for_file_access(),
                                                         errmsg("requested WAL segment %s has already been removed",
-                                                                       filename)));
-                               }
+                                                                       XLogFileNameP(ThisTimeLineID, sendSegNo))));
                                else
                                        ereport(ERROR,
                                                        (errcode_for_file_access(),
-                                                        errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
-                                                                       path, sendId, sendSeg)));
+                                                        errmsg("could not open file \"%s\": %m",
+                                                                       path)));
                        }
                        sendOff = 0;
                }
@@ -1038,8 +1035,9 @@ retry:
                        if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
                                ereport(ERROR,
                                                (errcode_for_file_access(),
-                                                errmsg("could not seek in log file %u, segment %u to offset %u: %m",
-                                                               sendId, sendSeg, startoff)));
+                                                errmsg("could not seek in log segment %s to offset %u: %m",
+                                                               XLogFileNameP(ThisTimeLineID, sendSegNo),
+                                                               startoff)));
                        sendOff = startoff;
                }
 
@@ -1051,11 +1049,13 @@ retry:
 
                readbytes = read(sendFile, p, segbytes);
                if (readbytes <= 0)
+               {
                        ereport(ERROR,
                                        (errcode_for_file_access(),
-                       errmsg("could not read from log file %u, segment %u, offset %u, "
-                                  "length %lu: %m",
-                                  sendId, sendSeg, sendOff, (unsigned long) segbytes)));
+                       errmsg("could not read from log segment %s, offset %u, length %lu: %m",
+                                  XLogFileNameP(ThisTimeLineID, sendSegNo),
+                                  sendOff, (unsigned long) segbytes)));
+               }
 
                /* Update state for read */
                XLByteAdvance(recptr, readbytes);
@@ -1072,24 +1072,18 @@ retry:
         * read() succeeds in that case, but the data we tried to read might
         * already have been overwritten with new WAL records.
         */
-       XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
-       XLByteToSeg(startptr, log, seg);
-       if (log < lastRemovedLog ||
-               (log == lastRemovedLog && seg <= lastRemovedSeg))
-       {
-               char            filename[MAXFNAMELEN];
-
-               XLogFileName(filename, ThisTimeLineID, log, seg);
+       XLogGetLastRemoved(&lastRemovedSegNo);
+       XLByteToSeg(startptr, segno);
+       if (segno <= lastRemovedSegNo)
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("requested WAL segment %s has already been removed",
-                                               filename)));
-       }
+                                               XLogFileNameP(ThisTimeLineID, segno))));
 
        /*
-        * During recovery, the currently-open WAL file might be replaced with
-        * the file of the same name retrieved from archive. So we always need
-        * to check what we read was valid after reading into the buffer. If it's
+        * During recovery, the currently-open WAL file might be replaced with the
+        * file of the same name retrieved from archive. So we always need to
+        * check what we read was valid after reading into the buffer. If it's
         * invalid, we try to open and read the file again.
         */
        if (am_cascading_walsender)
@@ -1164,25 +1158,8 @@ XLogSend(char *msgbuf, bool *caughtup)
         * SendRqstPtr never points to the middle of a WAL record.
         */
        startptr = sentPtr;
-       if (startptr.xrecoff >= XLogFileSize)
-       {
-               /*
-                * crossing a logid boundary, skip the non-existent last log segment
-                * in previous logical log file.
-                */
-               startptr.xlogid += 1;
-               startptr.xrecoff = 0;
-       }
-
        endptr = startptr;
        XLByteAdvance(endptr, MAX_SEND_SIZE);
-       if (endptr.xlogid != startptr.xlogid)
-       {
-               /* Don't cross a logfile boundary within one message */
-               Assert(endptr.xlogid == startptr.xlogid + 1);
-               endptr.xlogid = startptr.xlogid;
-               endptr.xrecoff = XLogFileSize;
-       }
 
        /* if we went beyond SendRqstPtr, back off */
        if (XLByteLE(SendRqstPtr, endptr))
@@ -1193,11 +1170,11 @@ XLogSend(char *msgbuf, bool *caughtup)
        else
        {
                /* round down to page boundary. */
-               endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
+               endptr -= (endptr % XLOG_BLCKSZ);
                *caughtup = false;
        }
 
-       nbytes = endptr.xrecoff - startptr.xrecoff;
+       nbytes = endptr - startptr;
        Assert(nbytes <= MAX_SEND_SIZE);
 
        /*
@@ -1241,7 +1218,7 @@ XLogSend(char *msgbuf, bool *caughtup)
                char            activitymsg[50];
 
                snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
-                                sentPtr.xlogid, sentPtr.xrecoff);
+                                (uint32) (sentPtr >> 32), (uint32) sentPtr);
                set_ps_display(activitymsg, false);
        }
 
@@ -1294,8 +1271,8 @@ WalSndShutdownHandler(SIGNAL_ARGS)
                SetLatch(&MyWalSnd->latch);
 
        /*
-        * Set the standard (non-walsender) state as well, so that we can
-        * abort things like do_pg_stop_backup().
+        * Set the standard (non-walsender) state as well, so that we can abort
+        * things like do_pg_stop_backup().
         */
        InterruptPending = true;
        ProcDiePending = true;
@@ -1369,7 +1346,7 @@ WalSndSignals(void)
        pqsignal(SIGINT, SIG_IGN);      /* not used */
        pqsignal(SIGTERM, WalSndShutdownHandler);       /* request shutdown */
        pqsignal(SIGQUIT, WalSndQuickDieHandler);       /* hard crash time */
-       pqsignal(SIGALRM, SIG_IGN);
+       InitializeTimeouts();           /* establishes SIGALRM handler */
        pqsignal(SIGPIPE, SIG_IGN);
        pqsignal(SIGUSR1, WalSndXLogSendHandler);       /* request WAL sending */
        pqsignal(SIGUSR2, WalSndLastCycleHandler);      /* request a last cycle and
@@ -1410,7 +1387,8 @@ WalSndShmemInit(void)
                /* First time through, so initialize */
                MemSet(WalSndCtl, 0, WalSndShmemSize());
 
-               SHMQueueInit(&(WalSndCtl->SyncRepQueue));
+               for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+                       SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
 
                for (i = 0; i < max_wal_senders; i++)
                {
@@ -1422,7 +1400,12 @@ WalSndShmemInit(void)
        }
 }
 
-/* Wake up all walsenders */
+/*
+ * Wake up all walsenders
+ *
+ * This will be called inside critical sections, so throwing an error is not
+ * adviseable.
+ */
 void
 WalSndWakeup(void)
 {
@@ -1528,12 +1511,19 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 
                if (walsnd->pid != 0)
                {
-                       sync_priority[i] = walsnd->sync_standby_priority;
+                       /*
+                        * Treat a standby such as a pg_basebackup background process
+                        * which always returns an invalid flush location, as an
+                        * asynchronous standby.
+                        */
+                       sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
+                               0 : walsnd->sync_standby_priority;
 
                        if (walsnd->state == WALSNDSTATE_STREAMING &&
                                walsnd->sync_standby_priority > 0 &&
                                (priority == 0 ||
-                                priority > walsnd->sync_standby_priority))
+                                priority > walsnd->sync_standby_priority) &&
+                               !XLogRecPtrIsInvalid(walsnd->flush))
                        {
                                priority = walsnd->sync_standby_priority;
                                sync_standby = i;
@@ -1582,25 +1572,25 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                        values[1] = CStringGetTextDatum(WalSndGetStateString(state));
 
                        snprintf(location, sizeof(location), "%X/%X",
-                                        sentPtr.xlogid, sentPtr.xrecoff);
+                                        (uint32) (sentPtr >> 32), (uint32) sentPtr);
                        values[2] = CStringGetTextDatum(location);
 
-                       if (write.xlogid == 0 && write.xrecoff == 0)
+                       if (write == 0)
                                nulls[3] = true;
                        snprintf(location, sizeof(location), "%X/%X",
-                                        write.xlogid, write.xrecoff);
+                                        (uint32) (write >> 32), (uint32) write);
                        values[3] = CStringGetTextDatum(location);
 
-                       if (flush.xlogid == 0 && flush.xrecoff == 0)
+                       if (flush == 0)
                                nulls[4] = true;
                        snprintf(location, sizeof(location), "%X/%X",
-                                        flush.xlogid, flush.xrecoff);
+                                        (uint32) (flush >> 32), (uint32) flush);
                        values[4] = CStringGetTextDatum(location);
 
-                       if (apply.xlogid == 0 && apply.xrecoff == 0)
+                       if (apply == 0)
                                nulls[5] = true;
                        snprintf(location, sizeof(location), "%X/%X",
-                                        apply.xlogid, apply.xrecoff);
+                                        (uint32) (apply >> 32), (uint32) apply);
                        values[5] = CStringGetTextDatum(location);
 
                        values[6] = Int32GetDatum(sync_priority[i]);