granicus.if.org Git - postgresql/commitdiff
Make walsender more responsive.
author Robert Haas <rhaas@postgresql.org>
Mon, 2 Jul 2012 13:36:34 +0000 (09:36 -0400)
committer Robert Haas <rhaas@postgresql.org>
Mon, 2 Jul 2012 13:41:01 +0000 (09:41 -0400)
Per testing by Andres Freund, this improves replication performance
and reduces replication latency and latency jitter.  I was a bit
concerned about moving more work into XLogInsert, but testing seems
to show that it's not a problem in practice.

Along the way, improve comments for WaitLatchOrSocket.

Andres Freund.  Review and stylistic cleanup by me.

src/backend/access/transam/twophase.c
src/backend/access/transam/xact.c
src/backend/access/transam/xlog.c
src/backend/port/unix_latch.c
src/backend/port/win32_latch.c
src/backend/replication/walsender.c
src/include/replication/walsender.h

index e8fb78b33119bc3449fdc2a70977eabd87fdf623..7f198c2e3e0478c0719023673781e44372cd5a69 100644 (file)
@@ -1042,13 +1042,6 @@ EndPrepare(GlobalTransaction gxact)
 
        /* If we crash now, we have prepared: WAL replay will fix things */
 
-       /*
-        * Wake up all walsenders to send WAL up to the PREPARE record immediately
-        * if replication is enabled
-        */
-       if (max_wal_senders > 0)
-               WalSndWakeup();
-
        /* write correct CRC and close file */
        if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
        {
@@ -2045,13 +2038,6 @@ RecordTransactionCommitPrepared(TransactionId xid,
        /* Flush XLOG to disk */
        XLogFlush(recptr);
 
-       /*
-        * Wake up all walsenders to send WAL up to the COMMIT PREPARED record
-        * immediately if replication is enabled
-        */
-       if (max_wal_senders > 0)
-               WalSndWakeup();
-
        /* Mark the transaction committed in pg_clog */
        TransactionIdCommitTree(xid, nchildren, children);
 
@@ -2132,13 +2118,6 @@ RecordTransactionAbortPrepared(TransactionId xid,
        /* Always flush, since we're about to remove the 2PC state file */
        XLogFlush(recptr);
 
-       /*
-        * Wake up all walsenders to send WAL up to the ABORT PREPARED record
-        * immediately if replication is enabled
-        */
-       if (max_wal_senders > 0)
-               WalSndWakeup();
-
        /*
         * Mark the transaction aborted in clog.  This is not absolutely necessary
         * but we may as well do it while we are here.
index 4755ee6ee4081895073ce373e8092b129a9c55fb..86b1afa80d9330bcf5ac5adb223b7ebc00c08f53 100644 (file)
@@ -1141,13 +1141,6 @@ RecordTransactionCommit(void)
 
                XLogFlush(XactLastRecEnd);
 
-               /*
-                * Wake up all walsenders to send WAL up to the COMMIT record
-                * immediately if replication is enabled
-                */
-               if (max_wal_senders > 0)
-                       WalSndWakeup();
-
                /*
                 * Now we may update the CLOG, if we wrote a COMMIT record above
                 */
index cbfa68a4e7b98a9e0b18a69e2edc604b2f73bb73..a43e2eeaf306eb15146abf8e7a253fa9f38cdb50 100644 (file)
@@ -1025,6 +1025,8 @@ begin:;
 
                END_CRIT_SECTION();
 
+               /* wake up the WalSnd now that we have released the WALWriteLock */
+               WalSndWakeupProcessRequests();
                return RecPtr;
        }
 
@@ -1208,6 +1210,9 @@ begin:;
 
        END_CRIT_SECTION();
 
+       /* wake up the WalSnd now that we are outside contended locks */
+       WalSndWakeupProcessRequests();
+
        return RecPtr;
 }
 
@@ -1792,6 +1797,10 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                        if (finishing_seg || (xlog_switch && last_iteration))
                        {
                                issue_xlog_fsync(openLogFile, openLogSegNo);
+
+                               /* signal that we need to wake up WalSnd later */
+                               WalSndWakeupRequest();
+
                                LogwrtResult.Flush = LogwrtResult.Write;                /* end of page */
 
                                if (XLogArchivingActive())
@@ -1854,7 +1863,11 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                                openLogFile = XLogFileOpen(openLogSegNo);
                                openLogOff = 0;
                        }
+
                        issue_xlog_fsync(openLogFile, openLogSegNo);
+
+                       /* signal that we need to wake up WalSnd later */
+                       WalSndWakeupRequest();
                }
                LogwrtResult.Flush = LogwrtResult.Write;
        }
@@ -2120,6 +2133,9 @@ XLogFlush(XLogRecPtr record)
 
        END_CRIT_SECTION();
 
+       /* wake up the WalSnd now that we have released the WALWriteLock */
+       WalSndWakeupProcessRequests();
+
        /*
         * If we still haven't flushed to the request point then we have a
         * problem; most likely, the requested flush point is past end of XLOG.
@@ -2245,13 +2261,8 @@ XLogBackgroundFlush(void)
 
        END_CRIT_SECTION();
 
-       /*
-        * If we wrote something then we have something to send to standbys also,
-        * otherwise the replication delay become around 7s with just async
-        * commit.
-        */
-       if (wrote_something)
-               WalSndWakeup();
+       /* wake up the WalSnd now that we have released the WALWriteLock */
+       WalSndWakeupProcessRequests();
 
        return wrote_something;
 }
index 65b2fc56e0361cf76c0e8d4f7b8e3ed602cee5a0..335e9f66afb52163470ee9b69119b7fa16d2bc1c 100644 (file)
@@ -418,6 +418,9 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
  * NB: when calling this in a signal handler, be sure to save and restore
  * errno around it.  (That's standard practice in most signal handlers, of
  * course, but we used to omit it in handlers that only set a flag.)
+ *
+ * NB: this function is called from critical sections and signal handlers so
+ * throwing an error is not a good idea.
  */
 void
 SetLatch(volatile Latch *latch)
index eb46dcad1ba360d84ba2ba614b920965e2f32524..1f1ed33dc2d358ce27d7d769f4e6849d7d2d810d 100644 (file)
@@ -247,6 +247,10 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
        return result;
 }
 
+/*
+ * The comments above the unix implementation (unix_latch.c) of this function
+ * apply here as well.
+ */
 void
 SetLatch(volatile Latch *latch)
 {
index 616d4e73e3b66b64a60de6fecc787ef56c0ecaeb..912ce9d450380682e064f241b857bf608c375359 100644 (file)
@@ -81,6 +81,10 @@ bool         am_cascading_walsender = false;         /* Am I cascading WAL to
 int                    max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
 int                    replication_timeout = 60 * 1000;        /* maximum time to send one
                                                                                                 * WAL data message */
+/*
+ * State for WalSndWakeupRequest
+ */
+bool wake_wal_senders = false;
 
 /*
  * These variables are used similarly to openLogFile/Id/Seg/Off,
@@ -1395,7 +1399,12 @@ WalSndShmemInit(void)
        }
 }
 
-/* Wake up all walsenders */
+/*
+ * Wake up all walsenders
+ *
+ * This will be called inside critical sections, so throwing an error is not
+ * advisable.
+ */
 void
 WalSndWakeup(void)
 {
index 65536016c28bf31be0b065c8aa0678bb0172ca86..bb85ccf7b22cea2a927a82be0ba6219c6fb5efe8 100644 (file)
@@ -21,6 +21,7 @@ extern bool am_walsender;
 extern bool am_cascading_walsender;
 extern volatile sig_atomic_t walsender_shutdown_requested;
 extern volatile sig_atomic_t walsender_ready_to_stop;
+extern bool wake_wal_senders;
 
 /* user-settable parameters */
 extern int     max_wal_senders;
@@ -35,4 +36,27 @@ extern void WalSndRqstFileReload(void);
 
 extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);
 
+/*
+ * Remember that we want to wake up walsenders later
+ *
+ * This is separated from doing the actual wakeup because the writeout is done
+ * while holding contended locks.
+ */
+#define WalSndWakeupRequest() \
+       do { wake_wal_senders = true; } while (0)
+
+/*
+ * Wake up walsenders if there is work to be done
+ */
+#define WalSndWakeupProcessRequests()          \
+       do                                                                              \
+       {                                                                               \
+               if (wake_wal_senders)                           \
+               {                                                                       \
+                       wake_wal_senders = false;               \
+                       if (max_wal_senders > 0)                \
+                               WalSndWakeup();                         \
+               }                                                                       \
+       } while (0)
+
 #endif   /* _WALSENDER_H */