]> granicus.if.org Git - postgresql/commitdiff
Fix statistics reporting in logical replication workers
authorPeter Eisentraut <peter_e@gmx.net>
Mon, 8 May 2017 16:07:59 +0000 (12:07 -0400)
committerPeter Eisentraut <peter_e@gmx.net>
Mon, 8 May 2017 16:10:22 +0000 (12:10 -0400)
This new arrangement ensures that statistics are reported right after
commit of transactions.  The previous arrangement didn't get this quite
right and could lead to assertion failures.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reported-by: Erik Rijkers <er@xs4all.nl>
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c

index 0823000f001b4dbabf1a1672dc7c6e28d0b42a61..7e51076b376d9fbb9e3c0eb955aeef26afe04a88 100644 (file)
@@ -274,6 +274,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
        static List *table_states = NIL;
        static HTAB *last_start_times = NULL;
        ListCell   *lc;
+       bool            started_tx = false;
 
        Assert(!IsTransactionState());
 
@@ -290,6 +291,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                table_states = NIL;
 
                StartTransactionCommand();
+               started_tx = true;
 
                /* Fetch all non-ready tables. */
                rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
@@ -304,8 +306,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                }
                MemoryContextSwitchTo(oldctx);
 
-               CommitTransactionCommand();
-
                table_states_valid = true;
        }
 
@@ -350,11 +350,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                        {
                                rstate->state = SUBREL_STATE_READY;
                                rstate->lsn = current_lsn;
-                               StartTransactionCommand();
+                               if (!started_tx)
+                               {
+                                       StartTransactionCommand();
+                                       started_tx = true;
+                               }
                                SetSubscriptionRelState(MyLogicalRepWorker->subid,
                                                                                rstate->relid, rstate->state,
                                                                                rstate->lsn);
-                               CommitTransactionCommand();
                        }
                }
                else
@@ -457,6 +460,12 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                        }
                }
        }
+
+       if (started_tx)
+       {
+               CommitTransactionCommand();
+               pgstat_report_stat(false);
+       }
 }
 
 /*
@@ -806,6 +815,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                                                                                MyLogicalRepWorker->relstate,
                                                                                MyLogicalRepWorker->relstate_lsn);
                                CommitTransactionCommand();
+                               pgstat_report_stat(false);
 
                                /*
                                 * We want to do the table data sync in single
index 2d7770d4dc1502db71e338a72701c8b60b371f1c..a61240ceee7d3719316926664024c5e9bf9b9fa8 100644 (file)
@@ -453,6 +453,7 @@ apply_handle_commit(StringInfo s)
                replorigin_session_origin_timestamp = commit_data.committime;
 
                CommitTransactionCommand();
+               pgstat_report_stat(false);
 
                store_flush_position(commit_data.end_lsn);
        }
@@ -462,7 +463,6 @@ apply_handle_commit(StringInfo s)
        /* Process any tables that are being synchronized in parallel. */
        process_syncing_tables(commit_data.end_lsn);
 
-       pgstat_report_stat(false);
        pgstat_report_activity(STATE_IDLE, NULL);
 }