granicus.if.org Git - postgresql/commitdiff
Wait between tablesync worker restarts
Author: Peter Eisentraut <peter_e@gmx.net>
Thu, 27 Apr 2017 18:57:26 +0000 (14:57 -0400)
Committer: Peter Eisentraut <peter_e@gmx.net>
Fri, 28 Apr 2017 17:47:46 +0000 (13:47 -0400)
Before restarting a tablesync worker for the same relation, wait
wal_retrieve_retry_interval (currently 5s by default).  This avoids
restarting failing workers in a tight loop.

We keep the last start times in a hash table last_start_times that is
separate from the table_states list, because that list is cleared out on
syscache invalidation, which happens whenever a table finishes syncing.
The hash table is kept until all tables have finished syncing.

A future project might be to unify these two and keep everything in one
data structure, but for now this is a less invasive change to accomplish
the original purpose.

For the test suite, set wal_retrieve_retry_interval to its minimum
value, to not increase the test suite run time.

Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reported-by: Masahiko Sawada <sawada.mshk@gmail.com>
src/backend/replication/logical/tablesync.c
src/test/subscription/t/004_sync.pl

index e63d26b0bcfd971e8d95267ea4710c19b91a4510..0823000f001b4dbabf1a1672dc7c6e28d0b42a61 100644 (file)
@@ -245,7 +245,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
  *
  * If there are tables that need synchronizing and are not being synchronized
  * yet, start sync workers for them (if there are free slots for sync
- * workers).
+ * workers).  To prevent starting the sync worker for the same relation at a
+ * high frequency after a failure, we store its last start time with each sync
+ * state info.  We start the sync worker for the same relation after waiting
+ * at least wal_retrieve_retry_interval.
  *
  * For tables that are being synchronized already, check if sync workers
  * either need action from the apply worker or have finished.
@@ -263,7 +266,13 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 static void
 process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 {
+       struct tablesync_start_time_mapping
+       {
+               Oid                     relid;
+               TimestampTz     last_start_time;
+       };
        static List *table_states = NIL;
+       static HTAB *last_start_times = NULL;
        ListCell   *lc;
 
        Assert(!IsTransactionState());
@@ -300,6 +309,31 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                table_states_valid = true;
        }
 
+       /*
+        * Prepare hash table for tracking last start times of workers, to avoid
+        * immediate restarts.  We don't need it if there are no tables that need
+        * syncing.
+        */
+       if (table_states && !last_start_times)
+       {
+               HASHCTL         ctl;
+
+               memset(&ctl, 0, sizeof(ctl));
+               ctl.keysize = sizeof(Oid);
+               ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
+               last_start_times = hash_create("Logical replication table sync worker start times",
+                                                                          256, &ctl, HASH_ELEM | HASH_BLOBS);
+       }
+       /*
+        * Clean up the hash table when we're done with all tables (just to
+        * release the bit of memory).
+        */
+       else if (!table_states && last_start_times)
+       {
+               hash_destroy(last_start_times);
+               last_start_times = NULL;
+       }
+
        /* Process all tables that are being synchronized. */
        foreach(lc, table_states)
        {
@@ -403,11 +437,23 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                         */
                        else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription)
                        {
-                               logicalrep_worker_launch(MyLogicalRepWorker->dbid,
-                                                                                MySubscription->oid,
-                                                                                MySubscription->name,
-                                                                                MyLogicalRepWorker->userid,
-                                                                                rstate->relid);
+                               TimestampTz     now = GetCurrentTimestamp();
+                               struct tablesync_start_time_mapping *hentry;
+                               bool            found;
+
+                               hentry = hash_search(last_start_times, &rstate->relid, HASH_ENTER, &found);
+
+                               if (!found ||
+                                       TimestampDifferenceExceeds(hentry->last_start_time, now,
+                                                                                          wal_retrieve_retry_interval))
+                               {
+                                       logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+                                                                                        MySubscription->oid,
+                                                                                        MySubscription->name,
+                                                                                        MyLogicalRepWorker->userid,
+                                                                                        rstate->relid);
+                                       hentry->last_start_time = now;
+                               }
                        }
                }
        }
index 87541a0e6e18a28e93a00a5a30a920fc05d95985..fa0bf7f49fc07b8c087f2b325778ff1a04db41b9 100644 (file)
@@ -13,6 +13,7 @@ $node_publisher->start;
 # Create subscriber node
 my $node_subscriber = get_new_node('subscriber');
 $node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', "wal_retrieve_retry_interval = 1ms");
 $node_subscriber->start;
 
 # Create some preexisting content on publisher