]> granicus.if.org Git - postgresql/commitdiff
Put the logic to decide which synchronous standby is active into a function.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 12 Dec 2014 11:39:36 +0000 (13:39 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 12 Dec 2014 12:26:42 +0000 (14:26 +0200)
This avoids duplicating the code.

Michael Paquier, reviewed by Simon Riggs and me

src/backend/replication/syncrep.c
src/backend/replication/walsender.c
src/include/replication/syncrep.h

index aa54bfba6cf87f4a09e6f15b777ecab4c12a8735..a2b17f151639fec5d7f8c3a67826ed89720dffd9 100644 (file)
@@ -5,7 +5,7 @@
  * Synchronous replication is new as of PostgreSQL 9.1.
  *
  * If requested, transaction commits wait until their commit LSN is
- * acknowledged by the sync standby.
+ * acknowledged by the synchronous standby.
  *
  * This module contains the code for waiting and release of backends.
  * All code in this module executes on the primary. The core streaming
@@ -357,6 +357,60 @@ SyncRepInitConfig(void)
        }
 }
 
+/*
+ * Find the WAL sender servicing the synchronous standby with the lowest
+ * priority value, or NULL if no synchronous standby is connected. If there
+ * are multiple standbys with the same lowest priority value, the first one
+ * found is selected. The caller must hold SyncRepLock.
+ */
+WalSnd *
+SyncRepGetSynchronousStandby(void)
+{
+       WalSnd     *result = NULL;
+       int                     result_priority = 0;
+       int                     i;
+
+       for (i = 0; i < max_wal_senders; i++)
+       {
+               /* Use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+               int                     this_priority;
+
+               /* Must be active */
+               if (walsnd->pid == 0)
+                       continue;
+
+               /* Must be streaming */
+               if (walsnd->state != WALSNDSTATE_STREAMING)
+                       continue;
+
+               /* Must be synchronous */
+               this_priority = walsnd->sync_standby_priority;
+               if (this_priority == 0)
+                       continue;
+
+               /* Must have a lower priority value than any previous ones */
+               if (result != NULL && result_priority <= this_priority)
+                       continue;
+
+               /* Must have a valid flush position */
+               if (XLogRecPtrIsInvalid(walsnd->flush))
+                       continue;
+
+               result = (WalSnd *) walsnd;
+               result_priority = this_priority;
+
+               /*
+                * If priority is equal to 1, there cannot be any other WAL senders
+                * with a lower priority, so we're done.
+                */
+               if (this_priority == 1)
+                       return result;
+       }
+
+       return result;
+}
+
 /*
  * Update the LSNs on each queue based upon our latest state. This
  * implements a simple policy of first-valid-standby-releases-waiter.
@@ -368,11 +422,9 @@ void
 SyncRepReleaseWaiters(void)
 {
        volatile WalSndCtlData *walsndctl = WalSndCtl;
-       volatile WalSnd *syncWalSnd = NULL;
+       WalSnd     *syncWalSnd;
        int                     numwrite = 0;
        int                     numflush = 0;
-       int                     priority = 0;
-       int                     i;
 
        /*
         * If this WALSender is serving a standby that is not on the list of
@@ -387,33 +439,13 @@ SyncRepReleaseWaiters(void)
 
        /*
         * We're a potential sync standby. Release waiters if we are the highest
-        * priority standby. If there are multiple standbys with same priorities
-        * then we use the first mentioned standby. If you change this, also
-        * change pg_stat_get_wal_senders().
+        * priority standby.
         */
        LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+       syncWalSnd = SyncRepGetSynchronousStandby();
 
-       for (i = 0; i < max_wal_senders; i++)
-       {
-               /* use volatile pointer to prevent code rearrangement */
-               volatile WalSnd *walsnd = &walsndctl->walsnds[i];
-
-               if (walsnd->pid != 0 &&
-                       walsnd->state == WALSNDSTATE_STREAMING &&
-                       walsnd->sync_standby_priority > 0 &&
-                       (priority == 0 ||
-                        priority > walsnd->sync_standby_priority) &&
-                       !XLogRecPtrIsInvalid(walsnd->flush))
-               {
-                       priority = walsnd->sync_standby_priority;
-                       syncWalSnd = walsnd;
-               }
-       }
-
-       /*
-        * We should have found ourselves at least.
-        */
-       Assert(syncWalSnd);
+       /* We should have found ourselves at least */
+       Assert(syncWalSnd != NULL);
 
        /*
         * If we aren't managing the highest priority standby then just leave.
index addae8f6ce512e6864548d86ec91a1daaae58812..5937cbbb026b8443f72687b30d14ee589b068f6c 100644 (file)
@@ -2741,9 +2741,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
        Tuplestorestate *tupstore;
        MemoryContext per_query_ctx;
        MemoryContext oldcontext;
-       int                *sync_priority;
-       int                     priority = 0;
-       int                     sync_standby = -1;
+       WalSnd     *sync_standby;
        int                     i;
 
        /* check to see if caller supports us returning a tuplestore */
@@ -2772,38 +2770,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
        MemoryContextSwitchTo(oldcontext);
 
        /*
-        * Get the priorities of sync standbys all in one go, to minimise lock
-        * acquisitions and to allow us to evaluate who is the current sync
-        * standby. This code must match the code in SyncRepReleaseWaiters().
+        * Get the currently active synchronous standby.
         */
-       sync_priority = palloc(sizeof(int) * max_wal_senders);
        LWLockAcquire(SyncRepLock, LW_SHARED);
-       for (i = 0; i < max_wal_senders; i++)
-       {
-               /* use volatile pointer to prevent code rearrangement */
-               volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
-
-               if (walsnd->pid != 0)
-               {
-                       /*
-                        * Treat a standby such as a pg_basebackup background process
-                        * which always returns an invalid flush location, as an
-                        * asynchronous standby.
-                        */
-                       sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
-                               0 : walsnd->sync_standby_priority;
-
-                       if (walsnd->state == WALSNDSTATE_STREAMING &&
-                               walsnd->sync_standby_priority > 0 &&
-                               (priority == 0 ||
-                                priority > walsnd->sync_standby_priority) &&
-                               !XLogRecPtrIsInvalid(walsnd->flush))
-                       {
-                               priority = walsnd->sync_standby_priority;
-                               sync_standby = i;
-                       }
-               }
-       }
+       sync_standby = SyncRepGetSynchronousStandby();
        LWLockRelease(SyncRepLock);
 
        for (i = 0; i < max_wal_senders; i++)
@@ -2814,6 +2784,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                XLogRecPtr      write;
                XLogRecPtr      flush;
                XLogRecPtr      apply;
+               int                     priority;
                WalSndState state;
                Datum           values[PG_STAT_GET_WAL_SENDERS_COLS];
                bool            nulls[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -2827,6 +2798,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                write = walsnd->write;
                flush = walsnd->flush;
                apply = walsnd->apply;
+               priority = walsnd->sync_standby_priority;
                SpinLockRelease(&walsnd->mutex);
 
                memset(nulls, 0, sizeof(nulls));
@@ -2857,15 +2829,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                                nulls[5] = true;
                        values[5] = LSNGetDatum(apply);
 
-                       values[6] = Int32GetDatum(sync_priority[i]);
+                       /*
+                        * Treat a standby such as a pg_basebackup background process
+                        * which always returns an invalid flush location, as an
+                        * asynchronous standby.
+                        */
+                       priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
+
+                       values[6] = Int32GetDatum(priority);
 
                        /*
                         * More easily understood version of standby state. This is purely
                         * informational, not different from priority.
                         */
-                       if (sync_priority[i] == 0)
+                       if (priority == 0)
                                values[7] = CStringGetTextDatum("async");
-                       else if (i == sync_standby)
+                       else if (walsnd == sync_standby)
                                values[7] = CStringGetTextDatum("sync");
                        else
                                values[7] = CStringGetTextDatum("potential");
@@ -2873,7 +2852,6 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 
                tuplestore_putvalues(tupstore, tupdesc, values, nulls);
        }
-       pfree(sync_priority);
 
        /* clean up and return the tuplestore */
        tuplestore_donestoring(tupstore);
index 7eeaf3b04c53a9a79b30f3f6c3d6bc1177fb5ef8..6f78fee47337d08bedb30cc97f31b146a79375d6 100644 (file)
@@ -50,6 +50,10 @@ extern void SyncRepUpdateSyncStandbysDefined(void);
 /* called by various procs */
 extern int     SyncRepWakeQueue(bool all, int mode);
 
+/* forward declaration to avoid pulling in walsender_private.h */
+struct WalSnd;
+extern struct WalSnd *SyncRepGetSynchronousStandby(void);
+
 extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
 extern void assign_synchronous_commit(int newval, void *extra);