]> granicus.if.org Git - zfs/blobdiff - module/zfs/metaslab.c
Log Spacemap Project
[zfs] / module / zfs / metaslab.c
index 5da929b48430dd6ff17536669008abf15369cf6e..0b22aa875ed00f71c172e65047e4886e32498777 100644 (file)
@@ -56,12 +56,21 @@ unsigned long metaslab_aliquot = 512 << 10;
 unsigned long metaslab_force_ganging = SPA_MAXBLOCKSIZE + 1;
 
 /*
- * Since we can touch multiple metaslabs (and their respective space maps)
- * with each transaction group, we benefit from having a smaller space map
+ * In pools where the log space map feature is not enabled we touch
+ * multiple metaslabs (and their respective space maps) with each
+ * transaction group. Thus, we benefit from having a small space map
  * block size since it allows us to issue more I/O operations scattered
- * around the disk.
+ * around the disk. So a sane default for the space map block size
+ * is 8~16K.
  */
-int zfs_metaslab_sm_blksz = (1 << 12);
+int zfs_metaslab_sm_blksz_no_log = (1 << 14);
+
+/*
+ * When the log space map feature is enabled, we accumulate a lot of
+ * changes per metaslab that are flushed once in a while so we benefit
+ * from a bigger block size like 128K for the metaslab space maps.
+ */
+int zfs_metaslab_sm_blksz_with_log = (1 << 17);
 
 /*
  * The in-core space map representation is more compact than its on-disk form.
@@ -270,6 +279,7 @@ static void metaslab_check_free_impl(vdev_t *, uint64_t, uint64_t);
 
 static void metaslab_passivate(metaslab_t *msp, uint64_t weight);
 static uint64_t metaslab_weight_from_range_tree(metaslab_t *msp);
+static void metaslab_flush_update(metaslab_t *, dmu_tx_t *);
 #ifdef _METASLAB_TRACING
 kmem_cache_t *metaslab_alloc_trace_cache;
 #endif
@@ -540,67 +550,6 @@ metaslab_compare(const void *x1, const void *x2)
        return (AVL_CMP(m1->ms_start, m2->ms_start));
 }
 
-uint64_t
-metaslab_allocated_space(metaslab_t *msp)
-{
-       return (msp->ms_allocated_space);
-}
-
-/*
- * Verify that the space accounting on disk matches the in-core range_trees.
- */
-static void
-metaslab_verify_space(metaslab_t *msp, uint64_t txg)
-{
-       spa_t *spa = msp->ms_group->mg_vd->vdev_spa;
-       uint64_t allocating = 0;
-       uint64_t sm_free_space, msp_free_space;
-
-       ASSERT(MUTEX_HELD(&msp->ms_lock));
-       ASSERT(!msp->ms_condensing);
-
-       if ((zfs_flags & ZFS_DEBUG_METASLAB_VERIFY) == 0)
-               return;
-
-       /*
-        * We can only verify the metaslab space when we're called
-        * from syncing context with a loaded metaslab that has an
-        * allocated space map. Calling this in non-syncing context
-        * does not provide a consistent view of the metaslab since
-        * we're performing allocations in the future.
-        */
-       if (txg != spa_syncing_txg(spa) || msp->ms_sm == NULL ||
-           !msp->ms_loaded)
-               return;
-
-       /*
-        * Even though the smp_alloc field can get negative (e.g.
-        * see vdev_checkpoint_sm), that should never be the case
-        * when it come's to a metaslab's space map.
-        */
-       ASSERT3S(space_map_allocated(msp->ms_sm), >=, 0);
-
-       sm_free_space = msp->ms_size - metaslab_allocated_space(msp);
-
-       /*
-        * Account for future allocations since we would have
-        * already deducted that space from the ms_allocatable.
-        */
-       for (int t = 0; t < TXG_CONCURRENT_STATES; t++) {
-               allocating +=
-                   range_tree_space(msp->ms_allocating[(txg + t) & TXG_MASK]);
-       }
-
-       ASSERT3U(msp->ms_deferspace, ==,
-           range_tree_space(msp->ms_defer[0]) +
-           range_tree_space(msp->ms_defer[1]));
-
-       msp_free_space = range_tree_space(msp->ms_allocatable) + allocating +
-           msp->ms_deferspace + range_tree_space(msp->ms_freed);
-
-       VERIFY3U(sm_free_space, ==, msp_free_space);
-}
-
 /*
  * ==========================================================================
  * Metaslab groups
@@ -689,6 +638,25 @@ metaslab_group_alloc_update(metaslab_group_t *mg)
        mutex_exit(&mg->mg_lock);
 }
 
+int
+metaslab_sort_by_flushed(const void *va, const void *vb)
+{
+       const metaslab_t *a = va;
+       const metaslab_t *b = vb;
+
+       int cmp = AVL_CMP(a->ms_unflushed_txg, b->ms_unflushed_txg);
+       if (likely(cmp))
+               return (cmp);
+
+       uint64_t a_vdev_id = a->ms_group->mg_vd->vdev_id;
+       uint64_t b_vdev_id = b->ms_group->mg_vd->vdev_id;
+       cmp = AVL_CMP(a_vdev_id, b_vdev_id);
+       if (cmp)
+               return (cmp);
+
+       return (AVL_CMP(a->ms_id, b->ms_id));
+}
+
 metaslab_group_t *
 metaslab_group_create(metaslab_class_t *mc, vdev_t *vd, int allocators)
 {
@@ -703,7 +671,7 @@ metaslab_group_create(metaslab_class_t *mc, vdev_t *vd, int allocators)
        mg->mg_secondaries = kmem_zalloc(allocators * sizeof (metaslab_t *),
            KM_SLEEP);
        avl_create(&mg->mg_metaslab_tree, metaslab_compare,
-           sizeof (metaslab_t), offsetof(struct metaslab, ms_group_node));
+           sizeof (metaslab_t), offsetof(metaslab_t, ms_group_node));
        mg->mg_vd = vd;
        mg->mg_class = mc;
        mg->mg_activation_count = 0;
@@ -900,7 +868,6 @@ metaslab_group_histogram_verify(metaslab_group_t *mg)
 
        for (int m = 0; m < vd->vdev_ms_count; m++) {
                metaslab_t *msp = vd->vdev_ms[m];
-               ASSERT(msp != NULL);
 
                /* skip if not active or not a member */
                if (msp->ms_sm == NULL || msp->ms_group != mg)
@@ -1454,6 +1421,101 @@ metaslab_ops_t *zfs_metaslab_ops = &metaslab_ndf_ops;
  * ==========================================================================
  */
 
+/*
+ * Wait for any in-progress metaslab loads to complete.
+ */
+void
+metaslab_load_wait(metaslab_t *msp)
+{
+       ASSERT(MUTEX_HELD(&msp->ms_lock));
+
+       while (msp->ms_loading) {
+               ASSERT(!msp->ms_loaded);
+               cv_wait(&msp->ms_load_cv, &msp->ms_lock);
+       }
+}
+
+/*
+ * Wait for any in-progress flushing to complete.
+ */
+void
+metaslab_flush_wait(metaslab_t *msp)
+{
+       ASSERT(MUTEX_HELD(&msp->ms_lock));
+
+       while (msp->ms_flushing)
+               cv_wait(&msp->ms_flush_cv, &msp->ms_lock);
+}
+
+uint64_t
+metaslab_allocated_space(metaslab_t *msp)
+{
+       return (msp->ms_allocated_space);
+}
+
+/*
+ * Verify that the space accounting on disk matches the in-core range_trees.
+ */
+static void
+metaslab_verify_space(metaslab_t *msp, uint64_t txg)
+{
+       spa_t *spa = msp->ms_group->mg_vd->vdev_spa;
+       uint64_t allocating = 0;
+       uint64_t sm_free_space, msp_free_space;
+
+       ASSERT(MUTEX_HELD(&msp->ms_lock));
+       ASSERT(!msp->ms_condensing);
+
+       if ((zfs_flags & ZFS_DEBUG_METASLAB_VERIFY) == 0)
+               return;
+
+       /*
+        * We can only verify the metaslab space when we're called
+        * from syncing context with a loaded metaslab that has an
+        * allocated space map. Calling this in non-syncing context
+        * does not provide a consistent view of the metaslab since
+        * we're performing allocations in the future.
+        */
+       if (txg != spa_syncing_txg(spa) || msp->ms_sm == NULL ||
+           !msp->ms_loaded)
+               return;
+
+       /*
+        * Even though the smp_alloc field can get negative,
+        * when it comes to a metaslab's space map, that should
+        * never be the case.
+        */
+       ASSERT3S(space_map_allocated(msp->ms_sm), >=, 0);
+
+       ASSERT3U(space_map_allocated(msp->ms_sm), >=,
+           range_tree_space(msp->ms_unflushed_frees));
+
+       ASSERT3U(metaslab_allocated_space(msp), ==,
+           space_map_allocated(msp->ms_sm) +
+           range_tree_space(msp->ms_unflushed_allocs) -
+           range_tree_space(msp->ms_unflushed_frees));
+
+       sm_free_space = msp->ms_size - metaslab_allocated_space(msp);
+
+       /*
+        * Account for future allocations since we would have
+        * already deducted that space from the ms_allocatable.
+        */
+       for (int t = 0; t < TXG_CONCURRENT_STATES; t++) {
+               allocating +=
+                   range_tree_space(msp->ms_allocating[(txg + t) & TXG_MASK]);
+       }
+
+       ASSERT3U(msp->ms_deferspace, ==,
+           range_tree_space(msp->ms_defer[0]) +
+           range_tree_space(msp->ms_defer[1]));
+
+       msp_free_space = range_tree_space(msp->ms_allocatable) + allocating +
+           msp->ms_deferspace + range_tree_space(msp->ms_freed);
+
+       VERIFY3U(sm_free_space, ==, msp_free_space);
+}
+
 static void
 metaslab_aux_histograms_clear(metaslab_t *msp)
 {
@@ -1651,20 +1713,6 @@ metaslab_verify_weight_and_frag(metaslab_t *msp)
        VERIFY3U(msp->ms_weight, ==, weight);
 }
 
-/*
- * Wait for any in-progress metaslab loads to complete.
- */
-static void
-metaslab_load_wait(metaslab_t *msp)
-{
-       ASSERT(MUTEX_HELD(&msp->ms_lock));
-
-       while (msp->ms_loading) {
-               ASSERT(!msp->ms_loaded);
-               cv_wait(&msp->ms_load_cv, &msp->ms_lock);
-       }
-}
-
 static int
 metaslab_load_impl(metaslab_t *msp)
 {
@@ -1679,13 +1727,19 @@ metaslab_load_impl(metaslab_t *msp)
         * are reading the space map. Therefore, metaslab_sync() and
         * metaslab_sync_done() can run at the same time as we do.
         *
-        * metaslab_sync() can append to the space map while we are loading.
-        * Therefore we load only entries that existed when we started the
-        * load. Additionally, metaslab_sync_done() has to wait for the load
-        * to complete because there are potential races like metaslab_load()
-        * loading parts of the space map that are currently being appended
-        * by metaslab_sync(). If we didn't, the ms_allocatable would have
-        * entries that metaslab_sync_done() would try to re-add later.
+        * If we are using the log space maps, metaslab_sync() can't write to
+        * the metaslab's space map while we are loading as we only write to
+        * it when we are flushing the metaslab, and that can't happen while
+        * we are loading it.
+        *
+        * If we are not using log space maps though, metaslab_sync() can
+        * append to the space map while we are loading. Therefore we load
+        * only entries that existed when we started the load. Additionally,
+        * metaslab_sync_done() has to wait for the load to complete because
+        * there are potential races like metaslab_load() loading parts of the
+        * space map that are currently being appended by metaslab_sync(). If
+        * we didn't, the ms_allocatable would have entries that
+        * metaslab_sync_done() would try to re-add later.
         *
         * That's why before dropping the lock we remember the synced length
         * of the metaslab and read up to that point of the space map,
@@ -1695,6 +1749,7 @@ metaslab_load_impl(metaslab_t *msp)
        uint64_t length = msp->ms_synced_length;
        mutex_exit(&msp->ms_lock);
 
+       hrtime_t load_start = gethrtime();
        if (msp->ms_sm != NULL) {
                error = space_map_load_length(msp->ms_sm, msp->ms_allocatable,
                    SM_FREE, length);
@@ -1706,18 +1761,37 @@ metaslab_load_impl(metaslab_t *msp)
                 */
                range_tree_add(msp->ms_allocatable,
                    msp->ms_start, msp->ms_size);
+
+               if (msp->ms_freed != NULL) {
+                       /*
+                        * If the ms_sm doesn't exist, this means that this
+                        * metaslab hasn't gone through metaslab_sync() and
+                        * thus has never been dirtied. So we shouldn't
+                        * expect any unflushed allocs or frees from previous
+                        * TXGs.
+                        *
+                        * Note: ms_freed and all the other trees except for
+                        * the ms_allocatable, can be NULL at this point only
+                        * if this is a new metaslab of a vdev that just got
+                        * expanded.
+                        */
+                       ASSERT(range_tree_is_empty(msp->ms_unflushed_allocs));
+                       ASSERT(range_tree_is_empty(msp->ms_unflushed_frees));
+               }
        }
 
        /*
         * We need to grab the ms_sync_lock to prevent metaslab_sync() from
-        * changing the ms_sm and the metaslab's range trees while we are
-        * about to use them and populate the ms_allocatable. The ms_lock
-        * is insufficient for this because metaslab_sync() doesn't hold
-        * the ms_lock while writing the ms_checkpointing tree to disk.
+        * changing the ms_sm (or log_sm) and the metaslab's range trees
+        * while we are about to use them and populate the ms_allocatable.
+        * The ms_lock is insufficient for this because metaslab_sync() doesn't
+        * hold the ms_lock while writing the ms_checkpointing tree to disk.
         */
        mutex_enter(&msp->ms_sync_lock);
        mutex_enter(&msp->ms_lock);
+
        ASSERT(!msp->ms_condensing);
+       ASSERT(!msp->ms_flushing);
 
        if (error != 0) {
                mutex_exit(&msp->ms_sync_lock);
@@ -1728,10 +1802,60 @@ metaslab_load_impl(metaslab_t *msp)
        msp->ms_loaded = B_TRUE;
 
        /*
-        * The ms_allocatable contains the segments that exist in the
-        * ms_defer trees [see ms_synced_length]. Thus we need to remove
-        * them from ms_allocatable as they will be added again in
+        * Apply all the unflushed changes to ms_allocatable right
+        * away so any manipulations we do below have a clear view
+        * of what is allocated and what is free.
+        */
+       range_tree_walk(msp->ms_unflushed_allocs,
+           range_tree_remove, msp->ms_allocatable);
+       range_tree_walk(msp->ms_unflushed_frees,
+           range_tree_add, msp->ms_allocatable);
+
+       msp->ms_loaded = B_TRUE;
+
+       ASSERT3P(msp->ms_group, !=, NULL);
+       spa_t *spa = msp->ms_group->mg_vd->vdev_spa;
+       if (spa_syncing_log_sm(spa) != NULL) {
+               ASSERT(spa_feature_is_enabled(spa,
+                   SPA_FEATURE_LOG_SPACEMAP));
+
+               /*
+                * If we use a log space map we add all the segments
+                * that are in ms_unflushed_frees so they are available
+                * for allocation.
+                *
+                * ms_allocatable needs to contain all free segments
+                * that are ready for allocations (thus not segments
+                * from ms_freeing, ms_freed, and the ms_defer trees).
+                * But if we grab the lock in this code path at a sync
+                * pass later that 1, then it also contains the
+                * segments of ms_freed (they were added to it earlier
+                * in this path through ms_unflushed_frees). So we
+                * need to remove all the segments that exist in
+                * ms_freed from ms_allocatable as they will be added
+                * later in metaslab_sync_done().
+                *
+                * When there's no log space map, the ms_allocatable
+                * correctly doesn't contain any segments that exist
+                * in ms_freed [see ms_synced_length].
+                */
+               range_tree_walk(msp->ms_freed,
+                   range_tree_remove, msp->ms_allocatable);
+       }
+
+       /*
+        * If we are not using the log space map, ms_allocatable
+        * contains the segments that exist in the ms_defer trees
+        * [see ms_synced_length]. Thus we need to remove them
+        * from ms_allocatable as they will be added again in
         * metaslab_sync_done().
+        *
+        * If we are using the log space map, ms_allocatable still
+        * contains the segments that exist in the ms_defer trees.
+        * Not because it read them through the ms_sm though. But
+        * because these segments are part of ms_unflushed_frees
+        * whose segments we add to ms_allocatable earlier in this
+        * code path.
         */
        for (int t = 0; t < TXG_DEFER_SIZE; t++) {
                range_tree_walk(msp->ms_defer[t],
@@ -1756,10 +1880,26 @@ metaslab_load_impl(metaslab_t *msp)
                ASSERT3U(weight, <=, msp->ms_weight);
        msp->ms_max_size = metaslab_block_maxsize(msp);
 
-       spa_t *spa = msp->ms_group->mg_vd->vdev_spa;
+       hrtime_t load_end = gethrtime();
+       if (zfs_flags & ZFS_DEBUG_LOG_SPACEMAP) {
+               zfs_dbgmsg("loading: txg %llu, spa %s, vdev_id %llu, "
+                   "ms_id %llu, smp_length %llu, "
+                   "unflushed_allocs %llu, unflushed_frees %llu, "
+                   "freed %llu, defer %llu + %llu, "
+                   "loading_time %lld ms",
+                   spa_syncing_txg(spa), spa_name(spa),
+                   msp->ms_group->mg_vd->vdev_id, msp->ms_id,
+                   space_map_length(msp->ms_sm),
+                   range_tree_space(msp->ms_unflushed_allocs),
+                   range_tree_space(msp->ms_unflushed_frees),
+                   range_tree_space(msp->ms_freed),
+                   range_tree_space(msp->ms_defer[0]),
+                   range_tree_space(msp->ms_defer[1]),
+                   (longlong_t)((load_end - load_start) / 1000000));
+       }
+
        metaslab_verify_space(msp, spa_syncing_txg(spa));
        mutex_exit(&msp->ms_sync_lock);
-
        return (0);
 }
 
@@ -1778,8 +1918,32 @@ metaslab_load(metaslab_t *msp)
        VERIFY(!msp->ms_loading);
        ASSERT(!msp->ms_condensing);
 
+       /*
+        * We set the loading flag BEFORE potentially dropping the lock to
+        * wait for an ongoing flush (see ms_flushing below). This way other
+        * threads know that there is already a thread that is loading this
+        * metaslab.
+        */
        msp->ms_loading = B_TRUE;
+
+       /*
+        * Wait for any in-progress flushing to finish as we drop the ms_lock
+        * both here (during space_map_load()) and in metaslab_flush() (when
+        * we flush our changes to the ms_sm).
+        */
+       if (msp->ms_flushing)
+               metaslab_flush_wait(msp);
+
+       /*
+        * In the possibility that we were waiting for the metaslab to be
+        * flushed (where we temporarily dropped the ms_lock), ensure that
+        * no one else loaded the metaslab somehow.
+        */
+       ASSERT(!msp->ms_loaded);
+
        int error = metaslab_load_impl(msp);
+
+       ASSERT(MUTEX_HELD(&msp->ms_lock));
        msp->ms_loading = B_FALSE;
        cv_broadcast(&msp->ms_load_cv);
 
@@ -1806,7 +1970,7 @@ metaslab_unload(metaslab_t *msp)
         * have their weights calculated from the space map histograms, while
         * loaded ones have it calculated from their in-core range tree
         * [see metaslab_load()]. This way, the weight reflects the information
-        * available in-core, whether it is loaded or not
+        * available in-core, whether it is loaded or not.
         *
         * If ms_group == NULL means that we came here from metaslab_fini(),
         * at which point it doesn't make sense for us to do the recalculation
@@ -1816,7 +1980,7 @@ metaslab_unload(metaslab_t *msp)
                metaslab_recalculate_weight_and_sort(msp);
 }
 
-static void
+void
 metaslab_space_update(vdev_t *vd, metaslab_class_t *mc, int64_t alloc_delta,
     int64_t defer_delta, int64_t space_delta)
 {
@@ -1830,8 +1994,8 @@ metaslab_space_update(vdev_t *vd, metaslab_class_t *mc, int64_t alloc_delta,
 }
 
 int
-metaslab_init(metaslab_group_t *mg, uint64_t id, uint64_t object, uint64_t txg,
-    metaslab_t **msp)
+metaslab_init(metaslab_group_t *mg, uint64_t id, uint64_t object,
+    uint64_t txg, metaslab_t **msp)
 {
        vdev_t *vd = mg->mg_vd;
        spa_t *spa = vd->vdev_spa;
@@ -1843,6 +2007,7 @@ metaslab_init(metaslab_group_t *mg, uint64_t id, uint64_t object, uint64_t txg,
        mutex_init(&ms->ms_lock, NULL, MUTEX_DEFAULT, NULL);
        mutex_init(&ms->ms_sync_lock, NULL, MUTEX_DEFAULT, NULL);
        cv_init(&ms->ms_load_cv, NULL, CV_DEFAULT, NULL);
+       cv_init(&ms->ms_flush_cv, NULL, CV_DEFAULT, NULL);
 
        ms->ms_id = id;
        ms->ms_start = id << vd->vdev_ms_shift;
@@ -1905,17 +2070,6 @@ metaslab_init(metaslab_group_t *mg, uint64_t id, uint64_t object, uint64_t txg,
                    metaslab_allocated_space(ms), 0, 0);
        }
 
-       /*
-        * If metaslab_debug_load is set and we're initializing a metaslab
-        * that has an allocated space map object then load the space map
-        * so that we can verify frees.
-        */
-       if (metaslab_debug_load && ms->ms_sm != NULL) {
-               mutex_enter(&ms->ms_lock);
-               VERIFY0(metaslab_load(ms));
-               mutex_exit(&ms->ms_lock);
-       }
-
        if (txg != 0) {
                vdev_dirty(vd, 0, NULL, txg);
                vdev_dirty(vd, VDD_METASLAB, ms, txg);
@@ -1926,11 +2080,42 @@ metaslab_init(metaslab_group_t *mg, uint64_t id, uint64_t object, uint64_t txg,
        return (0);
 }
 
+static void
+metaslab_fini_flush_data(metaslab_t *msp)
+{
+       spa_t *spa = msp->ms_group->mg_vd->vdev_spa;
+
+       if (metaslab_unflushed_txg(msp) == 0) {
+               ASSERT3P(avl_find(&spa->spa_metaslabs_by_flushed, msp, NULL),
+                   ==, NULL);
+               return;
+       }
+       ASSERT(spa_feature_is_active(spa, SPA_FEATURE_LOG_SPACEMAP));
+
+       mutex_enter(&spa->spa_flushed_ms_lock);
+       avl_remove(&spa->spa_metaslabs_by_flushed, msp);
+       mutex_exit(&spa->spa_flushed_ms_lock);
+
+       spa_log_sm_decrement_mscount(spa, metaslab_unflushed_txg(msp));
+       spa_log_summary_decrement_mscount(spa, metaslab_unflushed_txg(msp));
+}
+
+uint64_t
+metaslab_unflushed_changes_memused(metaslab_t *ms)
+{
+       return ((range_tree_numsegs(ms->ms_unflushed_allocs) +
+           range_tree_numsegs(ms->ms_unflushed_frees)) *
+           sizeof (range_seg_t));
+}
+
 void
 metaslab_fini(metaslab_t *msp)
 {
        metaslab_group_t *mg = msp->ms_group;
        vdev_t *vd = mg->mg_vd;
+       spa_t *spa = vd->vdev_spa;
+
+       metaslab_fini_flush_data(msp);
 
        metaslab_group_remove(mg, msp);
 
@@ -1940,13 +2125,22 @@ metaslab_fini(metaslab_t *msp)
            -metaslab_allocated_space(msp), 0, -msp->ms_size);
 
        space_map_close(msp->ms_sm);
+       msp->ms_sm = NULL;
 
        metaslab_unload(msp);
-
        range_tree_destroy(msp->ms_allocatable);
        range_tree_destroy(msp->ms_freeing);
        range_tree_destroy(msp->ms_freed);
 
+       ASSERT3U(spa->spa_unflushed_stats.sus_memused, >=,
+           metaslab_unflushed_changes_memused(msp));
+       spa->spa_unflushed_stats.sus_memused -=
+           metaslab_unflushed_changes_memused(msp);
+       range_tree_vacate(msp->ms_unflushed_allocs, NULL, NULL);
+       range_tree_destroy(msp->ms_unflushed_allocs);
+       range_tree_vacate(msp->ms_unflushed_frees, NULL, NULL);
+       range_tree_destroy(msp->ms_unflushed_frees);
+
        for (int t = 0; t < TXG_SIZE; t++) {
                range_tree_destroy(msp->ms_allocating[t]);
        }
@@ -1966,6 +2160,7 @@ metaslab_fini(metaslab_t *msp)
 
        mutex_exit(&msp->ms_lock);
        cv_destroy(&msp->ms_load_cv);
+       cv_destroy(&msp->ms_flush_cv);
        mutex_destroy(&msp->ms_lock);
        mutex_destroy(&msp->ms_sync_lock);
        ASSERT3U(msp->ms_allocator, ==, -1);
@@ -2207,9 +2402,9 @@ metaslab_weight_from_range_tree(metaslab_t *msp)
 }
 
 /*
- * Calculate the weight based on the on-disk histogram. This should only
- * be called after a sync pass has completely finished since the on-disk
- * information is updated in metaslab_sync().
+ * Calculate the weight based on the on-disk histogram. Should be applied
+ * only to unloaded metaslabs  (i.e no incoming allocations) in-order to
+ * give results consistent with the on-disk state
  */
 static uint64_t
 metaslab_weight_from_spacemap(metaslab_t *msp)
@@ -2283,7 +2478,6 @@ metaslab_segment_weight(metaslab_t *msp)
                }
                WEIGHT_SET_ACTIVE(weight, 0);
                ASSERT(!WEIGHT_IS_SPACEBASED(weight));
-
                return (weight);
        }
 
@@ -2651,18 +2845,19 @@ metaslab_group_preload(metaslab_group_t *mg)
 }
 
 /*
- * Determine if the space map's on-disk footprint is past our tolerance
- * for inefficiency. We would like to use the following criteria to make
- * our decision:
+ * Determine if the space map's on-disk footprint is past our tolerance for
+ * inefficiency. We would like to use the following criteria to make our
+ * decision:
  *
- * 1. The size of the space map object should not dramatically increase as a
- * result of writing out the free space range tree.
+ * 1. Do not condense if the size of the space map object would dramatically
+ *    increase as a result of writing out the free space range tree.
  *
- * 2. The minimal on-disk space map representation is zfs_condense_pct/100
- * times the size than the free space range tree representation
- * (i.e. zfs_condense_pct = 110 and in-core = 1MB, minimal = 1.1MB).
+ * 2. Condense if the on on-disk space map representation is at least
+ *    zfs_condense_pct/100 times the size of the optimal representation
+ *    (i.e. zfs_condense_pct = 110 and in-core = 1MB, optimal = 1.1MB).
  *
- * 3. The on-disk size of the space map should actually decrease.
+ * 3. Do not condense if the on-disk size of the space map does not actually
+ *    decrease.
  *
  * Unfortunately, we cannot compute the on-disk size of the space map in this
  * context because we cannot accurately compute the effects of compression, etc.
@@ -2676,27 +2871,11 @@ metaslab_should_condense(metaslab_t *msp)
        space_map_t *sm = msp->ms_sm;
        vdev_t *vd = msp->ms_group->mg_vd;
        uint64_t vdev_blocksize = 1 << vd->vdev_ashift;
-       uint64_t current_txg = spa_syncing_txg(vd->vdev_spa);
 
        ASSERT(MUTEX_HELD(&msp->ms_lock));
        ASSERT(msp->ms_loaded);
-
-       /*
-        * Allocations and frees in early passes are generally more space
-        * efficient (in terms of blocks described in space map entries)
-        * than the ones in later passes (e.g. we don't compress after
-        * sync pass 5) and condensing a metaslab multiple times in a txg
-        * could degrade performance.
-        *
-        * Thus we prefer condensing each metaslab at most once every txg at
-        * the earliest sync pass possible. If a metaslab is eligible for
-        * condensing again after being considered for condensing within the
-        * same txg, it will hopefully be dirty in the next txg where it will
-        * be condensed at an earlier pass.
-        */
-       if (msp->ms_condense_checked_txg == current_txg)
-               return (B_FALSE);
-       msp->ms_condense_checked_txg = current_txg;
+       ASSERT(sm != NULL);
+       ASSERT3U(spa_sync_pass(vd->vdev_spa), ==, 1);
 
        /*
         * We always condense metaslabs that are empty and metaslabs for
@@ -2706,97 +2885,343 @@ metaslab_should_condense(metaslab_t *msp)
            msp->ms_condense_wanted)
                return (B_TRUE);
 
-       uint64_t object_size = space_map_length(msp->ms_sm);
+       uint64_t record_size = MAX(sm->sm_blksz, vdev_blocksize);
+       uint64_t object_size = space_map_length(sm);
        uint64_t optimal_size = space_map_estimate_optimal_size(sm,
            msp->ms_allocatable, SM_NO_VDEVID);
 
-       dmu_object_info_t doi;
-       dmu_object_info_from_db(sm->sm_dbuf, &doi);
-       uint64_t record_size = MAX(doi.doi_data_block_size, vdev_blocksize);
-
        return (object_size >= (optimal_size * zfs_condense_pct / 100) &&
            object_size > zfs_metaslab_condense_block_threshold * record_size);
 }
 
 /*
  * Condense the on-disk space map representation to its minimized form.
- * The minimized form consists of a small number of allocations followed by
- * the entries of the free range tree.
+ * The minimized form consists of a small number of allocations followed
+ * by the entries of the free range tree (ms_allocatable). The condensed
+ * spacemap contains all the entries of previous TXGs (including those in
+ * the pool-wide log spacemaps; thus this is effectively a superset of
+ * metaslab_flush()), but this TXG's entries still need to be written.
  */
 static void
-metaslab_condense(metaslab_t *msp, uint64_t txg, dmu_tx_t *tx)
+metaslab_condense(metaslab_t *msp, dmu_tx_t *tx)
 {
        range_tree_t *condense_tree;
        space_map_t *sm = msp->ms_sm;
+       uint64_t txg = dmu_tx_get_txg(tx);
+       spa_t *spa = msp->ms_group->mg_vd->vdev_spa;
 
        ASSERT(MUTEX_HELD(&msp->ms_lock));
        ASSERT(msp->ms_loaded);
+       ASSERT(msp->ms_sm != NULL);
 
+       /*
+        * In order to condense the space map, we need to change it so it
+        * only describes which segments are currently allocated and free.
+        *
+        * All the current free space resides in the ms_allocatable, all
+        * the ms_defer trees, and all the ms_allocating trees. We ignore
+        * ms_freed because it is empty because we're in sync pass 1. We
+        * ignore ms_freeing because these changes are not yet reflected
+        * in the spacemap (they will be written later this txg).
+        *
+        * So to truncate the space map to represent all the entries of
+        * previous TXGs we do the following:
+        *
+        * 1] We create a range tree (condense tree) that is 100% allocated.
+        * 2] We remove from it all segments found in the ms_defer trees
+        *    as those segments are marked as free in the original space
+        *    map. We do the same with the ms_allocating trees for the same
+        *    reason. Removing these segments should be a relatively
+        *    inexpensive operation since we expect these trees to have a
+        *    small number of nodes.
+        * 3] We vacate any unflushed allocs as they should already exist
+        *    in the condense tree. Then we vacate any unflushed frees as
+        *    they should already be part of ms_allocatable.
+        * 4] At this point, we would ideally like to remove all segments
+        *    in the ms_allocatable tree from the condense tree. This way
+        *    we would write all the entries of the condense tree as the
+        *    condensed space map, which would only contain allocated
+        *    segments with everything else assumed to be freed.
+        *
+        *    Doing so can be prohibitively expensive as ms_allocatable can
+        *    be large, and therefore computationally expensive to subtract
+        *    from the condense_tree. Instead we first sync out the
+        *    condense_tree and then the ms_allocatable, in the condensed
+        *    space map. While this is not optimal, it is typically close to
+        *    optimal and more importantly much cheaper to compute.
+        *
+        * 5] Finally, as both of the unflushed trees were written to our
+        *    new and condensed metaslab space map, we basically flushed
+        *    all the unflushed changes to disk, thus we call
+        *    metaslab_flush_update().
+        */
+       ASSERT3U(spa_sync_pass(spa), ==, 1);
+       ASSERT(range_tree_is_empty(msp->ms_freed)); /* since it is pass 1 */
 
        zfs_dbgmsg("condensing: txg %llu, msp[%llu] %px, vdev id %llu, "
            "spa %s, smp size %llu, segments %lu, forcing condense=%s", txg,
            msp->ms_id, msp, msp->ms_group->mg_vd->vdev_id,
-           msp->ms_group->mg_vd->vdev_spa->spa_name,
-           space_map_length(msp->ms_sm),
+           spa->spa_name, space_map_length(msp->ms_sm),
            avl_numnodes(&msp->ms_allocatable->rt_root),
            msp->ms_condense_wanted ? "TRUE" : "FALSE");
 
        msp->ms_condense_wanted = B_FALSE;
 
-       /*
-        * Create an range tree that is 100% allocated. We remove segments
-        * that have been freed in this txg, any deferred frees that exist,
-        * and any allocation in the future. Removing segments should be
-        * a relatively inexpensive operation since we expect these trees to
-        * have a small number of nodes.
-        */
        condense_tree = range_tree_create(NULL, NULL);
        range_tree_add(condense_tree, msp->ms_start, msp->ms_size);
 
-       range_tree_walk(msp->ms_freeing, range_tree_remove, condense_tree);
-       range_tree_walk(msp->ms_freed, range_tree_remove, condense_tree);
-
        for (int t = 0; t < TXG_DEFER_SIZE; t++) {
                range_tree_walk(msp->ms_defer[t],
                    range_tree_remove, condense_tree);
        }
 
-       for (int t = 1; t < TXG_CONCURRENT_STATES; t++) {
+       for (int t = 0; t < TXG_CONCURRENT_STATES; t++) {
                range_tree_walk(msp->ms_allocating[(txg + t) & TXG_MASK],
                    range_tree_remove, condense_tree);
        }
 
+       ASSERT3U(spa->spa_unflushed_stats.sus_memused, >=,
+           metaslab_unflushed_changes_memused(msp));
+       spa->spa_unflushed_stats.sus_memused -=
+           metaslab_unflushed_changes_memused(msp);
+       range_tree_vacate(msp->ms_unflushed_allocs, NULL, NULL);
+       range_tree_vacate(msp->ms_unflushed_frees, NULL, NULL);
+
        /*
-        * We're about to drop the metaslab's lock thus allowing
-        * other consumers to change it's content. Set the
-        * metaslab's ms_condensing flag to ensure that
-        * allocations on this metaslab do not occur while we're
-        * in the middle of committing it to disk. This is only critical
-        * for ms_allocatable as all other range trees use per txg
+        * We're about to drop the metaslab's lock thus allowing other
+        * consumers to change it's content. Set the metaslab's ms_condensing
+        * flag to ensure that allocations on this metaslab do not occur
+        * while we're in the middle of committing it to disk. This is only
+        * critical for ms_allocatable as all other range trees use per TXG
         * views of their content.
         */
        msp->ms_condensing = B_TRUE;
 
        mutex_exit(&msp->ms_lock);
-       space_map_truncate(sm, zfs_metaslab_sm_blksz, tx);
+       uint64_t object = space_map_object(msp->ms_sm);
+       space_map_truncate(sm,
+           spa_feature_is_enabled(spa, SPA_FEATURE_LOG_SPACEMAP) ?
+           zfs_metaslab_sm_blksz_with_log : zfs_metaslab_sm_blksz_no_log, tx);
+
+       /*
+        * space_map_truncate() may have reallocated the spacemap object.
+        * If so, update the vdev_ms_array.
+        */
+       if (space_map_object(msp->ms_sm) != object) {
+               object = space_map_object(msp->ms_sm);
+               dmu_write(spa->spa_meta_objset,
+                   msp->ms_group->mg_vd->vdev_ms_array, sizeof (uint64_t) *
+                   msp->ms_id, sizeof (uint64_t), &object, tx);
+       }
 
        /*
-        * While we would ideally like to create a space map representation
-        * that consists only of allocation records, doing so can be
-        * prohibitively expensive because the in-core free tree can be
-        * large, and therefore computationally expensive to subtract
-        * from the condense_tree. Instead we sync out two trees, a cheap
-        * allocation only tree followed by the in-core free tree. While not
-        * optimal, this is typically close to optimal, and much cheaper to
-        * compute.
+        * Note:
+        * When the log space map feature is enabled, each space map will
+        * always have ALLOCS followed by FREES for each sync pass. This is
+        * typically true even when the log space map feature is disabled,
+        * except from the case where a metaslab goes through metaslab_sync()
+        * and gets condensed. In that case the metaslab's space map will have
+        * ALLOCS followed by FREES (due to condensing) followed by ALLOCS
+        * followed by FREES (due to space_map_write() in metaslab_sync()) for
+        * sync pass 1.
         */
        space_map_write(sm, condense_tree, SM_ALLOC, SM_NO_VDEVID, tx);
+       space_map_write(sm, msp->ms_allocatable, SM_FREE, SM_NO_VDEVID, tx);
+
        range_tree_vacate(condense_tree, NULL, NULL);
        range_tree_destroy(condense_tree);
-
-       space_map_write(sm, msp->ms_allocatable, SM_FREE, SM_NO_VDEVID, tx);
        mutex_enter(&msp->ms_lock);
+
        msp->ms_condensing = B_FALSE;
+       metaslab_flush_update(msp, tx);
+}
+
+/*
+ * Called when the metaslab has been flushed (its own spacemap now reflects
+ * all the contents of the pool-wide spacemap log). Updates the metaslab's
+ * metadata and any pool-wide related log space map data (e.g. summary,
+ * obsolete logs, etc..) to reflect that.
+ */
+static void
+metaslab_flush_update(metaslab_t *msp, dmu_tx_t *tx)
+{
+       metaslab_group_t *mg = msp->ms_group;
+       spa_t *spa = mg->mg_vd->vdev_spa;
+
+       ASSERT(MUTEX_HELD(&msp->ms_lock));
+
+       ASSERT3U(spa_sync_pass(spa), ==, 1);
+       ASSERT(range_tree_is_empty(msp->ms_unflushed_allocs));
+       ASSERT(range_tree_is_empty(msp->ms_unflushed_frees));
+
+       /*
+        * Just because a metaslab got flushed, that doesn't mean that
+        * it will pass through metaslab_sync_done(). Thus, make sure to
+        * update ms_synced_length here in case it doesn't.
+        */
+       msp->ms_synced_length = space_map_length(msp->ms_sm);
+
+       /*
+        * We may end up here from metaslab_condense() without the
+        * feature being active. In that case this is a no-op.
+        */
+       if (!spa_feature_is_active(spa, SPA_FEATURE_LOG_SPACEMAP))
+               return;
+
+       ASSERT(spa_syncing_log_sm(spa) != NULL);
+       ASSERT(msp->ms_sm != NULL);
+       ASSERT(metaslab_unflushed_txg(msp) != 0);
+       ASSERT3P(avl_find(&spa->spa_metaslabs_by_flushed, msp, NULL), ==, msp);
+
+       VERIFY3U(tx->tx_txg, <=, spa_final_dirty_txg(spa));
+
+       /* update metaslab's position in our flushing tree */
+       uint64_t ms_prev_flushed_txg = metaslab_unflushed_txg(msp);
+       mutex_enter(&spa->spa_flushed_ms_lock);
+       avl_remove(&spa->spa_metaslabs_by_flushed, msp);
+       metaslab_set_unflushed_txg(msp, spa_syncing_txg(spa), tx);
+       avl_add(&spa->spa_metaslabs_by_flushed, msp);
+       mutex_exit(&spa->spa_flushed_ms_lock);
+
+       /* update metaslab counts of spa_log_sm_t nodes */
+       spa_log_sm_decrement_mscount(spa, ms_prev_flushed_txg);
+       spa_log_sm_increment_current_mscount(spa);
+
+       /* cleanup obsolete logs if any */
+       uint64_t log_blocks_before = spa_log_sm_nblocks(spa);
+       spa_cleanup_old_sm_logs(spa, tx);
+       uint64_t log_blocks_after = spa_log_sm_nblocks(spa);
+       VERIFY3U(log_blocks_after, <=, log_blocks_before);
+
+       /* update log space map summary */
+       uint64_t blocks_gone = log_blocks_before - log_blocks_after;
+       spa_log_summary_add_flushed_metaslab(spa);
+       spa_log_summary_decrement_mscount(spa, ms_prev_flushed_txg);
+       spa_log_summary_decrement_blkcount(spa, blocks_gone);
+}
+
+boolean_t
+metaslab_flush(metaslab_t *msp, dmu_tx_t *tx)
+{
+       spa_t *spa = msp->ms_group->mg_vd->vdev_spa;
+
+       ASSERT(MUTEX_HELD(&msp->ms_lock));
+       ASSERT3U(spa_sync_pass(spa), ==, 1);
+       ASSERT(spa_feature_is_active(spa, SPA_FEATURE_LOG_SPACEMAP));
+
+       ASSERT(msp->ms_sm != NULL);
+       ASSERT(metaslab_unflushed_txg(msp) != 0);
+       ASSERT(avl_find(&spa->spa_metaslabs_by_flushed, msp, NULL) != NULL);
+
+       /*
+        * There is nothing wrong with flushing the same metaslab twice, as
+        * this codepath should work on that case. However, the current
+        * flushing scheme makes sure to avoid this situation as we would be
+        * making all these calls without having anything meaningful to write
+        * to disk. We assert this behavior here.
+        */
+       ASSERT3U(metaslab_unflushed_txg(msp), <, dmu_tx_get_txg(tx));
+
+       /*
+        * We can not flush while loading, because then we would
+        * not load the ms_unflushed_{allocs,frees}.
+        */
+       if (msp->ms_loading)
+               return (B_FALSE);
+
+       metaslab_verify_space(msp, dmu_tx_get_txg(tx));
+       metaslab_verify_weight_and_frag(msp);
+
+       /*
+        * Metaslab condensing is effectively flushing. Therefore if the
+        * metaslab can be condensed we can just condense it instead of
+        * flushing it.
+        *
+        * Note that metaslab_condense() does call metaslab_flush_update()
+        * so we can just return immediately after condensing. We also
+        * don't need to care about setting ms_flushing or broadcasting
+        * ms_flush_cv, even if we temporarily drop the ms_lock in
+        * metaslab_condense(), as the metaslab is already loaded.
+        */
+       if (msp->ms_loaded && metaslab_should_condense(msp)) {
+               metaslab_group_t *mg = msp->ms_group;
+
+               /*
+                * For all histogram operations below refer to the
+                * comments of metaslab_sync() where we follow a
+                * similar procedure.
+                */
+               metaslab_group_histogram_verify(mg);
+               metaslab_class_histogram_verify(mg->mg_class);
+               metaslab_group_histogram_remove(mg, msp);
+
+               metaslab_condense(msp, tx);
+
+               space_map_histogram_clear(msp->ms_sm);
+               space_map_histogram_add(msp->ms_sm, msp->ms_allocatable, tx);
+               ASSERT(range_tree_is_empty(msp->ms_freed));
+               for (int t = 0; t < TXG_DEFER_SIZE; t++) {
+                       space_map_histogram_add(msp->ms_sm,
+                           msp->ms_defer[t], tx);
+               }
+               metaslab_aux_histograms_update(msp);
+
+               metaslab_group_histogram_add(mg, msp);
+               metaslab_group_histogram_verify(mg);
+               metaslab_class_histogram_verify(mg->mg_class);
+
+               metaslab_verify_space(msp, dmu_tx_get_txg(tx));
+
+               /*
+                * Since we recreated the histogram (and potentially
+                * the ms_sm too while condensing) ensure that the
+                * weight is updated too because we are not guaranteed
+                * that this metaslab is dirty and will go through
+                * metaslab_sync_done().
+                */
+               metaslab_recalculate_weight_and_sort(msp);
+               return (B_TRUE);
+       }
+
+       msp->ms_flushing = B_TRUE;
+       uint64_t sm_len_before = space_map_length(msp->ms_sm);
+
+       mutex_exit(&msp->ms_lock);
+       space_map_write(msp->ms_sm, msp->ms_unflushed_allocs, SM_ALLOC,
+           SM_NO_VDEVID, tx);
+       space_map_write(msp->ms_sm, msp->ms_unflushed_frees, SM_FREE,
+           SM_NO_VDEVID, tx);
+       mutex_enter(&msp->ms_lock);
+
+       uint64_t sm_len_after = space_map_length(msp->ms_sm);
+       if (zfs_flags & ZFS_DEBUG_LOG_SPACEMAP) {
+               zfs_dbgmsg("flushing: txg %llu, spa %s, vdev_id %llu, "
+                   "ms_id %llu, unflushed_allocs %llu, unflushed_frees %llu, "
+                   "appended %llu bytes", dmu_tx_get_txg(tx), spa_name(spa),
+                   msp->ms_group->mg_vd->vdev_id, msp->ms_id,
+                   range_tree_space(msp->ms_unflushed_allocs),
+                   range_tree_space(msp->ms_unflushed_frees),
+                   (sm_len_after - sm_len_before));
+       }
+
+       ASSERT3U(spa->spa_unflushed_stats.sus_memused, >=,
+           metaslab_unflushed_changes_memused(msp));
+       spa->spa_unflushed_stats.sus_memused -=
+           metaslab_unflushed_changes_memused(msp);
+       range_tree_vacate(msp->ms_unflushed_allocs, NULL, NULL);
+       range_tree_vacate(msp->ms_unflushed_frees, NULL, NULL);
+
+       metaslab_verify_space(msp, dmu_tx_get_txg(tx));
+       metaslab_verify_weight_and_frag(msp);
+
+       metaslab_flush_update(msp, tx);
+
+       metaslab_verify_space(msp, dmu_tx_get_txg(tx));
+       metaslab_verify_weight_and_frag(msp);
+
+       msp->ms_flushing = B_FALSE;
+       cv_broadcast(&msp->ms_flush_cv);
+       return (B_TRUE);
 }
 
 /*
@@ -2811,7 +3236,6 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
        objset_t *mos = spa_meta_objset(spa);
        range_tree_t *alloctree = msp->ms_allocating[txg & TXG_MASK];
        dmu_tx_t *tx;
-       uint64_t object = space_map_object(msp->ms_sm);
 
        ASSERT(!vd->vdev_ishole);
 
@@ -2858,25 +3282,53 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
         */
        tx = dmu_tx_create_assigned(spa_get_dsl(spa), txg);
 
-       if (msp->ms_sm == NULL) {
-               uint64_t new_object;
+       /*
+        * Generate a log space map if one doesn't exist already.
+        */
+       spa_generate_syncing_log_sm(spa, tx);
 
-               new_object = space_map_alloc(mos, zfs_metaslab_sm_blksz, tx);
+       if (msp->ms_sm == NULL) {
+               uint64_t new_object = space_map_alloc(mos,
+                   spa_feature_is_enabled(spa, SPA_FEATURE_LOG_SPACEMAP) ?
+                   zfs_metaslab_sm_blksz_with_log :
+                   zfs_metaslab_sm_blksz_no_log, tx);
                VERIFY3U(new_object, !=, 0);
 
+               dmu_write(mos, vd->vdev_ms_array, sizeof (uint64_t) *
+                   msp->ms_id, sizeof (uint64_t), &new_object, tx);
+
                VERIFY0(space_map_open(&msp->ms_sm, mos, new_object,
                    msp->ms_start, msp->ms_size, vd->vdev_ashift));
-
                ASSERT(msp->ms_sm != NULL);
+
+               ASSERT(range_tree_is_empty(msp->ms_unflushed_allocs));
+               ASSERT(range_tree_is_empty(msp->ms_unflushed_frees));
                ASSERT0(metaslab_allocated_space(msp));
        }
 
+       if (metaslab_unflushed_txg(msp) == 0 &&
+           spa_feature_is_active(spa, SPA_FEATURE_LOG_SPACEMAP)) {
+               ASSERT(spa_syncing_log_sm(spa) != NULL);
+
+               metaslab_set_unflushed_txg(msp, spa_syncing_txg(spa), tx);
+               spa_log_sm_increment_current_mscount(spa);
+               spa_log_summary_add_flushed_metaslab(spa);
+
+               ASSERT(msp->ms_sm != NULL);
+               mutex_enter(&spa->spa_flushed_ms_lock);
+               avl_add(&spa->spa_metaslabs_by_flushed, msp);
+               mutex_exit(&spa->spa_flushed_ms_lock);
+
+               ASSERT(range_tree_is_empty(msp->ms_unflushed_allocs));
+               ASSERT(range_tree_is_empty(msp->ms_unflushed_frees));
+       }
+
        if (!range_tree_is_empty(msp->ms_checkpointing) &&
            vd->vdev_checkpoint_sm == NULL) {
                ASSERT(spa_has_checkpoint(spa));
 
                uint64_t new_object = space_map_alloc(mos,
-                   vdev_standard_sm_blksz, tx);
+                   zfs_vdev_standard_sm_blksz, tx);
                VERIFY3U(new_object, !=, 0);
 
                VERIFY0(space_map_open(&vd->vdev_checkpoint_sm,
@@ -2905,10 +3357,39 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
        metaslab_class_histogram_verify(mg->mg_class);
        metaslab_group_histogram_remove(mg, msp);
 
-       if (msp->ms_loaded && metaslab_should_condense(msp)) {
-               metaslab_condense(msp, txg, tx);
+       if (spa->spa_sync_pass == 1 && msp->ms_loaded &&
+           metaslab_should_condense(msp))
+               metaslab_condense(msp, tx);
+
+       /*
+        * We'll be going to disk to sync our space accounting, thus we
+        * drop the ms_lock during that time so allocations coming from
+        * open-context (ZIL) for future TXGs do not block.
+        */
+       mutex_exit(&msp->ms_lock);
+       space_map_t *log_sm = spa_syncing_log_sm(spa);
+       if (log_sm != NULL) {
+               ASSERT(spa_feature_is_enabled(spa, SPA_FEATURE_LOG_SPACEMAP));
+
+               space_map_write(log_sm, alloctree, SM_ALLOC,
+                   vd->vdev_id, tx);
+               space_map_write(log_sm, msp->ms_freeing, SM_FREE,
+                   vd->vdev_id, tx);
+               mutex_enter(&msp->ms_lock);
+
+               ASSERT3U(spa->spa_unflushed_stats.sus_memused, >=,
+                   metaslab_unflushed_changes_memused(msp));
+               spa->spa_unflushed_stats.sus_memused -=
+                   metaslab_unflushed_changes_memused(msp);
+               range_tree_remove_xor_add(alloctree,
+                   msp->ms_unflushed_frees, msp->ms_unflushed_allocs);
+               range_tree_remove_xor_add(msp->ms_freeing,
+                   msp->ms_unflushed_allocs, msp->ms_unflushed_frees);
+               spa->spa_unflushed_stats.sus_memused +=
+                   metaslab_unflushed_changes_memused(msp);
        } else {
-               mutex_exit(&msp->ms_lock);
+               ASSERT(!spa_feature_is_enabled(spa, SPA_FEATURE_LOG_SPACEMAP));
+
                space_map_write(msp->ms_sm, alloctree, SM_ALLOC,
                    SM_NO_VDEVID, tx);
                space_map_write(msp->ms_sm, msp->ms_freeing, SM_FREE,
@@ -2928,7 +3409,8 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
                /*
                 * Since we are doing writes to disk and the ms_checkpointing
                 * tree won't be changing during that time, we drop the
-                * ms_lock while writing to the checkpoint space map.
+                * ms_lock while writing to the checkpoint space map, for the
+                * same reason mentioned above.
                 */
                mutex_exit(&msp->ms_lock);
                space_map_write(vd->vdev_checkpoint_sm,
@@ -2996,6 +3478,10 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
         * and instead will just swap the pointers for freeing and freed.
         * We can safely do this since the freed_tree is guaranteed to be
         * empty on the initial pass.
+        *
+        * Keep in mind that even if we are currently using a log spacemap
+        * we want current frees to end up in the ms_allocatable (but not
+        * get appended to the ms_sm) so their ranges can be reused as usual.
         */
        if (spa_sync_pass(spa) == 1) {
                range_tree_swap(&msp->ms_freeing, &msp->ms_freed);
@@ -3015,11 +3501,15 @@ metaslab_sync(metaslab_t *msp, uint64_t txg)
 
        mutex_exit(&msp->ms_lock);
 
-       if (object != space_map_object(msp->ms_sm)) {
-               object = space_map_object(msp->ms_sm);
-               dmu_write(mos, vd->vdev_ms_array, sizeof (uint64_t) *
-                   msp->ms_id, sizeof (uint64_t), &object, tx);
-       }
+       /*
+        * Verify that the space map object ID has been recorded in the
+        * vdev_ms_array.
+        */
+       uint64_t object;
+       VERIFY0(dmu_read(mos, vd->vdev_ms_array,
+           msp->ms_id * sizeof (uint64_t), sizeof (uint64_t), &object, 0));
+       VERIFY3U(object, ==, space_map_object(msp->ms_sm));
+
        mutex_exit(&msp->ms_sync_lock);
        dmu_tx_commit(tx);
 }
@@ -3084,14 +3574,18 @@ metaslab_sync_done(metaslab_t *msp, uint64_t txg)
                msp->ms_freed = range_tree_create(NULL, NULL);
 
                for (int t = 0; t < TXG_DEFER_SIZE; t++) {
-                       ASSERT(msp->ms_defer[t] == NULL);
-
+                       ASSERT3P(msp->ms_defer[t], ==, NULL);
                        msp->ms_defer[t] = range_tree_create(NULL, NULL);
                }
 
                ASSERT3P(msp->ms_checkpointing, ==, NULL);
                msp->ms_checkpointing = range_tree_create(NULL, NULL);
 
+               ASSERT3P(msp->ms_unflushed_allocs, ==, NULL);
+               msp->ms_unflushed_allocs = range_tree_create(NULL, NULL);
+               ASSERT3P(msp->ms_unflushed_frees, ==, NULL);
+               msp->ms_unflushed_frees = range_tree_create(NULL, NULL);
+
                metaslab_space_update(vd, mg->mg_class, 0, 0, msp->ms_size);
        }
        ASSERT0(range_tree_space(msp->ms_freeing));
@@ -3108,21 +3602,28 @@ metaslab_sync_done(metaslab_t *msp, uint64_t txg)
        defer_delta = 0;
        alloc_delta = msp->ms_allocated_this_txg -
            range_tree_space(msp->ms_freed);
+
        if (defer_allowed) {
                defer_delta = range_tree_space(msp->ms_freed) -
                    range_tree_space(*defer_tree);
        } else {
                defer_delta -= range_tree_space(*defer_tree);
        }
-
        metaslab_space_update(vd, mg->mg_class, alloc_delta + defer_delta,
            defer_delta, 0);
 
-       /*
-        * If there's a metaslab_load() in progress, wait for it to complete
-        * so that we have a consistent view of the in-core space map.
-        */
-       metaslab_load_wait(msp);
+       if (spa_syncing_log_sm(spa) == NULL) {
+               /*
+                * If there's a metaslab_load() in progress and we don't have
+                * a log space map, it means that we probably wrote to the
+                * metaslab's space map. If this is the case, we need to
+                * make sure that we wait for the load to complete so that we
+                * have a consistent view at the in-core side of the metaslab.
+                */
+               metaslab_load_wait(msp);
+       } else {
+               ASSERT(spa_feature_is_active(spa, SPA_FEATURE_LOG_SPACEMAP));
+       }
 
        /*
         * When auto-trimming is enabled, free ranges which are added to
@@ -3451,6 +3952,7 @@ metaslab_block_alloc(metaslab_t *msp, uint64_t size, uint64_t txg)
        range_tree_t *rt = msp->ms_allocatable;
        metaslab_class_t *mc = msp->ms_group->mg_class;
 
+       ASSERT(MUTEX_HELD(&msp->ms_lock));
        VERIFY(!msp->ms_condensing);
        VERIFY0(msp->ms_disabled);
 
@@ -4864,12 +5366,23 @@ metaslab_check_free_impl(vdev_t *vd, uint64_t offset, uint64_t size)
                    offset, size);
        }
 
-       range_tree_verify_not_present(msp->ms_trim, offset, size);
+       /*
+        * Check all segments that currently exist in the freeing pipeline.
+        *
+        * It would intuitively make sense to also check the current allocating
+        * tree since metaslab_unalloc_dva() exists for extents that are
+        * allocated and freed in the same sync pass withing the same txg.
+        * Unfortunately there are places (e.g. the ZIL) where we allocate a
+        * segment but then we free part of it within the same txg
+        * [see zil_sync()]. Thus, we don't call range_tree_verify() in the
+        * current allocating tree.
+        */
        range_tree_verify_not_present(msp->ms_freeing, offset, size);
        range_tree_verify_not_present(msp->ms_checkpointing, offset, size);
        range_tree_verify_not_present(msp->ms_freed, offset, size);
        for (int j = 0; j < TXG_DEFER_SIZE; j++)
                range_tree_verify_not_present(msp->ms_defer[j], offset, size);
+       range_tree_verify_not_present(msp->ms_trim, offset, size);
        mutex_exit(&msp->ms_lock);
 }
 
@@ -4979,6 +5492,57 @@ metaslab_enable(metaslab_t *msp, boolean_t sync)
        mutex_exit(&mg->mg_ms_disabled_lock);
 }
 
+static void
+metaslab_update_ondisk_flush_data(metaslab_t *ms, dmu_tx_t *tx)
+{
+       vdev_t *vd = ms->ms_group->mg_vd;
+       spa_t *spa = vd->vdev_spa;
+       objset_t *mos = spa_meta_objset(spa);
+
+       ASSERT(spa_feature_is_active(spa, SPA_FEATURE_LOG_SPACEMAP));
+
+       metaslab_unflushed_phys_t entry = {
+               .msp_unflushed_txg = metaslab_unflushed_txg(ms),
+       };
+       uint64_t entry_size = sizeof (entry);
+       uint64_t entry_offset = ms->ms_id * entry_size;
+
+       uint64_t object = 0;
+       int err = zap_lookup(mos, vd->vdev_top_zap,
+           VDEV_TOP_ZAP_MS_UNFLUSHED_PHYS_TXGS, sizeof (uint64_t), 1,
+           &object);
+       if (err == ENOENT) {
+               object = dmu_object_alloc(mos, DMU_OTN_UINT64_METADATA,
+                   SPA_OLD_MAXBLOCKSIZE, DMU_OT_NONE, 0, tx);
+               VERIFY0(zap_add(mos, vd->vdev_top_zap,
+                   VDEV_TOP_ZAP_MS_UNFLUSHED_PHYS_TXGS, sizeof (uint64_t), 1,
+                   &object, tx));
+       } else {
+               VERIFY0(err);
+       }
+
+       dmu_write(spa_meta_objset(spa), object, entry_offset, entry_size,
+           &entry, tx);
+}
+
+void
+metaslab_set_unflushed_txg(metaslab_t *ms, uint64_t txg, dmu_tx_t *tx)
+{
+       spa_t *spa = ms->ms_group->mg_vd->vdev_spa;
+
+       if (!spa_feature_is_active(spa, SPA_FEATURE_LOG_SPACEMAP))
+               return;
+
+       ms->ms_unflushed_txg = txg;
+       metaslab_update_ondisk_flush_data(ms, tx);
+}
+
+uint64_t
+metaslab_unflushed_txg(metaslab_t *ms)
+{
+       return (ms->ms_unflushed_txg);
+}
+
 #if defined(_KERNEL)
 /* BEGIN CSTYLED */
 module_param(metaslab_aliquot, ulong, 0644);