]> granicus.if.org Git - zfs/commitdiff
Provide correct kthread API semantics using pthreads.
authorBrian Behlendorf <behlendorf1@llnl.gov>
Tue, 14 Jul 2009 21:16:26 +0000 (14:16 -0700)
committerBrian Behlendorf <behlendorf1@llnl.gov>
Tue, 14 Jul 2009 21:16:26 +0000 (14:16 -0700)
The intent here is to fully remove the previous Solaris thread
implementation so we don't need to simulate both Solaris kernel
and user space thread APIs.  The few user space consumers of the
thread API have been updated to use the kthread API.  In order
to support this we needed to more fully support the kthread API
and that means not doing crazy things like casting a thread id
to a pointer and using that as was done before.  This first
implementation is not effecient but it does provide all the
corrent semantics.  If/when performance becomes and issue we
can and should just natively adopt pthreads which is portable.

Let me finish by saying I'm not proud of any of this and I would
love to see it improved.  However, this slow implementation does
at least provide all the correct kthread API semantics whereas
the previous method of casting the thread ID to a pointer was
dodgy at best.

cmd/ztest/ztest.c
lib/libzpool/include/sys/zfs_context.h
lib/libzpool/kernel.c
lib/libzpool/taskq.c
module/zfs/txg.c

index facb2354a8f1e7b96804c3b60b49127ebb861cfa..e171e91deda6a147b461cc2e4b9cac48c8c6f404 100644 (file)
@@ -141,6 +141,7 @@ typedef struct ztest_args {
        objset_t        *za_os;
        zilog_t         *za_zilog;
        kthread_t       *za_thread;
+       kt_did_t        za_threadid;
        uint64_t        za_instance;
        uint64_t        za_random;
        uint64_t        za_diroff;
@@ -3806,6 +3807,8 @@ ztest_resume_thread(void *arg)
                (void) poll(NULL, 0, 1000);
                ztest_resume(spa);
        }
+
+       thread_exit();
        return (NULL);
 }
 
@@ -3870,6 +3873,7 @@ ztest_thread(void *arg)
                        break;
        }
 
+       thread_exit();
        return (NULL);
 }
 
@@ -3885,6 +3889,7 @@ ztest_run(char *pool)
        spa_t *spa;
        char name[100];
        kthread_t *resume_thread;
+       kt_did_t resume_id;
 
        ztest_exiting = B_FALSE;
 
@@ -3963,8 +3968,9 @@ ztest_run(char *pool)
        /*
         * Create a thread to periodically resume suspended I/O.
         */
-       resume_thread = thread_create(NULL, 0, ztest_resume_thread, spa,
-                                     THR_BOUND, NULL, 0, 0);
+       VERIFY3P((resume_thread = thread_create(NULL, 0, ztest_resume_thread,
+                spa, THR_BOUND, NULL, 0, 0)), !=, NULL);
+       resume_id = resume_thread->t_tid;
 
        /*
         * Verify that we can safely inquire about about any object,
@@ -4040,12 +4046,13 @@ ztest_run(char *pool)
                        za[d].za_zilog = zil_open(za[d].za_os, NULL);
                }
 
-               za[t].za_thread = thread_create(NULL, 0, ztest_thread, &za[t],
-                                               THR_BOUND, NULL, 0, 0);
+               VERIFY3P((za[t].za_thread = thread_create(NULL, 0, ztest_thread,
+                        &za[t], THR_BOUND, NULL, 0, 0)), !=, NULL);
+               za[t].za_threadid = za[t].za_thread->t_tid;
        }
 
        while (--t >= 0) {
-               VERIFY(thr_join(za[t].za_thread, NULL, NULL) == 0);
+               VERIFY(thread_join(za[t].za_threadid, NULL, NULL) == 0);
                if (t < zopt_datasets) {
                        zil_close(za[t].za_zilog);
                        dmu_objset_close(za[t].za_os);
@@ -4084,7 +4091,7 @@ ztest_run(char *pool)
 
        /* Kill the resume thread */
        ztest_exiting = B_TRUE;
-       VERIFY(thr_join(resume_thread, NULL, NULL) == 0);
+       VERIFY(thread_join(resume_id, NULL, NULL) == 0);
        ztest_resume(spa);
 
        /*
index 15f7665cdde781631abc3b42a5653fd154795802..cad7553cc8a66c04ffd0eba9570cc0a748d4d748 100644 (file)
@@ -192,24 +192,34 @@ _NOTE(CONSTCOND) } while (0)
 /*
  * Threads
  */
-#define        tsd_get(key)            pthread_getspecific(key)
-#define        tsd_set(key, val)       pthread_setspecific(key, val)
+#define THR_BOUND              0x00000001
+#define TS_RUN                 0x00000002
 
 typedef void (*thread_func_t)(void *);
+typedef pthread_t kt_did_t;
+
 typedef struct kthread {
         list_node_t    t_node;
-       pthread_t       t_id;
+       kt_did_t        t_tid;
        pthread_attr_t  t_attr;
 } kthread_t;
 
-#define        curthread               zk_curthread()
-#define thread_create(stk, stksize, func, arg, len, pp, state, pri) \
-                               zk_thread_create((thread_func_t)func, arg)
-#define thr_join(kt, v1, v2)   pthread_join(kt->t_id, v2)
-
-extern kthread_t *zk_curthread(void);
-extern kthread_t *zk_thread_create(thread_func_t func, void *arg);
-extern void thread_exit(void);
+#define        tsd_get(key)            pthread_getspecific(key)
+#define        tsd_set(key, val)       pthread_setspecific(key, val)
+#define        curthread               zk_thread_current()
+#define thread_exit            zk_thread_exit
+#define thread_create(stk, stksize, func, arg, len, pp, state, pri)    \
+       zk_thread_create(stk, stksize, (thread_func_t)func, arg,        \
+                        len, NULL, state, pri)
+#define thread_join(tid, dtid, status)                                 \
+       zk_thread_join(tid, dtid, status)
+
+extern kthread_t *zk_thread_current(void);
+extern void zk_thread_exit(void);
+extern kthread_t *zk_thread_create(caddr_t stk, size_t  stksize,
+       thread_func_t func, void *arg, size_t len,
+       void *pp, int state, pri_t pri);
+extern int zk_thread_join(kt_did_t tid, kthread_t *dtid, void **status);
 
 #define        issig(why)      (FALSE)
 #define        ISSIG(thr, why) (FALSE)
@@ -218,6 +228,8 @@ extern void thread_exit(void);
  * Mutexes
  */
 #define MTX_MAGIC 0x9522f51362a6e326ull
+#define MTX_INIT  (void *)NULL
+#define MTX_DEST  (void *)-1UL
 typedef struct kmutex {
        void            *m_owner;
        uint64_t        m_magic;
@@ -238,6 +250,8 @@ extern void *mutex_owner(kmutex_t *mp);
  * RW locks
  */
 #define RW_MAGIC 0x4d31fb123648e78aull
+#define RW_INIT  (void *)NULL
+#define RW_DEST  (void *)-1UL
 typedef struct krwlock {
        void                    *rw_owner;
        void                    *rw_wr_owner;
@@ -339,6 +353,7 @@ extern void taskq_destroy(taskq_t *);
 extern void    taskq_wait(taskq_t *);
 extern int     taskq_member(taskq_t *, void *);
 extern void    system_taskq_init(void);
+extern void    system_taskq_fini(void);
 
 #define        XVA_MAPSIZE     3
 #define        XVA_MAGIC       0x78766174
index 161f3d5bac40f66e150f9553136c5cf1771055bd..19776bb8ab99021d335b10893bc7fc10806882a7 100644 (file)
@@ -30,6 +30,7 @@
 #include <stdlib.h>
 #include <string.h>
 #include <zlib.h>
+#include <sys/signal.h>
 #include <sys/spa.h>
 #include <sys/stat.h>
 #include <sys/processor.h>
@@ -57,64 +58,157 @@ struct utsname utsname = {
  * =========================================================================
  */
 
-kmutex_t kthread_lock;
+/* NOTE: Tracking each tid on a list and using it for curthread lookups
+ *       is slow at best but it provides an easy way to provide a kthread
+ *       style API on top of pthreads.  For now we just want ztest to work
+ *       to validate correctness.  Performance is not much of an issue
+ *       since that is what the in-kernel version is for.  That said
+ *       reworking this to track the kthread_t structure as thread
+ *       specific data would be probably the best way to speed this up.
+ */
+
+pthread_cond_t kthread_cond = PTHREAD_COND_INITIALIZER;
+pthread_mutex_t kthread_lock = PTHREAD_MUTEX_INITIALIZER;
 list_t kthread_list;
 
-kthread_t *
-zk_curthread(void)
+static int
+thread_count(void)
 {
        kthread_t *kt;
-       pthread_t tid;
+       int count = 0;
+
+       for (kt = list_head(&kthread_list); kt != NULL;
+            kt = list_next(&kthread_list, kt))
+               count++;
+
+       return count;
+}
+
+static void
+thread_init(void)
+{
+       kthread_t *kt;
+
+       /* Initialize list for tracking kthreads */
+       list_create(&kthread_list, sizeof (kthread_t),
+                   offsetof(kthread_t, t_node));
 
-       tid = pthread_self();
-       mutex_enter(&kthread_lock);
-        for (kt = list_head(&kthread_list); kt != NULL;
-            kt = list_next(&kthread_list, kt)) {
+       /* Create entry for primary kthread */
+       kt = umem_zalloc(sizeof(kthread_t), UMEM_NOFAIL);
+       list_link_init(&kt->t_node);
+       VERIFY3U(kt->t_tid = pthread_self(), !=, 0);
+        VERIFY3S(pthread_attr_init(&kt->t_attr), ==, 0);
+       VERIFY3S(pthread_mutex_lock(&kthread_lock), ==, 0);
+       list_insert_head(&kthread_list, kt);
+       VERIFY3S(pthread_mutex_unlock(&kthread_lock), ==, 0);
+}
+
+static void
+thread_fini(void)
+{
+       kthread_t *kt;
+       struct timespec ts = { 0 };
+       int count;
+
+       /* Wait for all threads to exit via thread_exit() */
+       VERIFY3S(pthread_mutex_lock(&kthread_lock), ==, 0);
+       while ((count = thread_count()) > 1) {
+               printf("Waiting for %d\n", count);
+               clock_gettime(CLOCK_REALTIME, &ts);
+               ts.tv_sec += 1;
+               pthread_cond_timedwait(&kthread_cond, &kthread_lock, &ts);
+       }
+
+       ASSERT3S(thread_count(), ==, 1);
+       kt = list_head(&kthread_list);
+       list_remove(&kthread_list, kt);
+       VERIFY3S(pthread_mutex_unlock(&kthread_lock), ==, 0);
+
+       VERIFY(pthread_attr_destroy(&kt->t_attr) == 0);
+       umem_free(kt, sizeof(kthread_t));
 
-               if (kt->t_id == tid) {
-                       mutex_exit(&kthread_lock);
-                       return kt;
+       /* Cleanup list for tracking kthreads */
+       list_destroy(&kthread_list);
+}
+
+kthread_t *
+zk_thread_current(void)
+{
+       kt_did_t tid = pthread_self();
+       kthread_t *kt;
+       int count = 1;
+
+       /*
+        * Because a newly created thread may call zk_thread_current()
+        * before the thread parent has had time to add the thread's tid
+        * to our lookup list.  We will loop as long as there are tid
+        * which have not yet been set which must be one of ours.
+        * Yes it's a hack, at some point we can just use native pthreads.
+        */
+       while (count > 0) {
+               count = 0;
+               VERIFY3S(pthread_mutex_lock(&kthread_lock), ==, 0);
+               for (kt = list_head(&kthread_list); kt != NULL;
+                    kt = list_next(&kthread_list, kt)) {
+
+                       if (kt->t_tid == tid) {
+                               VERIFY3S(pthread_mutex_unlock(
+                                        &kthread_lock), ==, 0);
+                               return kt;
+                       }
+
+                       if (kt->t_tid == (kt_did_t)-1)
+                               count++;
                }
+               VERIFY3S(pthread_mutex_unlock(&kthread_lock), ==, 0);
        }
-       mutex_exit(&kthread_lock);
 
+       /* Unreachable */
+       ASSERT(0);
        return NULL;
 }
 
 kthread_t *
-zk_thread_create(thread_func_t func, void *arg)
+zk_thread_create(caddr_t stk, size_t  stksize, thread_func_t func, void *arg,
+             size_t len, void *pp, int state, pri_t pri)
 {
        kthread_t *kt;
 
        kt = umem_zalloc(sizeof(kthread_t), UMEM_NOFAIL);
-
+       kt->t_tid = (kt_did_t)-1;
+       list_link_init(&kt->t_node);
        VERIFY(pthread_attr_init(&kt->t_attr) == 0);
-       VERIFY(pthread_attr_setdetachstate(&kt->t_attr,
-                                          PTHREAD_CREATE_DETACHED) == 0);
-       VERIFY(pthread_create(&kt->t_id, &kt->t_attr,
-                             (void *(*)(void *))func, arg) == 0);
 
-       mutex_enter(&kthread_lock);
+       VERIFY3S(pthread_mutex_lock(&kthread_lock), ==, 0);
        list_insert_head(&kthread_list, kt);
-       mutex_exit(&kthread_lock);
+       VERIFY3S(pthread_mutex_unlock(&kthread_lock), ==, 0);
+
+       VERIFY(pthread_create(&kt->t_tid, &kt->t_attr,
+                             (void *(*)(void *))func, arg) == 0);
 
        return kt;
 }
 
+int
+zk_thread_join(kt_did_t tid, kthread_t *dtid, void **status)
+{
+       return pthread_join(tid, status);
+}
+
 void
-thread_exit(void)
+zk_thread_exit(void)
 {
        kthread_t *kt;
 
-       VERIFY((kt = curthread) != NULL);
-
-       mutex_enter(&kthread_lock);
+       VERIFY3P(kt = curthread, !=, NULL);
+       VERIFY3S(pthread_mutex_lock(&kthread_lock), ==, 0);
        list_remove(&kthread_list, kt);
-       mutex_exit(&kthread_lock);
+       VERIFY3S(pthread_mutex_unlock(&kthread_lock), ==, 0);
 
        VERIFY(pthread_attr_destroy(&kt->t_attr) == 0);
        umem_free(kt, sizeof(kthread_t));
 
+       pthread_cond_broadcast(&kthread_cond);
        pthread_exit(NULL);
 }
 
@@ -149,14 +243,14 @@ kstat_delete(kstat_t *ksp)
 void
 mutex_init(kmutex_t *mp, char *name, int type, void *cookie)
 {
-       ASSERT(type == MUTEX_DEFAULT);
-       ASSERT(cookie == NULL);
+       ASSERT3S(type, ==, MUTEX_DEFAULT);
+       ASSERT3P(cookie, ==, NULL);
 
 #ifdef IM_FEELING_LUCKY
-       ASSERT(mp->m_magic != MTX_MAGIC);
+       ASSERT3U(mp->m_magic, !=, MTX_MAGIC);
 #endif
 
-       mp->m_owner = NULL;
+       mp->m_owner = MTX_INIT;
        mp->m_magic = MTX_MAGIC;
        VERIFY3S(pthread_mutex_init(&mp->m_lock, NULL), ==, 0);
 }
@@ -164,31 +258,31 @@ mutex_init(kmutex_t *mp, char *name, int type, void *cookie)
 void
 mutex_destroy(kmutex_t *mp)
 {
-       ASSERT(mp->m_magic == MTX_MAGIC);
-       ASSERT(mp->m_owner == NULL);
+       ASSERT3U(mp->m_magic, ==, MTX_MAGIC);
+       ASSERT3P(mp->m_owner, ==, MTX_INIT);
        VERIFY3S(pthread_mutex_destroy(&(mp)->m_lock), ==, 0);
-       mp->m_owner = (void *)-1UL;
+       mp->m_owner = MTX_DEST;
        mp->m_magic = 0;
 }
 
 void
 mutex_enter(kmutex_t *mp)
 {
-       ASSERT(mp->m_magic == MTX_MAGIC);
-       ASSERT(mp->m_owner != (void *)-1UL);
-       ASSERT(mp->m_owner != curthread);
+       ASSERT3U(mp->m_magic, ==, MTX_MAGIC);
+       ASSERT3P(mp->m_owner, !=, MTX_DEST);
+       ASSERT3P(mp->m_owner, !=, curthread);
        VERIFY3S(pthread_mutex_lock(&mp->m_lock), ==, 0);
-       ASSERT(mp->m_owner == NULL);
+       ASSERT3P(mp->m_owner, ==, MTX_INIT);
        mp->m_owner = curthread;
 }
 
 int
 mutex_tryenter(kmutex_t *mp)
 {
-       ASSERT(mp->m_magic == MTX_MAGIC);
-       ASSERT(mp->m_owner != (void *)-1UL);
+       ASSERT3U(mp->m_magic, ==, MTX_MAGIC);
+       ASSERT3P(mp->m_owner, !=, MTX_DEST);
        if (0 == pthread_mutex_trylock(&mp->m_lock)) {
-               ASSERT(mp->m_owner == NULL);
+               ASSERT3P(mp->m_owner, ==, MTX_INIT);
                mp->m_owner = curthread;
                return (1);
        } else {
@@ -199,16 +293,16 @@ mutex_tryenter(kmutex_t *mp)
 void
 mutex_exit(kmutex_t *mp)
 {
-       ASSERT(mp->m_magic == MTX_MAGIC);
-       ASSERT(mutex_owner(mp) == curthread);
-       mp->m_owner = NULL;
+       ASSERT3U(mp->m_magic, ==, MTX_MAGIC);
+       ASSERT3P(mutex_owner(mp), ==, curthread);
+       mp->m_owner = MTX_INIT;
        VERIFY3S(pthread_mutex_unlock(&mp->m_lock), ==, 0);
 }
 
 void *
 mutex_owner(kmutex_t *mp)
 {
-       ASSERT(mp->m_magic == MTX_MAGIC);
+       ASSERT3U(mp->m_magic, ==, MTX_MAGIC);
        return (mp->m_owner);
 }
 
@@ -221,16 +315,16 @@ mutex_owner(kmutex_t *mp)
 void
 rw_init(krwlock_t *rwlp, char *name, int type, void *arg)
 {
-       ASSERT(type == RW_DEFAULT);
-       ASSERT(arg == NULL);
+       ASSERT3S(type, ==, RW_DEFAULT);
+       ASSERT3P(arg, ==, NULL);
 
 #ifdef IM_FEELING_LUCKY
-       ASSERT(rwlp->rw_magic != RW_MAGIC);
+       ASSERT3U(rwlp->rw_magic, !=, RW_MAGIC);
 #endif
 
        VERIFY3S(pthread_rwlock_init(&rwlp->rw_lock, NULL), ==, 0);
-       rwlp->rw_owner = NULL;
-       rwlp->rw_wr_owner = NULL;
+       rwlp->rw_owner = RW_INIT;
+       rwlp->rw_wr_owner = RW_INIT;
        rwlp->rw_readers = 0;
        rwlp->rw_magic = RW_MAGIC;
 }
@@ -238,7 +332,7 @@ rw_init(krwlock_t *rwlp, char *name, int type, void *arg)
 void
 rw_destroy(krwlock_t *rwlp)
 {
-       ASSERT(rwlp->rw_magic == RW_MAGIC);
+       ASSERT3U(rwlp->rw_magic, ==, RW_MAGIC);
 
        VERIFY3S(pthread_rwlock_destroy(&rwlp->rw_lock), ==, 0);
        rwlp->rw_magic = 0;
@@ -247,18 +341,18 @@ rw_destroy(krwlock_t *rwlp)
 void
 rw_enter(krwlock_t *rwlp, krw_t rw)
 {
-       ASSERT(rwlp->rw_magic == RW_MAGIC);
-       ASSERT(rwlp->rw_owner != curthread);
-       ASSERT(rwlp->rw_wr_owner != curthread);
+       ASSERT3U(rwlp->rw_magic, ==, RW_MAGIC);
+       ASSERT3P(rwlp->rw_owner, !=, curthread);
+       ASSERT3P(rwlp->rw_wr_owner, !=, curthread);
 
        if (rw == RW_READER) {
                VERIFY3S(pthread_rwlock_rdlock(&rwlp->rw_lock), ==, 0);
-               ASSERT(rwlp->rw_wr_owner == NULL);
+               ASSERT3P(rwlp->rw_wr_owner, ==, RW_INIT);
 
                atomic_inc_uint(&rwlp->rw_readers);
        } else {
                VERIFY3S(pthread_rwlock_wrlock(&rwlp->rw_lock), ==, 0);
-               ASSERT(rwlp->rw_wr_owner == NULL);
+               ASSERT3P(rwlp->rw_wr_owner, ==, RW_INIT);
                ASSERT3U(rwlp->rw_readers, ==, 0);
 
                rwlp->rw_wr_owner = curthread;
@@ -270,15 +364,15 @@ rw_enter(krwlock_t *rwlp, krw_t rw)
 void
 rw_exit(krwlock_t *rwlp)
 {
-       ASSERT(rwlp->rw_magic == RW_MAGIC);
+       ASSERT3U(rwlp->rw_magic, ==, RW_MAGIC);
        ASSERT(RW_LOCK_HELD(rwlp));
 
        if (RW_READ_HELD(rwlp))
                atomic_dec_uint(&rwlp->rw_readers);
        else
-               rwlp->rw_wr_owner = NULL;
+               rwlp->rw_wr_owner = RW_INIT;
 
-       rwlp->rw_owner = NULL;
+       rwlp->rw_owner = RW_INIT;
        VERIFY3S(pthread_rwlock_unlock(&rwlp->rw_lock), ==, 0);
 }
 
@@ -287,7 +381,7 @@ rw_tryenter(krwlock_t *rwlp, krw_t rw)
 {
        int rv;
 
-       ASSERT(rwlp->rw_magic == RW_MAGIC);
+       ASSERT3U(rwlp->rw_magic, ==, RW_MAGIC);
 
        if (rw == RW_READER)
                rv = pthread_rwlock_tryrdlock(&rwlp->rw_lock);
@@ -295,7 +389,7 @@ rw_tryenter(krwlock_t *rwlp, krw_t rw)
                rv = pthread_rwlock_trywrlock(&rwlp->rw_lock);
 
        if (rv == 0) {
-               ASSERT(rwlp->rw_wr_owner == NULL);
+               ASSERT3P(rwlp->rw_wr_owner, ==, RW_INIT);
 
                if (rw == RW_READER)
                        atomic_inc_uint(&rwlp->rw_readers);
@@ -317,7 +411,7 @@ rw_tryenter(krwlock_t *rwlp, krw_t rw)
 int
 rw_tryupgrade(krwlock_t *rwlp)
 {
-       ASSERT(rwlp->rw_magic == RW_MAGIC);
+       ASSERT3U(rwlp->rw_magic, ==, RW_MAGIC);
 
        return (0);
 }
@@ -331,10 +425,10 @@ rw_tryupgrade(krwlock_t *rwlp)
 void
 cv_init(kcondvar_t *cv, char *name, int type, void *arg)
 {
-       ASSERT(type == CV_DEFAULT);
+       ASSERT3S(type, ==, CV_DEFAULT);
 
 #ifdef IM_FEELING_LUCKY
-       ASSERT(cv->cv_magic != CV_MAGIC);
+       ASSERT3U(cv->cv_magic, !=, CV_MAGIC);
 #endif
 
        cv->cv_magic = CV_MAGIC;
@@ -345,7 +439,7 @@ cv_init(kcondvar_t *cv, char *name, int type, void *arg)
 void
 cv_destroy(kcondvar_t *cv)
 {
-       ASSERT(cv->cv_magic == CV_MAGIC);
+       ASSERT3U(cv->cv_magic, ==, CV_MAGIC);
        VERIFY3S(pthread_cond_destroy(&cv->cv), ==, 0);
        cv->cv_magic = 0;
 }
@@ -353,9 +447,9 @@ cv_destroy(kcondvar_t *cv)
 void
 cv_wait(kcondvar_t *cv, kmutex_t *mp)
 {
-       ASSERT(cv->cv_magic == CV_MAGIC);
-       ASSERT(mutex_owner(mp) == curthread);
-       mp->m_owner = NULL;
+       ASSERT3U(cv->cv_magic, ==, CV_MAGIC);
+       ASSERT3P(mutex_owner(mp), ==, curthread);
+       mp->m_owner = MTX_INIT;
        int ret = pthread_cond_wait(&cv->cv, &mp->m_lock);
        if (ret != 0)
                VERIFY3S(ret, ==, EINTR);
@@ -370,7 +464,7 @@ cv_timedwait(kcondvar_t *cv, kmutex_t *mp, clock_t abstime)
        timestruc_t ts;
        clock_t delta;
 
-       ASSERT(cv->cv_magic == CV_MAGIC);
+       ASSERT3U(cv->cv_magic, ==, CV_MAGIC);
 
 top:
        delta = abstime - lbolt;
@@ -386,8 +480,8 @@ top:
                ts.tv_nsec -= NANOSEC;
        }
 
-       ASSERT(mutex_owner(mp) == curthread);
-       mp->m_owner = NULL;
+       ASSERT3P(mutex_owner(mp), ==, curthread);
+       mp->m_owner = MTX_INIT;
        error = pthread_cond_timedwait(&cv->cv, &mp->m_lock, &ts);
        mp->m_owner = curthread;
 
@@ -405,14 +499,14 @@ top:
 void
 cv_signal(kcondvar_t *cv)
 {
-       ASSERT(cv->cv_magic == CV_MAGIC);
+       ASSERT3U(cv->cv_magic, ==, CV_MAGIC);
        VERIFY3S(pthread_cond_signal(&cv->cv), ==, 0);
 }
 
 void
 cv_broadcast(kcondvar_t *cv)
 {
-       ASSERT(cv->cv_magic == CV_MAGIC);
+       ASSERT3U(cv->cv_magic, ==, CV_MAGIC);
        VERIFY3S(pthread_cond_broadcast(&cv->cv), ==, 0);
 }
 
@@ -896,12 +990,8 @@ kernel_init(int mode)
        VERIFY((random_fd = open("/dev/random", O_RDONLY)) != -1);
        VERIFY((urandom_fd = open("/dev/urandom", O_RDONLY)) != -1);
 
-       mutex_init(&kthread_lock, NULL, MUTEX_DEFAULT, NULL);
-       list_create(&kthread_list, sizeof (kthread_t),
-                   offsetof(kthread_t, t_node));
-
+       thread_init();
        system_taskq_init();
-
        spa_init(mode);
 }
 
@@ -909,9 +999,8 @@ void
 kernel_fini(void)
 {
        spa_fini();
-
-       list_destroy(&kthread_list);
-       mutex_destroy(&kthread_lock);
+       system_taskq_fini();
+       thread_fini();
 
        close(random_fd);
        close(urandom_fd);
index 40dfb678933dc666ae0845327c58c293ea382ea5..42e2dd3f43c858277a7299a7b5b7f63bb30abe7b 100644 (file)
@@ -43,6 +43,7 @@ struct taskq {
        kcondvar_t      tq_dispatch_cv;
        kcondvar_t      tq_wait_cv;
        kthread_t       **tq_threadlist;
+       kt_did_t        *tq_idlist;
        int             tq_flags;
        int             tq_active;
        int             tq_nthreads;
@@ -163,6 +164,7 @@ taskq_thread(void *arg)
        tq->tq_nthreads--;
        cv_broadcast(&tq->tq_wait_cv);
        mutex_exit(&tq->tq_lock);
+       thread_exit();
        return (NULL);
 }
 
@@ -198,7 +200,10 @@ taskq_create(const char *name, int nthreads, pri_t pri,
        tq->tq_maxalloc = maxalloc;
        tq->tq_task.task_next = &tq->tq_task;
        tq->tq_task.task_prev = &tq->tq_task;
-       tq->tq_threadlist = kmem_alloc(nthreads*sizeof(kthread_t *), KM_SLEEP);
+       VERIFY3P((tq->tq_threadlist = kmem_alloc(tq->tq_nthreads *
+                sizeof(kthread_t *), KM_SLEEP)), !=, NULL);
+       VERIFY3P((tq->tq_idlist = kmem_alloc(tq->tq_nthreads *
+                sizeof(kt_did_t), KM_SLEEP)), !=, NULL);
 
        if (flags & TASKQ_PREPOPULATE) {
                mutex_enter(&tq->tq_lock);
@@ -207,9 +212,11 @@ taskq_create(const char *name, int nthreads, pri_t pri,
                mutex_exit(&tq->tq_lock);
        }
 
-       for (t = 0; t < nthreads; t++)
+       for (t = 0; t < tq->tq_nthreads; t++) {
                VERIFY((tq->tq_threadlist[t] = thread_create(NULL, 0,
                       taskq_thread, tq, THR_BOUND, NULL, 0, 0)) != NULL);
+               tq->tq_idlist[t] = tq->tq_threadlist[t]->t_tid;
+       }
 
        return (tq);
 }
@@ -239,9 +246,10 @@ taskq_destroy(taskq_t *tq)
        mutex_exit(&tq->tq_lock);
 
        for (t = 0; t < nthreads; t++)
-               (void) thr_join(tq->tq_threadlist[t], NULL, NULL);
+               VERIFY3S(thread_join(tq->tq_idlist[t], NULL, NULL), ==, 0);
 
        kmem_free(tq->tq_threadlist, nthreads * sizeof(kthread_t *));
+       kmem_free(tq->tq_idlist, nthreads * sizeof(kt_did_t));
 
        rw_destroy(&tq->tq_threadlock);
        mutex_destroy(&tq->tq_lock);
@@ -272,3 +280,9 @@ system_taskq_init(void)
        system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512,
            TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
 }
+
+void
+system_taskq_fini(void)
+{
+       taskq_destroy(system_taskq);
+}
index b5fcc8c4a858a67c92cd112b8b7239120b992e1a..15745b3e983a9f2c522f77e6895508e568d580b0 100644 (file)
@@ -446,6 +446,8 @@ txg_sync_thread(dsl_pool_t *dp)
                rw_exit(&tx->tx_suspend);
                cv_broadcast(&tx->tx_sync_done_cv);
        }
+
+       thread_exit();
 }
 
 static void
@@ -490,6 +492,8 @@ txg_quiesce_thread(dsl_pool_t *dp)
                cv_broadcast(&tx->tx_sync_more_cv);
                cv_broadcast(&tx->tx_quiesce_done_cv);
        }
+
+       thread_exit();
 }
 
 /*