logicalrep_read_begin(s, &begin_data);
- replorigin_session_origin_timestamp = begin_data.committime;
- replorigin_session_origin_lsn = begin_data.final_lsn;
-
remote_final_lsn = begin_data.final_lsn;
in_remote_transaction = true;
logicalrep_read_commit(s, &commit_data);
- Assert(commit_data.commit_lsn == replorigin_session_origin_lsn);
- Assert(commit_data.committime == replorigin_session_origin_timestamp);
-
Assert(commit_data.commit_lsn == remote_final_lsn);
/* The synchronization worker runs in single transaction. */
if (IsTransactionState() && !am_tablesync_worker())
{
+ /*
+ * Update origin state so we can restart streaming from correct
+ * position in case of crash.
+ */
+ replorigin_session_origin_lsn = commit_data.end_lsn;
+ replorigin_session_origin_timestamp = commit_data.committime;
+
CommitTransactionCommand();
store_flush_position(commit_data.end_lsn);