]> granicus.if.org Git - postgresql/commitdiff
Add option to modify sync commit per subscription
authorPeter Eisentraut <peter_e@gmx.net>
Fri, 14 Apr 2017 17:58:46 +0000 (13:58 -0400)
committerPeter Eisentraut <peter_e@gmx.net>
Fri, 14 Apr 2017 17:58:46 +0000 (13:58 -0400)
This also changes default behaviour of subscription workers to
synchronous_commit = off.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>

13 files changed:
doc/src/sgml/catalogs.sgml
doc/src/sgml/ref/alter_subscription.sgml
doc/src/sgml/ref/create_subscription.sgml
src/backend/catalog/pg_subscription.c
src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/launcher.c
src/backend/replication/logical/worker.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/pg_dump.h
src/bin/psql/describe.c
src/include/catalog/pg_subscription.h
src/test/regress/expected/subscription.out
src/test/regress/sql/subscription.sql

index 5883673448caf64a029ddf3610804e2fe503269d..5254bb3025a9238f2f7d20b517c3fd779f479ed0 100644 (file)
       <entry>If true, the subscription is enabled and should be replicating.</entry>
      </row>
 
+     <row>
+      <entry><structfield>subsynccommit</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry></entry>
+      <entry>
+       Contains the value of the <varname>synchronous_commit</varname>
+       setting for the subscription workers.
+      </entry>
+     </row>
+
      <row>
       <entry><structfield>subconninfo</structfield></entry>
       <entry><type>text</type></entry>
index 640fac0a1590351bcdbd5d92ea5d8dfccb2c2b3b..f71ee38b40cfaff06f69a2a4f62fdc1e2f069b73 100644 (file)
@@ -26,6 +26,7 @@ ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> WITH ( <rep
 <phrase>where <replaceable class="PARAMETER">suboption</replaceable> can be:</phrase>
 
     SLOT NAME = <replaceable class="PARAMETER">slot_name</replaceable>
+    | SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable>
 
 ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> SET PUBLICATION <replaceable class="PARAMETER">publication_name</replaceable> [, ...] { REFRESH WITH ( <replaceable class="PARAMETER">puboption</replaceable> [, ... ] ) | NOREFRESH }
 ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> REFRESH PUBLICATION WITH ( <replaceable class="PARAMETER">puboption</replaceable> [, ... ] )
@@ -91,6 +92,7 @@ ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> DISABLE
    <varlistentry>
     <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
     <term><literal>SLOT NAME = <replaceable class="parameter">slot_name</replaceable></literal></term>
+    <term><literal>SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable></literal></term>
     <listitem>
      <para>
       These clauses alter properties originally set by
index 3410d6fc8c2fd481a02b5f097cf3077b95ca1730..3c51012df8b998673f78171a92e32f233f2b35ce 100644 (file)
@@ -32,6 +32,7 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl
     | CREATE SLOT | NOCREATE SLOT
     | SLOT NAME = <replaceable class="PARAMETER">slot_name</replaceable>
     | COPY DATA | NOCOPY DATA
+    | SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable>
     | NOCONNECT
 </synopsis>
  </refsynopsisdiv>
@@ -147,6 +148,36 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable></literal></term>
+    <listitem>
+     <para>
+      The value of this parameter overrides the
+      <xref linkend="guc-synchronous-commit"> setting.  The default value is
+      <literal>off</literal>.
+     </para>
+
+     <para>
+      It is safe to use <literal>off</literal> for logical replication: If the
+      subscriber loses transactions because of missing synchronization, the
+      data will be resent from the publisher.
+     </para>
+
+     <para>
+      A different setting might be appropriate when doing synchronous logical
+      replication.  The logical replication workers report the positions of
+      writes and flushes to the publisher, and when using synchronous
+      replication, the publisher will wait for the actual flush.  This means
+      that setting <literal>SYNCHRONOUS_COMMIT</literal> for the subscriber
+      to <literal>off</literal> when the subscription is used for synchronous
+      replication might increase the latency for <command>COMMIT</command> on
+      the publisher.  In this scenario, it can be advantageous to set
+      <literal>SYNCHRONOUS_COMMIT</literal> to <literal>local</literal> or
+      higher.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>NOCONNECT</literal></term>
     <listitem>
index 7e38b1a31cdf4c30158850e46ef2105188aa3c18..a18385055ef8fdf202b2981fbfffb19a226f4fe5 100644 (file)
@@ -85,6 +85,14 @@ GetSubscription(Oid subid, bool missing_ok)
        Assert(!isnull);
        sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
 
+       /* Get synccommit */
+       datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+                                                       tup,
+                                                       Anum_pg_subscription_subsynccommit,
+                                                       &isnull);
+       Assert(!isnull);
+       sub->synccommit = TextDatumGetCString(datum);
+
        /* Get publications */
        datum = SysCacheGetAttr(SUBSCRIPTIONOID,
                                                        tup,
index 7b8b11cb81f14af3fa0e758e345256e9c66c0d84..519c6846e35a1724aae35231c11d1bbe4c3b0b7b 100644 (file)
@@ -44,6 +44,7 @@
 #include "storage/lmgr.h"
 
 #include "utils/builtins.h"
+#include "utils/guc.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/syscache.h"
@@ -60,7 +61,7 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
 static void
 parse_subscription_options(List *options, bool *connect, bool *enabled_given,
                                                   bool *enabled, bool *create_slot, char **slot_name,
-                                                  bool *copy_data)
+                                                  bool *copy_data, char **synchronous_commit)
 {
        ListCell   *lc;
        bool            connect_given = false;
@@ -80,6 +81,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
                *slot_name = NULL;
        if (copy_data)
                *copy_data = true;
+       if (synchronous_commit)
+               *synchronous_commit = NULL;
 
        /* Parse options */
        foreach (lc, options)
@@ -165,6 +168,21 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
                        copy_data_given = true;
                        *copy_data = !defGetBoolean(defel);
                }
+               else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
+                                synchronous_commit)
+               {
+                       if (*synchronous_commit)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
+
+                       *synchronous_commit = defGetString(defel);
+
+                       /* Test if the given value is valid for synchronous_commit GUC. */
+                       (void) set_config_option("synchronous_commit", *synchronous_commit,
+                                                                        PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
+                                                                        false, 0, false);
+               }
                else
                        elog(ERROR, "unrecognized option: %s", defel->defname);
        }
@@ -269,6 +287,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
        bool            enabled_given;
        bool            enabled;
        bool            copy_data;
+       char       *synchronous_commit;
        char       *conninfo;
        char       *slotname;
        char            originname[NAMEDATALEN];
@@ -280,7 +299,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
         * Connection and publication should not be specified here.
         */
        parse_subscription_options(stmt->options, &connect, &enabled_given,
-                                                          &enabled, &create_slot, &slotname, &copy_data);
+                                                          &enabled, &create_slot, &slotname, &copy_data,
+                                                          &synchronous_commit);
 
        /*
         * Since creating a replication slot is not transactional, rolling back
@@ -311,6 +331,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 
        if (slotname == NULL)
                slotname = stmt->subname;
+       /* The default for synchronous_commit of subscriptions is off. */
+       if (synchronous_commit == NULL)
+               synchronous_commit = "off";
 
        conninfo = stmt->conninfo;
        publications = stmt->publication;
@@ -334,6 +357,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
                CStringGetTextDatum(conninfo);
        values[Anum_pg_subscription_subslotname - 1] =
                DirectFunctionCall1(namein, CStringGetDatum(slotname));
+       values[Anum_pg_subscription_subsynccommit - 1] =
+               CStringGetTextDatum(synchronous_commit);
        values[Anum_pg_subscription_subpublications - 1] =
                 publicationListToArray(publications);
 
@@ -582,13 +607,24 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
                case ALTER_SUBSCRIPTION_OPTIONS:
                        {
                                char *slot_name;
+                               char       *synchronous_commit;
 
                                parse_subscription_options(stmt->options, NULL, NULL, NULL,
-                                                                                  NULL, &slot_name, NULL);
+                                                                                  NULL, &slot_name, NULL,
+                                                                                  &synchronous_commit);
 
-                               values[Anum_pg_subscription_subslotname - 1] =
-                                       DirectFunctionCall1(namein, CStringGetDatum(slot_name));
-                               replaces[Anum_pg_subscription_subslotname - 1] = true;
+                               if (slot_name)
+                               {
+                                       values[Anum_pg_subscription_subslotname - 1] =
+                                               DirectFunctionCall1(namein, CStringGetDatum(slot_name));
+                                       replaces[Anum_pg_subscription_subslotname - 1] = true;
+                               }
+                               if (synchronous_commit)
+                               {
+                                       values[Anum_pg_subscription_subsynccommit - 1] =
+                                               CStringGetTextDatum(synchronous_commit);
+                                       replaces[Anum_pg_subscription_subsynccommit - 1] = true;
+                               }
 
                                update_tuple = true;
                                break;
@@ -601,7 +637,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
                                parse_subscription_options(stmt->options, NULL,
                                                                                   &enabled_given, &enabled, NULL,
-                                                                                  NULL, NULL);
+                                                                                  NULL, NULL, NULL);
                                Assert(enabled_given);
 
                                values[Anum_pg_subscription_subenabled - 1] =
@@ -626,7 +662,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
                                Subscription   *sub = GetSubscription(subid, false);
 
                                parse_subscription_options(stmt->options, NULL, NULL, NULL,
-                                                                                  NULL, NULL, &copy_data);
+                                                                                  NULL, NULL, &copy_data, NULL);
 
                                values[Anum_pg_subscription_subpublications - 1] =
                                         publicationListToArray(stmt->publication);
@@ -652,7 +688,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
                                Subscription   *sub = GetSubscription(subid, false);
 
                                parse_subscription_options(stmt->options, NULL, NULL, NULL,
-                                                                                  NULL, NULL, &copy_data);
+                                                                                  NULL, NULL, &copy_data, NULL);
 
                                AlterSubscription_refresh(sub, copy_data);
 
index 7ba239c02c19777a1728a23c536915be5e2f05ab..2d663f6308f627fb2c3f92f8f579bead120a3bba 100644 (file)
@@ -129,17 +129,13 @@ get_subscription_list(void)
                 */
                oldcxt = MemoryContextSwitchTo(resultcxt);
 
-               sub = (Subscription *) palloc(sizeof(Subscription));
+               sub = (Subscription *) palloc0(sizeof(Subscription));
                sub->oid = HeapTupleGetOid(tup);
                sub->dbid = subform->subdbid;
                sub->owner = subform->subowner;
                sub->enabled = subform->subenabled;
                sub->name = pstrdup(NameStr(subform->subname));
-
                /* We don't fill fields we are not interested in. */
-               sub->conninfo = NULL;
-               sub->slotname = NULL;
-               sub->publications = NIL;
 
                res = lappend(res, sub);
                MemoryContextSwitchTo(oldcxt);
index 3313448e7b9a408d8902e20c1938dea94ed0db16..29b6c6a168943121fd2ae5b5e93a01c199376ac7 100644 (file)
@@ -1416,6 +1416,10 @@ reread_subscription(void)
 
        MemoryContextSwitchTo(oldctx);
 
+       /* Change synchronous commit according to the user's wishes */
+       SetConfigOption("synchronous_commit", MySubscription->synccommit,
+                                       PGC_BACKEND, PGC_S_OVERRIDE);
+
        if (started_tx)
                CommitTransactionCommand();
 
@@ -1485,6 +1489,10 @@ ApplyWorkerMain(Datum main_arg)
        MySubscriptionValid = true;
        MemoryContextSwitchTo(oldctx);
 
+       /* Setup synchronous commit according to the user's wishes */
+       SetConfigOption("synchronous_commit", MySubscription->synccommit,
+                                       PGC_BACKEND, PGC_S_OVERRIDE);
+
        if (!MySubscription->enabled)
        {
                ereport(LOG,
index 102935446271e49b0a4e19ecbcc2e202aa53590f..3eccfa626bf311850897cdb1c231298aaa5b8485 100644 (file)
@@ -3683,6 +3683,7 @@ getSubscriptions(Archive *fout)
        int                     i_rolname;
        int                     i_subconninfo;
        int                     i_subslotname;
+       int                     i_subsynccommit;
        int                     i_subpublications;
        int                     i,
                                ntups;
@@ -3714,7 +3715,8 @@ getSubscriptions(Archive *fout)
        appendPQExpBuffer(query,
                                          "SELECT s.tableoid, s.oid, s.subname,"
                                          "(%s s.subowner) AS rolname, "
-                                         " s.subconninfo, s.subslotname, s.subpublications "
+                                         " s.subconninfo, s.subslotname, s.subsynccommit, "
+                                         " s.subpublications "
                                          "FROM pg_catalog.pg_subscription s "
                                          "WHERE s.subdbid = (SELECT oid FROM pg_catalog.pg_database"
                                          "                   WHERE datname = current_database())",
@@ -3729,6 +3731,7 @@ getSubscriptions(Archive *fout)
        i_rolname = PQfnumber(res, "rolname");
        i_subconninfo = PQfnumber(res, "subconninfo");
        i_subslotname = PQfnumber(res, "subslotname");
+       i_subsynccommit = PQfnumber(res, "subsynccommit");
        i_subpublications = PQfnumber(res, "subpublications");
 
        subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
@@ -3744,6 +3747,8 @@ getSubscriptions(Archive *fout)
                subinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname));
                subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo));
                subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname));
+               subinfo[i].subsynccommit =
+                       pg_strdup(PQgetvalue(res, i, i_subsynccommit));
                subinfo[i].subpublications =
                        pg_strdup(PQgetvalue(res, i, i_subpublications));
 
@@ -3810,6 +3815,10 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo)
 
        appendPQExpBuffer(query, " PUBLICATION %s WITH (NOCONNECT, SLOT NAME = ", publications->data);
        appendStringLiteralAH(query, subinfo->subslotname, fout);
+
+       if (strcmp(subinfo->subsynccommit, "off") != 0)
+               appendPQExpBuffer(query, ", SYNCHRONOUS_COMMIT = %s", fmtId(subinfo->subsynccommit));
+
        appendPQExpBufferStr(query, ");\n");
 
        appendPQExpBuffer(labelq, "SUBSCRIPTION %s", fmtId(subinfo->dobj.name));
index ba85392f1183daef2681bcc1927c933eeaf7ad4e..471cfce92a95259254c1f993e0fd17eaa86c9ae0 100644 (file)
@@ -616,6 +616,7 @@ typedef struct _SubscriptionInfo
        char       *rolname;
        char       *subconninfo;
        char       *subslotname;
+       char       *subsynccommit;
        char       *subpublications;
 } SubscriptionInfo;
 
index 2494d046b25c323bd7d72ee2597c1d2dfcb151e4..59121b8d1b06eaf5d1e55858e005ed2ae5ccd038 100644 (file)
@@ -5199,7 +5199,8 @@ describeSubscriptions(const char *pattern, bool verbose)
        PQExpBufferData buf;
        PGresult   *res;
        printQueryOpt myopt = pset.popt;
-       static const bool translate_columns[] = {false, false, false, false, false};
+       static const bool translate_columns[] = {false, false, false, false,
+               false, false};
 
        if (pset.sversion < 100000)
        {
@@ -5225,7 +5226,9 @@ describeSubscriptions(const char *pattern, bool verbose)
        if (verbose)
        {
                appendPQExpBuffer(&buf,
+                                                 ",  subsynccommit AS \"%s\"\n"
                                                  ",  subconninfo AS \"%s\"\n",
+                                                 gettext_noop("Synchronous commit"),
                                                  gettext_noop("Conninfo"));
        }
 
index 0811880a8f37c772aa6832ec433c8bfba3856e96..fae542b6129e2d98533d16dd77351d2131dfb912 100644 (file)
@@ -43,7 +43,7 @@ CATALOG(pg_subscription,6100) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6101) BKI_SCHE
 #ifdef CATALOG_VARLEN                  /* variable-length fields start here */
        text            subconninfo;    /* Connection string to the publisher */
        NameData        subslotname;    /* Slot name on publisher */
-
+       text            subsynccommit;  /* Synchronous commit setting for worker */
        text            subpublications[1];     /* List of publications subscribed to */
 #endif
 } FormData_pg_subscription;
@@ -54,14 +54,15 @@ typedef FormData_pg_subscription *Form_pg_subscription;
  *             compiler constants for pg_subscription
  * ----------------
  */
-#define Natts_pg_subscription                                  7
+#define Natts_pg_subscription                                  8
 #define Anum_pg_subscription_subdbid                   1
 #define Anum_pg_subscription_subname                   2
 #define Anum_pg_subscription_subowner                  3
 #define Anum_pg_subscription_subenabled                        4
 #define Anum_pg_subscription_subconninfo               5
 #define Anum_pg_subscription_subslotname               6
-#define Anum_pg_subscription_subpublications   7
+#define Anum_pg_subscription_subsynccommit             7
+#define Anum_pg_subscription_subpublications   8
 
 
 typedef struct Subscription
@@ -73,6 +74,7 @@ typedef struct Subscription
        bool    enabled;                /* Indicates if the subscription is enabled */
        char   *conninfo;               /* Connection string to the publisher */
        char   *slotname;               /* Name of the replication slot */
+       char   *synccommit;             /* Synchronous commit setting for worker */
        List   *publications;   /* List of publication names to subscribe to */
 } Subscription;
 
index 8760d5970aa5473f58d0fc449bca15bf903b515d..47531edd1b51e8b662adefa3cb8b5fe09407a78f 100644 (file)
@@ -46,10 +46,10 @@ CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION foo WI
 ERROR:  must be superuser to create subscriptions
 SET SESSION AUTHORIZATION 'regress_subscription_user';
 \dRs+
-                               List of subscriptions
-  Name   |           Owner           | Enabled | Publication |      Conninfo       
----------+---------------------------+---------+-------------+---------------------
- testsub | regress_subscription_user | f       | {testpub}   | dbname=doesnotexist
+                                         List of subscriptions
+  Name   |           Owner           | Enabled | Publication | Synchronous commit |      Conninfo       
+---------+---------------------------+---------+-------------+--------------------+---------------------
+ testsub | regress_subscription_user | f       | {testpub}   | off                | dbname=doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION testsub SET PUBLICATION testpub2, testpub3 NOREFRESH;
@@ -59,10 +59,10 @@ ALTER SUBSCRIPTION testsub WITH (SLOT NAME = 'newname');
 ALTER SUBSCRIPTION doesnotexist CONNECTION 'dbname=doesnotexist2';
 ERROR:  subscription "doesnotexist" does not exist
 \dRs+
-                                   List of subscriptions
-  Name   |           Owner           | Enabled |     Publication     |       Conninfo       
----------+---------------------------+---------+---------------------+----------------------
- testsub | regress_subscription_user | f       | {testpub2,testpub3} | dbname=doesnotexist2
+                                              List of subscriptions
+  Name   |           Owner           | Enabled |     Publication     | Synchronous commit |       Conninfo       
+---------+---------------------------+---------+---------------------+--------------------+----------------------
+ testsub | regress_subscription_user | f       | {testpub2,testpub3} | off                | dbname=doesnotexist2
 (1 row)
 
 BEGIN;
@@ -89,11 +89,15 @@ ALTER SUBSCRIPTION testsub RENAME TO testsub_dummy;
 ERROR:  must be owner of subscription testsub
 RESET ROLE;
 ALTER SUBSCRIPTION testsub RENAME TO testsub_foo;
-\dRs
-                          List of subscriptions
-    Name     |           Owner           | Enabled |     Publication     
--------------+---------------------------+---------+---------------------
- testsub_foo | regress_subscription_user | f       | {testpub2,testpub3}
+ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = local);
+ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = foobar);
+ERROR:  invalid value for parameter "synchronous_commit": "foobar"
+HINT:  Available values: local, remote_write, remote_apply, on, off.
+\dRs+
+                                                List of subscriptions
+    Name     |           Owner           | Enabled |     Publication     | Synchronous commit |       Conninfo       
+-------------+---------------------------+---------+---------------------+--------------------+----------------------
+ testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | local              | dbname=doesnotexist2
 (1 row)
 
 -- rename back to keep the rest simple
index 7bdc2b3503c398ea5aeaeee82a1158cc9731a9e7..1b30d150cea15f5d3f04975695c87de73488c26f 100644 (file)
@@ -66,8 +66,10 @@ ALTER SUBSCRIPTION testsub RENAME TO testsub_dummy;
 RESET ROLE;
 
 ALTER SUBSCRIPTION testsub RENAME TO testsub_foo;
+ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = local);
+ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = foobar);
 
-\dRs
+\dRs+
 
 -- rename back to keep the rest simple
 ALTER SUBSCRIPTION testsub_foo RENAME TO testsub;