]> granicus.if.org Git - postgresql/commitdiff
Support quorum-based synchronous replication.
authorFujii Masao <fujii@postgresql.org>
Mon, 19 Dec 2016 12:15:30 +0000 (21:15 +0900)
committerFujii Masao <fujii@postgresql.org>
Mon, 19 Dec 2016 12:15:30 +0000 (21:15 +0900)
This feature is also known as "quorum commit" especially in discussion
on pgsql-hackers.

This commit adds the following new syntaxes into synchronous_standby_names
GUC. By using FIRST and ANY keywords, users can specify the method to
choose synchronous standbys from the listed servers.

  FIRST num_sync (standby_name [, ...])
  ANY num_sync (standby_name [, ...])

The keyword FIRST specifies a priority-based synchronous replication
which was available also in 9.6 or before. This method makes transaction
commits wait until their WAL records are replicated to num_sync
synchronous standbys chosen based on their priorities.

The keyword ANY specifies a quorum-based synchronous replication
and makes transaction commits wait until their WAL records are
replicated to *at least* num_sync listed standbys. In this method,
the values of sync_state.pg_stat_replication for the listed standbys
are reported as "quorum". The priority is still assigned to each standby,
but not used in this method.

The existing syntaxes having neither FIRST nor ANY keyword are still
supported. They are the same as new syntax with FIRST keyword, i.e.,
a priorirty-based synchronous replication.

Author: Masahiko Sawada
Reviewed-By: Michael Paquier, Amit Kapila and me
Discussion: <CAD21AoAACi9NeC_ecm+Vahm+MMA6nYh=Kqs3KB3np+MBOS_gZg@mail.gmail.com>

Many thanks to the various individuals who were involved in
discussing and developing this feature.

doc/src/sgml/config.sgml
doc/src/sgml/high-availability.sgml
doc/src/sgml/monitoring.sgml
src/backend/replication/Makefile
src/backend/replication/syncrep.c
src/backend/replication/syncrep_gram.y
src/backend/replication/syncrep_scanner.l
src/backend/replication/walsender.c
src/backend/utils/misc/postgresql.conf.sample
src/include/replication/syncrep.h
src/test/recovery/t/007_sync_rep.pl

index 3b614b6ecdf2bedcfa65fee04b813a18a1e5114b..1b98c416e068f8b7ac80b2505ff01d91e6d9cd3f 100644 (file)
@@ -3054,41 +3054,71 @@ include_dir 'conf.d'
         transactions waiting for commit will be allowed to proceed after
         these standby servers confirm receipt of their data.
         The synchronous standbys will be those whose names appear
-        earlier in this list, and
+        in this list, and
         that are both currently connected and streaming data in real-time
         (as shown by a state of <literal>streaming</literal> in the
         <link linkend="monitoring-stats-views-table">
         <literal>pg_stat_replication</></link> view).
-        Other standby servers appearing later in this list represent potential
-        synchronous standbys. If any of the current synchronous
-        standbys disconnects for whatever reason,
-        it will be replaced immediately with the next-highest-priority standby.
-        Specifying more than one standby name can allow very high availability.
+        Specifying more than one standby names can allow very high availability.
        </para>
        <para>
         This parameter specifies a list of standby servers using
         either of the following syntaxes:
 <synopsis>
-<replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] )
+[FIRST] <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] )
+ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] )
 <replaceable class="parameter">standby_name</replaceable> [, ...]
 </synopsis>
         where <replaceable class="parameter">num_sync</replaceable> is
         the number of synchronous standbys that transactions need to
         wait for replies from,
         and <replaceable class="parameter">standby_name</replaceable>
-        is the name of a standby server. For example, a setting of
-        <literal>3 (s1, s2, s3, s4)</> makes transaction commits wait
-        until their WAL records are received by three higher-priority standbys
-        chosen from standby servers <literal>s1</>, <literal>s2</>,
-        <literal>s3</> and <literal>s4</>.
-        </para>
-        <para>
-        The second syntax was used before <productname>PostgreSQL</>
+        is the name of a standby server.
+        <literal>FIRST</> and <literal>ANY</> specify the method to choose
+        synchronous standbys from the listed servers.
+       </para>
+       <para>
+        The keyword <literal>FIRST</>, coupled with
+        <replaceable class="parameter">num_sync</replaceable>, specifies a
+        priority-based synchronous replication and makes transaction commits
+        wait until their WAL records are replicated to
+        <replaceable class="parameter">num_sync</replaceable> synchronous
+        standbys chosen based on their priorities. For example, a setting of
+        <literal>FIRST 3 (s1, s2, s3, s4)</> will cause each commit to wait for
+        replies from three higher-priority standbys chosen from standby servers
+        <literal>s1</>, <literal>s2</>, <literal>s3</> and <literal>s4</>.
+        The standbys whose names appear earlier in the list are given higher
+        priority and will be considered as synchronous. Other standby servers
+        appearing later in this list represent potential synchronous standbys.
+        If any of the current synchronous standbys disconnects for whatever
+        reason, it will be replaced immediately with the next-highest-priority
+        standby. The keyword <literal>FIRST</> is optional.
+       </para>
+       <para>
+        The keyword <literal>ANY</>, coupled with
+        <replaceable class="parameter">num_sync</replaceable>, specifies a
+        quorum-based synchronous replication and makes transaction commits
+        wait until their WAL records are replicated to <emphasis>at least</>
+        <replaceable class="parameter">num_sync</replaceable> listed standbys.
+        For example, a setting of <literal>ANY 3 (s1, s2, s3, s4)</> will cause
+        each commit to proceed as soon as at least any three standbys of
+        <literal>s1</>, <literal>s2</>, <literal>s3</> and <literal>s4</>
+        reply.
+       </para>
+       <para>
+        <literal>FIRST</> and <literal>ANY</> are case-insensitive. If these
+        keywords are used as the name of a standby server,
+        its <replaceable class="parameter">standby_name</replaceable> must
+        be double-quoted.
+       </para>
+       <para>
+        The third syntax was used before <productname>PostgreSQL</>
         version 9.6 and is still supported. It's the same as the first syntax
-        with <replaceable class="parameter">num_sync</replaceable> equal to 1.
-        For example, <literal>1 (s1, s2)</> and
-        <literal>s1, s2</> have the same meaning: either <literal>s1</>
-        or <literal>s2</> is chosen as a synchronous standby.
+        with <literal>FIRST</> and
+        <replaceable class="parameter">num_sync</replaceable> equal to 1.
+        For example, <literal>FIRST 1 (s1, s2)</> and <literal>s1, s2</> have
+        the same meaning: either <literal>s1</> or <literal>s2</> is chosen
+        as a synchronous standby.
        </para>
        <para>
         The name of a standby server for this purpose is the
index 6b89507c8c93d6d340762084b75575b6cf2c0705..a1a9532088a3d47403d707db795da69e63303ea0 100644 (file)
@@ -1138,19 +1138,25 @@ primary_slot_name = 'node_a_slot'
     as synchronous confirm receipt of their data. The number of synchronous
     standbys that transactions must wait for replies from is specified in
     <varname>synchronous_standby_names</>. This parameter also specifies
-    a list of standby names, which determines the priority of each standby
-    for being chosen as a synchronous standby. The standbys whose names
-    appear earlier in the list are given higher priority and will be considered
-    as synchronous. Other standby servers appearing later in this list
-    represent potential synchronous standbys. If any of the current
-    synchronous standbys disconnects for whatever reason, it will be replaced
-    immediately with the next-highest-priority standby.
+    a list of standby names and the method (<literal>FIRST</> and
+    <literal>ANY</>) to choose synchronous standbys from the listed ones.
    </para>
    <para>
-    An example of <varname>synchronous_standby_names</> for multiple
-    synchronous standbys is:
+    The method <literal>FIRST</> specifies a priority-based synchronous
+    replication and makes transaction commits wait until their WAL records are
+    replicated to the requested number of synchronous standbys chosen based on
+    their priorities. The standbys whose names appear earlier in the list are
+    given higher priority and will be considered as synchronous. Other standby
+    servers appearing later in this list represent potential synchronous
+    standbys. If any of the current synchronous standbys disconnects for
+    whatever reason, it will be replaced immediately with the
+    next-highest-priority standby.
+   </para>
+   <para>
+    An example of <varname>synchronous_standby_names</> for
+    a priority-based multiple synchronous standbys is:
 <programlisting>
-synchronous_standby_names = '2 (s1, s2, s3)'
+synchronous_standby_names = 'FIRST 2 (s1, s2, s3)'
 </programlisting>
     In this example, if four standby servers <literal>s1</>, <literal>s2</>,
     <literal>s3</> and <literal>s4</> are running, the two standbys
@@ -1161,6 +1167,24 @@ synchronous_standby_names = '2 (s1, s2, s3)'
     <literal>s2</> fails. <literal>s4</> is an asynchronous standby since
     its name is not in the list.
    </para>
+   <para>
+    The method <literal>ANY</> specifies a quorum-based synchronous
+    replication and makes transaction commits wait until their WAL records
+    are replicated to <emphasis>at least</> the requested number of
+    synchronous standbys in the list.
+   </para>
+   <para>
+    An example of <varname>synchronous_standby_names</> for
+    a quorum-based multiple synchronous standbys is:
+<programlisting>
+ synchronous_standby_names = 'ANY 2 (s1, s2, s3)'
+</programlisting>
+    In this example, if four standby servers <literal>s1</>, <literal>s2</>,
+    <literal>s3</> and <literal>s4</> are running, transaction commits will
+    wait for replies from at least any two standbys of <literal>s1</>,
+    <literal>s2</> and <literal>s3</>. <literal>s4</> is an asynchronous
+    standby since its name is not in the list.
+   </para>
    <para>
     The synchronous states of standby servers can be viewed using
     the <structname>pg_stat_replication</structname> view.
index 5b58d2e84dcf064c27d433ee29caa3da272bd437..02bc8feca7740da121b7399e0eaab9286a7931c7 100644 (file)
@@ -1404,7 +1404,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
      <entry><structfield>sync_priority</></entry>
      <entry><type>integer</></entry>
      <entry>Priority of this standby server for being chosen as the
-      synchronous standby</entry>
+      synchronous standby in a priority-based synchronous replication.
+      This has no effect in a quorum-based synchronous replication.</entry>
     </row>
     <row>
      <entry><structfield>sync_state</></entry>
@@ -1429,6 +1430,12 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
            <literal>sync</>: This standby server is synchronous.
           </para>
          </listitem>
+         <listitem>
+          <para>
+           <literal>quorum</>: This standby server is considered as a candidate
+           for quorum standbys.
+          </para>
+         </listitem>
        </itemizedlist>
      </entry>
     </row>
index c99717e0aeef38aea9b1306d9d30001decd755ad..da8bcf0471cc9a227bf9d487398d65d26f45c8ec 100644 (file)
@@ -26,7 +26,7 @@ repl_gram.o: repl_scanner.c
 
 # syncrep_scanner is complied as part of syncrep_gram
 syncrep_gram.o: syncrep_scanner.c
-syncrep_scanner.c: FLEXFLAGS = -CF -p
+syncrep_scanner.c: FLEXFLAGS = -CF -p -i
 syncrep_scanner.c: FLEX_NO_BACKUP=yes
 
 # repl_gram.c, repl_scanner.c, syncrep_gram.c and syncrep_scanner.c
index ce2009882d9a61ee8f85593046995eab0e1e34f0..9143c47f92dbecead52e04134759708c0275a02b 100644 (file)
  * searching the through all waiters each time we receive a reply.
  *
  * In 9.5 or before only a single standby could be considered as
- * synchronous. In 9.6 we support multiple synchronous standbys.
- * The number of synchronous standbys that transactions must wait for
- * replies from is specified in synchronous_standby_names.
- * This parameter also specifies a list of standby names,
- * which determines the priority of each standby for being chosen as
- * a synchronous standby. The standbys whose names appear earlier
- * in the list are given higher priority and will be considered as
- * synchronous. Other standby servers appearing later in this list
- * represent potential synchronous standbys. If any of the current
- * synchronous standbys disconnects for whatever reason, it will be
- * replaced immediately with the next-highest-priority standby.
+ * synchronous. In 9.6 we support a priority-based multiple synchronous
+ * standbys. In 10.0 a quorum-based multiple synchronous standbys is also
+ * supported. The number of synchronous standbys that transactions
+ * must wait for replies from is specified in synchronous_standby_names.
+ * This parameter also specifies a list of standby names and the method
+ * (FIRST and ANY) to choose synchronous standbys from the listed ones.
+ *
+ * The method FIRST specifies a priority-based synchronous replication
+ * and makes transaction commits wait until their WAL records are
+ * replicated to the requested number of synchronous standbys chosen based
+ * on their priorities. The standbys whose names appear earlier in the list
+ * are given higher priority and will be considered as synchronous.
+ * Other standby servers appearing later in this list represent potential
+ * synchronous standbys. If any of the current synchronous standbys
+ * disconnects for whatever reason, it will be replaced immediately with
+ * the next-highest-priority standby.
+ *
+ * The method ANY specifies a quorum-based synchronous replication
+ * and makes transaction commits wait until their WAL records are
+ * replicated to at least the requested number of synchronous standbys
+ * in the list. All the standbys appearing in the list are considered as
+ * candidates for quorum synchronous standbys.
  *
  * Before the standbys chosen from synchronous_standby_names can
  * become the synchronous standbys they must have caught up with
  * the primary; that may take some time. Once caught up,
- * the current higher priority standbys which are considered as
- * synchronous at that moment will release waiters from the queue.
+ * the standbys which are considered as synchronous at that moment
+ * will release waiters from the queue.
  *
  * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
  *
@@ -79,18 +90,29 @@ char           *SyncRepStandbyNames;
 
 static bool announce_next_takeover = true;
 
-static SyncRepConfigData *SyncRepConfig = NULL;
+SyncRepConfigData *SyncRepConfig = NULL;
 static int     SyncRepWaitMode = SYNC_REP_NO_WAIT;
 
 static void SyncRepQueueInsert(int mode);
 static void SyncRepCancelWait(void);
 static int     SyncRepWakeQueue(bool all, int mode);
 
-static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
-                                                  XLogRecPtr *flushPtr,
-                                                  XLogRecPtr *applyPtr,
-                                                  bool *am_sync);
+static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
+                                                                XLogRecPtr *flushPtr,
+                                                                XLogRecPtr *applyPtr,
+                                                                bool *am_sync);
+static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+                                                                          XLogRecPtr *flushPtr,
+                                                                          XLogRecPtr *applyPtr,
+                                                                          List *sync_standbys);
+static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
+                                                                                 XLogRecPtr *flushPtr,
+                                                                                 XLogRecPtr *applyPtr,
+                                                                                 List *sync_standbys, uint8 nth);
 static int     SyncRepGetStandbyPriority(void);
+static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
+static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+static int     cmp_lsn(const void *a, const void *b);
 
 #ifdef USE_ASSERT_CHECKING
 static bool SyncRepQueueIsOrderedByLSN(int mode);
@@ -386,7 +408,7 @@ SyncRepReleaseWaiters(void)
        XLogRecPtr      writePtr;
        XLogRecPtr      flushPtr;
        XLogRecPtr      applyPtr;
-       bool            got_oldest;
+       bool            got_recptr;
        bool            am_sync;
        int                     numwrite = 0;
        int                     numflush = 0;
@@ -413,11 +435,10 @@ SyncRepReleaseWaiters(void)
        LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
 
        /*
-        * Check whether we are a sync standby or not, and calculate the oldest
+        * Check whether we are a sync standby or not, and calculate the synced
         * positions among all sync standbys.
         */
-       got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr,
-                                                                                       &applyPtr, &am_sync);
+       got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
 
        /*
         * If we are managing a sync standby, though we weren't prior to this,
@@ -426,16 +447,22 @@ SyncRepReleaseWaiters(void)
        if (announce_next_takeover && am_sync)
        {
                announce_next_takeover = false;
-               ereport(LOG,
-                               (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
-                                               application_name, MyWalSnd->sync_standby_priority)));
+
+               if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+                       ereport(LOG,
+                                       (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
+                                                       application_name, MyWalSnd->sync_standby_priority)));
+               else
+                       ereport(LOG,
+                                       (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
+                                                       application_name)));
        }
 
        /*
         * If the number of sync standbys is less than requested or we aren't
         * managing a sync standby then just leave.
         */
-       if (!got_oldest || !am_sync)
+       if (!got_recptr || !am_sync)
        {
                LWLockRelease(SyncRepLock);
                announce_next_takeover = !am_sync;
@@ -471,21 +498,20 @@ SyncRepReleaseWaiters(void)
 }
 
 /*
- * Calculate the oldest Write, Flush and Apply positions among sync standbys.
+ * Calculate the synced Write, Flush and Apply positions among sync standbys.
  *
  * Return false if the number of sync standbys is less than
  * synchronous_standby_names specifies. Otherwise return true and
- * store the oldest positions into *writePtr, *flushPtr and *applyPtr.
+ * store the positions into *writePtr, *flushPtr and *applyPtr.
  *
  * On return, *am_sync is set to true if this walsender is connecting to
  * sync standby. Otherwise it's set to false.
  */
 static bool
-SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
+SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
                                                   XLogRecPtr *applyPtr, bool *am_sync)
 {
        List       *sync_standbys;
-       ListCell   *cell;
 
        *writePtr = InvalidXLogRecPtr;
        *flushPtr = InvalidXLogRecPtr;
@@ -508,12 +534,49 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
        }
 
        /*
-        * Scan through all sync standbys and calculate the oldest Write, Flush
-        * and Apply positions.
+        * In a priority-based sync replication, the synced positions are the
+        * oldest ones among sync standbys. In a quorum-based, they are the Nth
+        * latest ones.
+        *
+        * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest positions.
+        * But we use SyncRepGetOldestSyncRecPtr() for that calculation because
+        * it's a bit more efficient.
+        *
+        * XXX If the numbers of current and requested sync standbys are the same,
+        * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
+        * positions even in a quorum-based sync replication.
+        */
+       if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+       {
+               SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
+                                                                  sync_standbys);
+       }
+       else
+       {
+               SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
+                                                                         sync_standbys, SyncRepConfig->num_sync);
+       }
+
+       list_free(sync_standbys);
+       return true;
+}
+
+/*
+ * Calculate the oldest Write, Flush and Apply positions among sync standbys.
+ */
+static void
+SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
+                                                  XLogRecPtr *applyPtr, List *sync_standbys)
+{
+       ListCell        *cell;
+
+       /*
+        * Scan through all sync standbys and calculate the oldest
+        * Write, Flush and Apply positions.
         */
-       foreach(cell, sync_standbys)
+       foreach (cell, sync_standbys)
        {
-               WalSnd     *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
+               WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
                XLogRecPtr      write;
                XLogRecPtr      flush;
                XLogRecPtr      apply;
@@ -531,23 +594,163 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
                if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
                        *applyPtr = apply;
        }
+}
 
-       list_free(sync_standbys);
-       return true;
+/*
+ * Calculate the Nth latest Write, Flush and Apply positions among sync
+ * standbys.
+ */
+static void
+SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
+                                                 XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
+{
+       ListCell        *cell;
+       XLogRecPtr      *write_array;
+       XLogRecPtr      *flush_array;
+       XLogRecPtr      *apply_array;
+       int     len;
+       int     i = 0;
+
+       len = list_length(sync_standbys);
+       write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+       flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+       apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+
+       foreach (cell, sync_standbys)
+       {
+               WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
+
+               SpinLockAcquire(&walsnd->mutex);
+               write_array[i] = walsnd->write;
+               flush_array[i] = walsnd->flush;
+               apply_array[i] = walsnd->apply;
+               SpinLockRelease(&walsnd->mutex);
+
+               i++;
+       }
+
+       qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
+       qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
+       qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
+
+       /* Get Nth latest Write, Flush, Apply positions */
+       *writePtr = write_array[nth - 1];
+       *flushPtr = flush_array[nth - 1];
+       *applyPtr = apply_array[nth - 1];
+
+       pfree(write_array);
+       pfree(flush_array);
+       pfree(apply_array);
+}
+
+/*
+ * Compare lsn in order to sort array in descending order.
+ */
+static int
+cmp_lsn(const void *a, const void *b)
+{
+       XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
+       XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
+
+       if (lsn1 > lsn2)
+               return -1;
+       else if (lsn1 == lsn2)
+               return 0;
+       else
+               return 1;
 }
 
 /*
  * Return the list of sync standbys, or NIL if no sync standby is connected.
  *
- * If there are multiple standbys with the same priority,
- * the first one found is selected preferentially.
  * The caller must hold SyncRepLock.
  *
  * On return, *am_sync is set to true if this walsender is connecting to
  * sync standby. Otherwise it's set to false.
  */
 List *
-SyncRepGetSyncStandbys(bool *am_sync)
+SyncRepGetSyncStandbys(bool    *am_sync)
+{
+       /* Set default result */
+       if (am_sync != NULL)
+               *am_sync = false;
+
+       /* Quick exit if sync replication is not requested */
+       if (SyncRepConfig == NULL)
+               return NIL;
+
+       return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
+               SyncRepGetSyncStandbysPriority(am_sync) :
+               SyncRepGetSyncStandbysQuorum(am_sync);
+}
+
+/*
+ * Return the list of all the candidates for quorum sync standbys,
+ * or NIL if no such standby is connected.
+ *
+ * The caller must hold SyncRepLock. This function must be called only in
+ * a quorum-based sync replication.
+ *
+ * On return, *am_sync is set to true if this walsender is connecting to
+ * sync standby. Otherwise it's set to false.
+ */
+static List *
+SyncRepGetSyncStandbysQuorum(bool *am_sync)
+{
+       List    *result = NIL;
+       int i;
+       volatile WalSnd *walsnd;        /* Use volatile pointer to prevent code
+                                                                * rearrangement */
+
+       Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
+
+       for (i = 0; i < max_wal_senders; i++)
+       {
+               walsnd = &WalSndCtl->walsnds[i];
+
+               /* Must be active */
+               if (walsnd->pid == 0)
+                       continue;
+
+               /* Must be streaming */
+               if (walsnd->state != WALSNDSTATE_STREAMING)
+                       continue;
+
+               /* Must be synchronous */
+               if (walsnd->sync_standby_priority == 0)
+                       continue;
+
+               /* Must have a valid flush position */
+               if (XLogRecPtrIsInvalid(walsnd->flush))
+                       continue;
+
+               /*
+                * Consider this standby as a candidate for quorum sync standbys
+                * and append it to the result.
+                */
+               result = lappend_int(result, i);
+               if (am_sync != NULL && walsnd == MyWalSnd)
+                       *am_sync = true;
+       }
+
+       return result;
+}
+
+/*
+ * Return the list of sync standbys chosen based on their priorities,
+ * or NIL if no sync standby is connected.
+ *
+ * If there are multiple standbys with the same priority,
+ * the first one found is selected preferentially.
+ *
+ * The caller must hold SyncRepLock. This function must be called only in
+ * a priority-based sync replication.
+ *
+ * On return, *am_sync is set to true if this walsender is connecting to
+ * sync standby. Otherwise it's set to false.
+ */
+static List *
+SyncRepGetSyncStandbysPriority(bool *am_sync)
 {
        List       *result = NIL;
        List       *pending = NIL;
@@ -560,13 +763,7 @@ SyncRepGetSyncStandbys(bool *am_sync)
        volatile WalSnd *walsnd;        /* Use volatile pointer to prevent code
                                                                 * rearrangement */
 
-       /* Set default result */
-       if (am_sync != NULL)
-               *am_sync = false;
-
-       /* Quick exit if sync replication is not requested */
-       if (SyncRepConfig == NULL)
-               return NIL;
+       Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
 
        lowest_priority = SyncRepConfig->nmembers;
        next_highest_priority = lowest_priority + 1;
index 35c27760d1235e339718077419b2369715b0ee39..281edc6f285084a0bbaef9eabdf3044c92bd496d 100644 (file)
@@ -21,7 +21,7 @@ SyncRepConfigData *syncrep_parse_result;
 char      *syncrep_parse_error_msg;
 
 static SyncRepConfigData *create_syncrep_config(const char *num_sync,
-                                         List *members);
+                                       List *members, uint8 syncrep_method);
 
 /*
  * Bison doesn't allocate anything that needs to live across parser calls,
@@ -46,7 +46,7 @@ static SyncRepConfigData *create_syncrep_config(const char *num_sync,
        SyncRepConfigData *config;
 }
 
-%token <str> NAME NUM JUNK
+%token <str> NAME NUM JUNK ANY FIRST
 
 %type <config> result standby_config
 %type <list> standby_list
@@ -60,8 +60,10 @@ result:
        ;
 
 standby_config:
-               standby_list                            { $$ = create_syncrep_config("1", $1); }
-               | NUM '(' standby_list ')'      { $$ = create_syncrep_config($1, $3); }
+               standby_list                            { $$ = create_syncrep_config("1", $1, SYNC_REP_PRIORITY); }
+               | NUM '(' standby_list ')'              { $$ = create_syncrep_config($1, $3, SYNC_REP_PRIORITY); }
+               | ANY NUM '(' standby_list ')'          { $$ = create_syncrep_config($2, $4, SYNC_REP_QUORUM); }
+               | FIRST NUM '(' standby_list ')'                { $$ = create_syncrep_config($2, $4, SYNC_REP_PRIORITY); }
        ;
 
 standby_list:
@@ -75,9 +77,8 @@ standby_name:
        ;
 %%
 
-
 static SyncRepConfigData *
-create_syncrep_config(const char *num_sync, List *members)
+create_syncrep_config(const char *num_sync, List *members, uint8 syncrep_method)
 {
        SyncRepConfigData *config;
        int                     size;
@@ -98,6 +99,7 @@ create_syncrep_config(const char *num_sync, List *members)
 
        config->config_size = size;
        config->num_sync = atoi(num_sync);
+       config->syncrep_method = syncrep_method;
        config->nmembers = list_length(members);
        ptr = config->member_names;
        foreach(lc, members)
index d20662ed03874a5b2f6965b32205ce297791cd6e..261b30e976a67caf9e15440b6ffd05eabf522d07 100644 (file)
@@ -64,6 +64,9 @@ xdinside              [^"]+
 %%
 {space}+       { /* ignore */ }
 
+ANY            { return ANY; }
+FIRST          { return FIRST; }
+
 {xdstart}      {
                                initStringInfo(&xdbuf);
                                BEGIN(xd);
index d80bcc00a1393770c21df54d41fed6b7736edb39..5cdb8a0ad6834c9b498587bc45f97d8a11ad988f 100644 (file)
@@ -2868,12 +2868,20 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 
                        /*
                         * More easily understood version of standby state. This is purely
-                        * informational, not different from priority.
+                        * informational.
+                        *
+                        * In quorum-based sync replication, the role of each standby
+                        * listed in synchronous_standby_names can be changing very
+                        * frequently. Any standbys considered as "sync" at one moment can
+                        * be switched to "potential" ones at the next moment. So, it's
+                        * basically useless to report "sync" or "potential" as their sync
+                        * states. We report just "quorum" for them.
                         */
                        if (priority == 0)
                                values[7] = CStringGetTextDatum("async");
                        else if (list_member_int(sync_standbys, i))
-                               values[7] = CStringGetTextDatum("sync");
+                               values[7] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
+                                       CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
                        else
                                values[7] = CStringGetTextDatum("potential");
                }
index 7f9acfda06719a4e07676fef738c16e6fef30b40..2c638b2c097253b9a0e357b1838009741a846fa4 100644 (file)
 # These settings are ignored on a standby server.
 
 #synchronous_standby_names = ''        # standby servers that provide sync rep
-                               # number of sync standbys and comma-separated list of application_name
+                               # method to choose sync standbys, number of sync standbys
+                               # and comma-separated list of application_name
                                # from standby(s); '*' = all
 #vacuum_defer_cleanup_age = 0  # number of xacts by which cleanup is delayed
 
index e4e0e2737143e8f475bfe47432f9d0b338c47715..9614b3163cf3e7c1da779cc6000f931aba73be51 100644 (file)
 #define SYNC_REP_WAITING                       1
 #define SYNC_REP_WAIT_COMPLETE         2
 
+/* syncrep_method of SyncRepConfigData */
+#define SYNC_REP_PRIORITY              0
+#define SYNC_REP_QUORUM                1
+
 /*
  * Struct for the configuration of synchronous replication.
  *
@@ -44,11 +48,14 @@ typedef struct SyncRepConfigData
        int                     config_size;    /* total size of this struct, in bytes */
        int                     num_sync;               /* number of sync standbys that we need to
                                                                 * wait for */
+       uint8           syncrep_method; /* method to choose sync standbys */
        int                     nmembers;               /* number of members in the following list */
        /* member_names contains nmembers consecutive nul-terminated C strings */
        char            member_names[FLEXIBLE_ARRAY_MEMBER];
 } SyncRepConfigData;
 
+extern SyncRepConfigData *SyncRepConfig;
+
 /* communication variables for parsing synchronous_standby_names GUC */
 extern SyncRepConfigData *syncrep_parse_result;
 extern char *syncrep_parse_error_msg;
index 0c872263eab9a40f2b3b8704333abf65d836f085..e11b4289d5289a76c45d05efae337a994d7bc455 100644 (file)
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 8;
+use Test::More tests => 11;
 
 # Query checking sync_priority and sync_state of each standby
 my $check_sql =
@@ -172,3 +172,34 @@ test_sync_state(
 standby2|1|sync
 standby4|1|potential),
        'potential standby found earlier in array is promoted to sync');
+
+# Check that standby1 and standby2 are chosen as sync standbys
+# based on their priorities.
+test_sync_state(
+$node_master, qq(standby1|1|sync
+standby2|2|sync
+standby4|0|async),
+'priority-based sync replication specified by FIRST keyword',
+'FIRST 2(standby1, standby2)');
+
+# Check that all the listed standbys are considered as candidates
+# for sync standbys in a quorum-based sync replication.
+test_sync_state(
+$node_master, qq(standby1|1|quorum
+standby2|2|quorum
+standby4|0|async),
+'2 quorum and 1 async',
+'ANY 2(standby1, standby2)');
+
+# Start Standby3 which will be considered in 'quorum' state.
+$node_standby_3->start;
+
+# Check that the setting of 'ANY 2(*)' chooses all standbys as
+# candidates for quorum sync standbys.
+test_sync_state(
+$node_master, qq(standby1|1|quorum
+standby2|1|quorum
+standby3|1|quorum
+standby4|1|quorum),
+'all standbys are considered as candidates for quorum sync standbys',
+'ANY 2(*)');