]> granicus.if.org Git - zfs/commitdiff
Add 'feature-commit-cb' branch for DMU commit callbacks.
authorBrian Behlendorf <behlendorf1@llnl.gov>
Fri, 20 Mar 2009 03:30:14 +0000 (20:30 -0700)
committerBrian Behlendorf <behlendorf1@llnl.gov>
Fri, 20 Mar 2009 03:30:14 +0000 (20:30 -0700)
.topdeps [new file with mode: 0644]
.topmsg [new file with mode: 0644]
cmd/ztest/ztest.c
module/zfs/dmu_tx.c
module/zfs/include/sys/dmu.h
module/zfs/include/sys/dmu_tx.h
module/zfs/include/sys/txg.h
module/zfs/include/sys/txg_impl.h
module/zfs/txg.c

diff --git a/.topdeps b/.topdeps
new file mode 100644 (file)
index 0000000..1f7391f
--- /dev/null
+++ b/.topdeps
@@ -0,0 +1 @@
+master
diff --git a/.topmsg b/.topmsg
new file mode 100644 (file)
index 0000000..33c1e1c
--- /dev/null
+++ b/.topmsg
@@ -0,0 +1,8 @@
+From: Brian Behlendorf <behlendorf1@llnl.gov>
+Subject: [PATCH] feature commit cb
+
+DMU commit callbacks. Provides an API to attach callbacks to transactions,
+which are called by the DMU when they are safely committed to disk.
+See Lustre bug 14117 for details.
+
+Signed-off-by: Brian Behlendorf <behlendorf1@llnl.gov>
index 4503a3d02802104262b458430f54318b7901ae92..efe4fd06bc621286765b0f122c93e70f0b1bced0 100644 (file)
@@ -164,6 +164,7 @@ typedef void ztest_func_t(ztest_args_t *);
 ztest_func_t ztest_dmu_read_write;
 ztest_func_t ztest_dmu_write_parallel;
 ztest_func_t ztest_dmu_object_alloc_free;
+ztest_func_t ztest_dmu_commit_callbacks;
 ztest_func_t ztest_zap;
 ztest_func_t ztest_zap_parallel;
 ztest_func_t ztest_traverse;
@@ -198,6 +199,7 @@ ztest_info_t ztest_info[] = {
        { ztest_dmu_read_write,                 1,      &zopt_always    },
        { ztest_dmu_write_parallel,             30,     &zopt_always    },
        { ztest_dmu_object_alloc_free,          1,      &zopt_always    },
+       { ztest_dmu_commit_callbacks,           10,     &zopt_always    },
        { ztest_zap,                            30,     &zopt_always    },
        { ztest_zap_parallel,                   100,    &zopt_always    },
        { ztest_dsl_prop_get_set,               1,      &zopt_sometimes },
@@ -217,6 +219,16 @@ ztest_info_t ztest_info[] = {
 
 #define        ZTEST_SYNC_LOCKS        16
 
+/*
+ * The following struct is used to hold a list of uncalled commit callbacks.
+ *
+ * The callbacks are ordered by txg number.
+ */
+typedef struct ztest_cb_list {
+       mutex_t zcl_callbacks_lock;
+       list_t  zcl_callbacks;
+} ztest_cb_list_t;
+
 /*
  * Stuff we need to share writably between parent and child.
  */
@@ -233,6 +245,7 @@ typedef struct ztest_shared {
        ztest_info_t    zs_info[ZTEST_FUNCS];
        mutex_t         zs_sync_lock[ZTEST_SYNC_LOCKS];
        uint64_t        zs_seq[ZTEST_SYNC_LOCKS];
+       ztest_cb_list_t zs_cb_list;
 } ztest_shared_t;
 
 static char ztest_dev_template[] = "%s/%s.%llua";
@@ -2433,6 +2446,205 @@ ztest_zap_parallel(ztest_args_t *za)
                dmu_tx_commit(tx);
 }
 
+/*
+ * Commit callback data.
+ */
+typedef struct ztest_cb_data {
+       list_node_t             zcd_node;
+       ztest_cb_list_t         *zcd_zcl;
+       uint64_t                zcd_txg;
+       int                     zcd_expected_err;
+       boolean_t               zcd_added;
+       boolean_t               zcd_called;
+       spa_t                   *zcd_spa;
+} ztest_cb_data_t;
+
+static void
+ztest_commit_callback(void *arg, int error)
+{
+       ztest_cb_data_t *data = arg;
+       ztest_cb_list_t *zcl;
+       uint64_t synced_txg;
+
+       VERIFY(data != NULL);
+       VERIFY3S(data->zcd_expected_err, ==, error);
+       VERIFY(!data->zcd_called);
+
+       synced_txg = spa_last_synced_txg(data->zcd_spa);
+       if (data->zcd_txg > synced_txg)
+               fatal(0, "commit callback of txg %llu prematurely called, last"
+                   " synced txg = %llu\n", data->zcd_txg, synced_txg);
+
+       zcl = data->zcd_zcl;
+       data->zcd_called = B_TRUE;
+
+       if (error == ECANCELED) {
+               ASSERT3U(data->zcd_txg, ==, 0);
+               ASSERT(!data->zcd_added);
+
+               /*
+                * The private callback data should be destroyed here, but
+                * since we are going to check the zcd_called field after
+                * dmu_tx_abort(), we will destroy it there.
+                */
+               return;
+       }
+
+       if (!data->zcd_added)
+               goto out;
+
+       ASSERT3U(data->zcd_txg, !=, 0);
+
+       /* Remove our callback from the list */
+       (void) mutex_lock(&zcl->zcl_callbacks_lock);
+       list_remove(&zcl->zcl_callbacks, data);
+       (void) mutex_unlock(&zcl->zcl_callbacks_lock);
+
+out:
+       umem_free(data, sizeof (ztest_cb_data_t));
+}
+
+static ztest_cb_data_t *
+ztest_create_cb_data(objset_t *os, ztest_cb_list_t *zcl, uint64_t txg)
+{
+       ztest_cb_data_t *cb_data;
+
+       cb_data = umem_zalloc(sizeof (ztest_cb_data_t), UMEM_NOFAIL);
+
+       cb_data->zcd_zcl = zcl;
+       cb_data->zcd_txg = txg;
+       cb_data->zcd_spa = os->os->os_spa;
+
+       return (cb_data);
+}
+
+/*
+ * If a number of txgs equal to this threshold have been created after a commit
+ * callback has been registered but not called, then we assume there is an
+ * implementation bug.
+ */
+#define        ZTEST_COMMIT_CALLBACK_THRESH    3
+
+/*
+ * Commit callback test.
+ */
+void
+ztest_dmu_commit_callbacks(ztest_args_t *za)
+{
+       objset_t *os = za->za_os;
+       dmu_tx_t *tx;
+       ztest_cb_list_t *zcl = &ztest_shared->zs_cb_list;
+       ztest_cb_data_t *cb_data[3], *tmp_cb;
+       uint64_t old_txg, txg;
+       int i, error;
+
+       tx = dmu_tx_create(os);
+
+       cb_data[0] = ztest_create_cb_data(os, zcl, 0);
+       dmu_tx_callback_register(tx, ztest_commit_callback, cb_data[0]);
+
+       dmu_tx_hold_write(tx, ZTEST_DIROBJ, za->za_diroff, sizeof (uint64_t));
+
+       /* Every once in a while, abort the transaction on purpose */
+       if (ztest_random(100) == 0)
+               error = -1;
+
+       if (!error)
+               error = dmu_tx_assign(tx, TXG_NOWAIT);
+
+       txg = error ? 0 : dmu_tx_get_txg(tx);
+
+       cb_data[1] = ztest_create_cb_data(os, zcl, txg);
+       dmu_tx_callback_register(tx, ztest_commit_callback, cb_data[1]);
+
+       if (error) {
+               /*
+                * It's not a strict requirement to call the registered
+                * callbacks from inside dmu_tx_abort(), but that's what
+                * happens in the current implementation so we will check for
+                * that.
+                */
+               for (i = 0; i < 2; i++) {
+                       cb_data[i]->zcd_expected_err = ECANCELED;
+                       VERIFY(!cb_data[i]->zcd_called);
+               }
+
+               dmu_tx_abort(tx);
+
+               for (i = 0; i < 2; i++) {
+                       VERIFY(cb_data[i]->zcd_called);
+                       umem_free(cb_data[i], sizeof (ztest_cb_data_t));
+               }
+
+               return;
+       }
+
+       cb_data[0]->zcd_txg = txg;
+       cb_data[2] = ztest_create_cb_data(os, zcl, txg);
+       dmu_tx_callback_register(tx, ztest_commit_callback, cb_data[2]);
+
+       /*
+        * Read existing data to make sure there isn't a future leak.
+        */
+       VERIFY(0 == dmu_read(os, ZTEST_DIROBJ, za->za_diroff, sizeof (uint64_t),
+           &old_txg));
+
+       if (old_txg > txg)
+               fatal(0, "future leak: got %llx, open txg is %llx", old_txg,
+                   txg);
+
+       dmu_write(os, ZTEST_DIROBJ, za->za_diroff, sizeof (uint64_t), &txg, tx);
+
+       (void) mutex_lock(&zcl->zcl_callbacks_lock);
+
+       /*
+        * Since commit callbacks don't have any ordering requirement and since
+        * it is theoretically possible for a commit callback to be called
+        * after an arbitrary amount of time has elapsed since its txg has been
+        * synced, it is difficult to reliably determine whether a commit
+        * callback hasn't been called due to high load or due to a flawed
+        * implementation.
+        *
+        * In practice, we will assume that if after a certain number of txgs a
+        * commit callback hasn't been called, then most likely there's an
+        * implementation bug..
+        */
+       tmp_cb = list_head(&zcl->zcl_callbacks);
+       if (tmp_cb != NULL)
+               VERIFY3U(tmp_cb->zcd_txg, >,
+                   txg - ZTEST_COMMIT_CALLBACK_THRESH);
+
+       /*
+        * Let's find the place to insert our callbacks.
+        *
+        * Even though the list is ordered by txg, it is possible for the
+        * insertion point to not be the end because our txg may already be
+        * quiescing at this point and other callbacks in the open txg may
+        * have sneaked in.
+        */
+       tmp_cb = list_tail(&zcl->zcl_callbacks);
+       while (tmp_cb != NULL && tmp_cb->zcd_txg > txg)
+               tmp_cb = list_prev(&zcl->zcl_callbacks, tmp_cb);
+
+       /* Add the 3 callbacks to the list */
+       for (i = 0; i < 3; i++) {
+               if (tmp_cb == NULL)
+                       list_insert_head(&zcl->zcl_callbacks, cb_data[i]);
+               else
+                       list_insert_after(&zcl->zcl_callbacks, tmp_cb,
+                           cb_data[i]);
+
+               cb_data[i]->zcd_added = B_TRUE;
+               VERIFY(!cb_data[i]->zcd_called);
+
+               tmp_cb = cb_data[i];
+       }
+
+       (void) mutex_unlock(&zcl->zcl_callbacks_lock);
+
+       dmu_tx_commit(tx);
+}
+
 void
 ztest_dsl_prop_get_set(ztest_args_t *za)
 {
@@ -3041,6 +3253,11 @@ ztest_run(char *pool)
 
        (void) _mutex_init(&zs->zs_vdev_lock, USYNC_THREAD, NULL);
        (void) rwlock_init(&zs->zs_name_lock, USYNC_THREAD, NULL);
+       (void) _mutex_init(&zs->zs_cb_list.zcl_callbacks_lock, USYNC_THREAD,
+           NULL);
+
+       list_create(&zs->zs_cb_list.zcl_callbacks, sizeof (ztest_cb_data_t),
+           offsetof(ztest_cb_data_t, zcd_node));
 
        for (t = 0; t < ZTEST_SYNC_LOCKS; t++)
                (void) _mutex_init(&zs->zs_sync_lock[t], USYNC_THREAD, NULL);
@@ -3240,6 +3457,12 @@ ztest_run(char *pool)
        spa_close(spa, FTAG);
 
        kernel_fini();
+
+       list_destroy(&zs->zs_cb_list.zcl_callbacks);
+
+       (void) _mutex_destroy(&zs->zs_cb_list.zcl_callbacks_lock);
+       (void) rwlock_destroy(&zs->zs_name_lock);
+       (void) _mutex_destroy(&zs->zs_vdev_lock);
 }
 
 void
index bf560e5657c1c2db023e072b4f3e56418e2bd558..0af089198c4526956d1991b34557d53540e9d9d1 100644 (file)
@@ -48,6 +48,8 @@ dmu_tx_create_dd(dsl_dir_t *dd)
                tx->tx_pool = dd->dd_pool;
        list_create(&tx->tx_holds, sizeof (dmu_tx_hold_t),
            offsetof(dmu_tx_hold_t, txh_node));
+       list_create(&tx->tx_callbacks, sizeof (dmu_tx_callback_t),
+           offsetof(dmu_tx_callback_t, dcb_node));
 #ifdef ZFS_DEBUG
        refcount_create(&tx->tx_space_written);
        refcount_create(&tx->tx_space_freed);
@@ -1020,8 +1022,13 @@ dmu_tx_commit(dmu_tx_t *tx)
        if (tx->tx_tempreserve_cookie)
                dsl_dir_tempreserve_clear(tx->tx_tempreserve_cookie, tx);
 
+       if (!list_is_empty(&tx->tx_callbacks))
+               txg_register_callbacks(&tx->tx_txgh, &tx->tx_callbacks);
+
        if (tx->tx_anyobj == FALSE)
                txg_rele_to_sync(&tx->tx_txgh);
+
+       list_destroy(&tx->tx_callbacks);
        list_destroy(&tx->tx_holds);
 #ifdef ZFS_DEBUG
        dprintf("towrite=%llu written=%llu tofree=%llu freed=%llu\n",
@@ -1050,6 +1057,14 @@ dmu_tx_abort(dmu_tx_t *tx)
                if (dn != NULL)
                        dnode_rele(dn, tx);
        }
+
+       /*
+        * Call any registered callbacks with an error code.
+        */
+       if (!list_is_empty(&tx->tx_callbacks))
+               dmu_tx_callback(&tx->tx_callbacks, ECANCELED);
+
+       list_destroy(&tx->tx_callbacks);
        list_destroy(&tx->tx_holds);
 #ifdef ZFS_DEBUG
        refcount_destroy_many(&tx->tx_space_written,
@@ -1066,3 +1081,31 @@ dmu_tx_get_txg(dmu_tx_t *tx)
        ASSERT(tx->tx_txg != 0);
        return (tx->tx_txg);
 }
+
+void
+dmu_tx_callback_register(dmu_tx_t *tx, dmu_tx_callback_func_t *func, void *data)
+{
+       dmu_tx_callback_t *dcb;
+
+       dcb = kmem_alloc(sizeof (dmu_tx_callback_t), KM_SLEEP);
+
+       dcb->dcb_func = func;
+       dcb->dcb_data = data;
+
+       list_insert_tail(&tx->tx_callbacks, dcb);
+}
+
+/*
+ * Call all the commit callbacks on a list, with a given error code.
+ */
+void
+dmu_tx_callback(list_t *cb_list, int error)
+{
+       dmu_tx_callback_t *dcb;
+
+       while (dcb = list_head(cb_list)) {
+               list_remove(cb_list, dcb);
+               dcb->dcb_func(dcb->dcb_data, error);
+               kmem_free(dcb, sizeof (dmu_tx_callback_t));
+       }
+}
index 3b1e5c8fbc1fda3be2800e557ee61acc1e02b534..392431a18ecca3bd6cd0a6602df33fad63879279 100644 (file)
@@ -429,6 +429,26 @@ int dmu_tx_assign(dmu_tx_t *tx, uint64_t txg_how);
 void dmu_tx_wait(dmu_tx_t *tx);
 void dmu_tx_commit(dmu_tx_t *tx);
 
+/*
+ * To register a commit callback, dmu_tx_callback_register() must be called.
+ *
+ * dcb_data is a pointer to caller private data that is passed on as a
+ * callback parameter. The caller is responsible for properly allocating and
+ * freeing it.
+ *
+ * When registering a callback, the transaction must be already created, but
+ * it cannot be committed or aborted. It can be assigned to a txg or not.
+ *
+ * The callback will be called after the transaction has been safely written
+ * to stable storage and will also be called if the dmu_tx is aborted.
+ * If there is any error which prevents the transaction from being committed to
+ * disk, the callback will be called with a value of error != 0.
+ */
+typedef void dmu_tx_callback_func_t(void *dcb_data, int error);
+
+void dmu_tx_callback_register(dmu_tx_t *tx, dmu_tx_callback_func_t *dcb_func,
+    void *dcb_data);
+
 /*
  * Free up the data blocks for a defined range of a file.  If size is
  * zero, the range from offset to end-of-file is freed.
index 2727daaaa76b1ce294b84b7bee61ae94e36b3c3b..7fcab936f0f12c1b37f6aee2b4720089c2765cc6 100644 (file)
@@ -26,8 +26,6 @@
 #ifndef        _SYS_DMU_TX_H
 #define        _SYS_DMU_TX_H
 
-#pragma ident  "%Z%%M% %I%     %E% SMI"
-
 #include <sys/inttypes.h>
 #include <sys/dmu.h>
 #include <sys/txg.h>
@@ -59,6 +57,7 @@ struct dmu_tx {
        txg_handle_t tx_txgh;
        void *tx_tempreserve_cookie;
        struct dmu_tx_hold *tx_needassign_txh;
+       list_t tx_callbacks; /* list of dmu_tx_callback_t on this dmu_tx */
        uint8_t tx_anyobj;
        int tx_err;
 #ifdef ZFS_DEBUG
@@ -98,6 +97,11 @@ typedef struct dmu_tx_hold {
 #endif
 } dmu_tx_hold_t;
 
+typedef struct dmu_tx_callback {
+       list_node_t             dcb_node;    /* linked to tx_callbacks list */
+       dmu_tx_callback_func_t  *dcb_func;   /* caller function pointer */
+       void                    *dcb_data;   /* caller private data */
+} dmu_tx_callback_t;
 
 /*
  * These routines are defined in dmu.h, and are called by the user.
@@ -109,6 +113,10 @@ void dmu_tx_abort(dmu_tx_t *tx);
 uint64_t dmu_tx_get_txg(dmu_tx_t *tx);
 void dmu_tx_wait(dmu_tx_t *tx);
 
+void dmu_tx_callback_register(dmu_tx_t *tx, dmu_tx_callback_func_t *dcb_func,
+    void *dcb_data);
+void dmu_tx_callback(list_t *cb_list, int error);
+
 /*
  * These routines are defined in dmu_spa.h, and are called by the SPA.
  */
index 23bdff211b4a407ab18f04b0a3a443504188dc18..e679898dbc69807a78b7734f5999abf4fe264b5a 100644 (file)
@@ -26,8 +26,6 @@
 #ifndef _SYS_TXG_H
 #define        _SYS_TXG_H
 
-#pragma ident  "%Z%%M% %I%     %E% SMI"
-
 #include <sys/spa.h>
 #include <sys/zfs_context.h>
 
@@ -71,6 +69,7 @@ extern void txg_sync_stop(struct dsl_pool *dp);
 extern uint64_t txg_hold_open(struct dsl_pool *dp, txg_handle_t *txghp);
 extern void txg_rele_to_quiesce(txg_handle_t *txghp);
 extern void txg_rele_to_sync(txg_handle_t *txghp);
+extern void txg_register_callbacks(txg_handle_t *txghp, list_t *tx_callbacks);
 extern void txg_suspend(struct dsl_pool *dp);
 extern void txg_resume(struct dsl_pool *dp);
 
index 7413c662b35559a4f9fe70c1a86ffe949541a3d2..bc7d7c7e514c6ecc89142ea3befa24c2d4c2cee3 100644 (file)
 extern "C" {
 #endif
 
+typedef struct tx_cb {
+       tx_cpu_t        *tcb_tc;
+       uint64_t        tcb_txg;
+} tx_cb_t;
+
 struct tx_cpu {
        kmutex_t        tc_lock;
        kcondvar_t      tc_cv[TXG_SIZE];
        uint64_t        tc_count[TXG_SIZE];
+       list_t          tc_callbacks[TXG_SIZE]; /* commit cb list */
        char            tc_pad[16];
 };
 
@@ -64,6 +70,8 @@ typedef struct tx_state {
 
        kthread_t       *tx_sync_thread;
        kthread_t       *tx_quiesce_thread;
+
+       taskq_t         *tx_commit_cb_taskq; /* commit callback taskq */
 } tx_state_t;
 
 #ifdef __cplusplus
index e3c0e2a134239eb05cd9f4c98c0574f49dfcc670..b5fcc8c4a858a67c92cd112b8b7239120b992e1a 100644 (file)
@@ -26,6 +26,7 @@
 #include <sys/zfs_context.h>
 #include <sys/txg_impl.h>
 #include <sys/dmu_impl.h>
+#include <sys/dmu_tx.h>
 #include <sys/dsl_pool.h>
 #include <sys/callb.h>
 
@@ -57,6 +58,9 @@ txg_init(dsl_pool_t *dp, uint64_t txg)
                for (i = 0; i < TXG_SIZE; i++) {
                        cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
                            NULL);
+                       list_create(&tx->tx_cpu[c].tc_callbacks[i],
+                           sizeof (dmu_tx_callback_t),
+                           offsetof(dmu_tx_callback_t, dcb_node));
                }
        }
 
@@ -96,10 +100,15 @@ txg_fini(dsl_pool_t *dp)
                int i;
 
                mutex_destroy(&tx->tx_cpu[c].tc_lock);
-               for (i = 0; i < TXG_SIZE; i++)
+               for (i = 0; i < TXG_SIZE; i++) {
                        cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
+                       list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
+               }
        }
 
+       if (tx->tx_commit_cb_taskq != NULL)
+               taskq_destroy(tx->tx_commit_cb_taskq);
+
        kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
 
        bzero(tx, sizeof (tx_state_t));
@@ -229,25 +238,55 @@ txg_rele_to_quiesce(txg_handle_t *th)
 }
 
 void
-txg_rele_to_sync(txg_handle_t *th)
+txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
 {
        tx_cpu_t *tc = th->th_cpu;
        int g = th->th_txg & TXG_MASK;
 
+       mutex_enter(&tc->tc_lock);
+       list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
+       mutex_exit(&tc->tc_lock);
+}
+
+static void
+txg_exit(tx_cpu_t *tc, uint64_t txg)
+{
+       int g = txg & TXG_MASK;
+
        mutex_enter(&tc->tc_lock);
        ASSERT(tc->tc_count[g] != 0);
        if (--tc->tc_count[g] == 0)
                cv_broadcast(&tc->tc_cv[g]);
        mutex_exit(&tc->tc_lock);
+}
+
+void
+txg_rele_to_sync(txg_handle_t *th)
+{
+       txg_exit(th->th_cpu, th->th_txg);
 
        th->th_cpu = NULL;      /* defensive */
 }
 
+static void
+txg_wait_exit(tx_state_t *tx, uint64_t txg)
+{
+       int g = txg & TXG_MASK;
+       int c;
+
+       for (c = 0; c < max_ncpus; c++) {
+               tx_cpu_t *tc = &tx->tx_cpu[c];
+               mutex_enter(&tc->tc_lock);
+               while (tc->tc_count[g] != 0)
+                       cv_wait(&tc->tc_cv[g], &tc->tc_lock);
+               mutex_exit(&tc->tc_lock);
+       }
+}
+
 static void
 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
 {
        tx_state_t *tx = &dp->dp_tx;
-       int g = txg & TXG_MASK;
        int c;
 
        /*
@@ -269,12 +308,60 @@ txg_quiesce(dsl_pool_t *dp, uint64_t txg)
        /*
         * Quiesce the transaction group by waiting for everyone to txg_exit().
         */
+       txg_wait_exit(tx, txg);
+}
+
+static void
+txg_callback(tx_cb_t *tcb)
+{
+       tx_cpu_t *tc = tcb->tcb_tc;
+       int g = tcb->tcb_txg & TXG_MASK;
+
+       dmu_tx_callback(&tc->tc_callbacks[g], 0);
+
+       txg_exit(tc, tcb->tcb_txg);
+
+       kmem_free(tcb, sizeof (tx_cb_t));
+}
+
+/*
+ * Dispatch the commit callbacks registered on this txg to worker threads.
+ */
+static void
+txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
+{
+       int c;
+       tx_state_t *tx = &dp->dp_tx;
+       tx_cb_t *tcb;
+
        for (c = 0; c < max_ncpus; c++) {
                tx_cpu_t *tc = &tx->tx_cpu[c];
-               mutex_enter(&tc->tc_lock);
-               while (tc->tc_count[g] != 0)
-                       cv_wait(&tc->tc_cv[g], &tc->tc_lock);
-               mutex_exit(&tc->tc_lock);
+               /* No need to lock tx_cpu_t at this point */
+
+               int g = txg & TXG_MASK;
+
+               if (list_is_empty(&tc->tc_callbacks[g]))
+                       continue;
+
+               if (tx->tx_commit_cb_taskq == NULL) {
+                       /*
+                        * Commit callback taskq hasn't been created yet.
+                        */
+                       tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
+                           max_ncpus, minclsyspri, max_ncpus, max_ncpus * 4,
+                           TASKQ_PREPOPULATE);
+               }
+
+               tcb = kmem_alloc(sizeof (tx_cb_t), KM_SLEEP);
+               tcb->tcb_txg = txg;
+               tcb->tcb_tc = tc;
+
+               /* There shouldn't be any holders on this txg at this point */
+               ASSERT3U(tc->tc_count[g], ==, 0);
+               tc->tc_count[g]++;
+
+               (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
+                   txg_callback, tcb, TQ_SLEEP);
        }
 }
 
@@ -345,6 +432,13 @@ txg_sync_thread(dsl_pool_t *dp)
                spa_sync(dp->dp_spa, txg);
                delta = lbolt - start;
 
+               /*
+                * Dispatch commit callbacks to worker threads and wait for
+                * them to finish.
+                */
+               txg_dispatch_callbacks(dp, txg);
+               txg_wait_exit(tx, txg);
+
                mutex_enter(&tx->tx_sync_lock);
                rw_enter(&tx->tx_suspend, RW_WRITER);
                tx->tx_synced_txg = txg;