]> granicus.if.org Git - zfs/commitdiff
Replace tq_work_list and tq_threads in taskq_t
authorPrakash Surya <surya1@llnl.gov>
Tue, 6 Dec 2011 01:32:48 +0000 (17:32 -0800)
committerBrian Behlendorf <behlendorf1@llnl.gov>
Wed, 14 Dec 2011 00:10:50 +0000 (16:10 -0800)
To lay the ground work for introducing the taskq_dispatch_prealloc()
interface, the tq_work_list and tq_threads fields had to be replaced
with new alternatives in the taskq_t structure.

The tq_threads field was replaced with tq_thread_list. Rather than
storing the pointers to the taskq's kernel threads in an array, they are
now stored as a list. In addition to laying the ground work for the
taskq_dispatch_prealloc() interface, this change could also enable taskq
threads to be dynamically created and destroyed as threads can now be
added and removed to this list relatively easily.

The tq_work_list field was replaced with tq_active_list. Instead of
keeping a list of taskq_ent_t's which are currently being serviced, a
list of taskq_threads currently servicing a taskq_ent_t is kept. This
frees up the taskq_ent_t's tqent_list field when it is being serviced
(i.e. now when a taskq_ent_t is being serviced, it's tqent_list field
will be empty).

Signed-off-by: Prakash Surya <surya1@llnl.gov>
Signed-off-by: Brian Behlendorf <behlendorf1@llnl.gov>
Issue #65

include/sys/taskq.h
module/spl/spl-taskq.c

index 57f8b1cb59cfcb2c81866414bff8826cf101a03c..4ea29cb3bb3b21c3ef31f8cce8c8dbf6729508d7 100644 (file)
 typedef unsigned long taskqid_t;
 typedef void (task_func_t)(void *);
 
+typedef struct taskq_ent {
+        spinlock_t              tqent_lock;
+        struct list_head        tqent_list;
+        taskqid_t               tqent_id;
+        task_func_t             *tqent_func;
+        void                    *tqent_arg;
+} taskq_ent_t;
+
 /*
  * Flags for taskq_dispatch. TQ_SLEEP/TQ_NOSLEEP should be same as
  * KM_SLEEP/KM_NOSLEEP.  TQ_NOQUEUE/TQ_NOALLOC are set particularly
@@ -61,8 +69,9 @@ typedef void (task_func_t)(void *);
 typedef struct taskq {
         spinlock_t              tq_lock;       /* protects taskq_t */
         unsigned long           tq_lock_flags; /* interrupt state */
-        struct task_struct      **tq_threads;  /* thread pointers */
        const char              *tq_name;      /* taskq name */
+        struct list_head        tq_thread_list;/* list of all threads */
+       struct list_head        tq_active_list;/* list of active threads */
         int                     tq_nactive;    /* # of active threads */
         int                     tq_nthreads;   /* # of total threads */
        int                     tq_pri;        /* priority */
@@ -73,13 +82,20 @@ typedef struct taskq {
        taskqid_t               tq_next_id;    /* next pend/work id */
        taskqid_t               tq_lowest_id;  /* lowest pend/work id */
        struct list_head        tq_free_list;  /* free task_t's */
-       struct list_head        tq_work_list;  /* work task_t's */
        struct list_head        tq_pend_list;  /* pending task_t's */
        struct list_head        tq_prio_list;  /* priority pending task_t's */
        wait_queue_head_t       tq_work_waitq; /* new work waitq */
        wait_queue_head_t       tq_wait_waitq; /* wait waitq */
 } taskq_t;
 
+typedef struct taskq_thread {
+       struct list_head       tqt_thread_list;
+       struct list_head       tqt_active_list;
+       struct task_struct     *tqt_thread;
+       taskq_t                *tqt_tq;
+       taskq_ent_t            *tqt_ent;
+} taskq_thread_t;
+
 /* Global system-wide dynamic task queue available for all consumers */
 extern taskq_t *system_taskq;
 
index 50d32e021ae98e983b754bb56e4168bbe573ba76..5c22544b8b14c53b9234da28ba0af5e59e463ae6 100644 (file)
 taskq_t *system_taskq;
 EXPORT_SYMBOL(system_taskq);
 
-typedef struct taskq_ent {
-        spinlock_t              tqent_lock;
-        struct list_head        tqent_list;
-        taskqid_t               tqent_id;
-        task_func_t             *tqent_func;
-        void                    *tqent_arg;
-} taskq_ent_t;
-
 /*
  * NOTE: Must be called with tq->tq_lock held, returns a list_t which
  * is not attached to the free, work, or pending taskq lists.
@@ -228,15 +220,18 @@ EXPORT_SYMBOL(__taskq_wait);
 int
 __taskq_member(taskq_t *tq, void *t)
 {
-        int i;
+       struct list_head *l;
+       taskq_thread_t *tqt;
         SENTRY;
 
        ASSERT(tq);
         ASSERT(t);
 
-        for (i = 0; i < tq->tq_nthreads; i++)
-                if (tq->tq_threads[i] == (struct task_struct *)t)
-                        SRETURN(1);
+       list_for_each(l, &tq->tq_thread_list) {
+               tqt = list_entry(l, taskq_thread_t, tqt_thread_list);
+               if (tqt->tqt_thread == (struct task_struct *)t)
+                       SRETURN(1);
+       }
 
         SRETURN(0);
 }
@@ -305,6 +300,7 @@ taskq_lowest_id(taskq_t *tq)
 {
        taskqid_t lowest_id = tq->tq_next_id;
         taskq_ent_t *t;
+       taskq_thread_t *tqt;
        SENTRY;
 
        ASSERT(tq);
@@ -320,9 +316,11 @@ taskq_lowest_id(taskq_t *tq)
                lowest_id = MIN(lowest_id, t->tqent_id);
        }
 
-       if (!list_empty(&tq->tq_work_list)) {
-               t = list_entry(tq->tq_work_list.next, taskq_ent_t, tqent_list);
-               lowest_id = MIN(lowest_id, t->tqent_id);
+       if (!list_empty(&tq->tq_active_list)) {
+               tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
+                                tqt_active_list);
+               ASSERT(tqt->tqt_ent != NULL);
+               lowest_id = MIN(lowest_id, tqt->tqt_ent->tqent_id);
        }
 
        SRETURN(lowest_id);
@@ -333,25 +331,25 @@ taskq_lowest_id(taskq_t *tq)
  * taskqid.
  */
 static void
-taskq_insert_in_order(taskq_t *tq, taskq_ent_t *t)
+taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
 {
-       taskq_ent_t *w;
+       taskq_thread_t *w;
        struct list_head *l;
 
        SENTRY;
        ASSERT(tq);
-       ASSERT(t);
+       ASSERT(tqt);
        ASSERT(spin_is_locked(&tq->tq_lock));
 
-       list_for_each_prev(l, &tq->tq_work_list) {
-               w = list_entry(l, taskq_ent_t, tqent_list);
-               if (w->tqent_id < t->tqent_id) {
-                       list_add(&t->tqent_list, l);
+       list_for_each_prev(l, &tq->tq_active_list) {
+               w = list_entry(l, taskq_thread_t, tqt_active_list);
+               if (w->tqt_ent->tqent_id < tqt->tqt_ent->tqent_id) {
+                       list_add(&tqt->tqt_active_list, l);
                        break;
                }
        }
-       if (l == &tq->tq_work_list)
-               list_add(&t->tqent_list, &tq->tq_work_list);
+       if (l == &tq->tq_active_list)
+               list_add(&tqt->tqt_active_list, &tq->tq_active_list);
 
        SEXIT;
 }
@@ -362,12 +360,14 @@ taskq_thread(void *args)
         DECLARE_WAITQUEUE(wait, current);
         sigset_t blocked;
        taskqid_t id;
-        taskq_t *tq = args;
+       taskq_thread_t *tqt = args;
+        taskq_t *tq;
         taskq_ent_t *t;
        struct list_head *pend_list;
        SENTRY;
 
-        ASSERT(tq);
+        ASSERT(tqt);
+       tq = tqt->tqt_tq;
         current->flags |= PF_NOFREEZE;
 
        /* Disable the direct memory reclaim path */
@@ -407,7 +407,8 @@ taskq_thread(void *args)
                if (pend_list) {
                         t = list_entry(pend_list->next, taskq_ent_t, tqent_list);
                         list_del_init(&t->tqent_list);
-                       taskq_insert_in_order(tq, t);
+                       tqt->tqt_ent = t;
+                       taskq_insert_in_order(tq, tqt);
                         tq->tq_nactive++;
                        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
@@ -416,6 +417,8 @@ taskq_thread(void *args)
 
                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                         tq->tq_nactive--;
+                       list_del_init(&tqt->tqt_active_list);
+                       tqt->tqt_ent = NULL;
                        id = t->tqent_id;
                         task_done(tq, t);
 
@@ -435,6 +438,9 @@ taskq_thread(void *args)
 
        __set_current_state(TASK_RUNNING);
         tq->tq_nthreads--;
+       list_del_init(&tqt->tqt_thread_list);
+       kmem_free(tqt, sizeof(taskq_thread_t));
+
         spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
        SRETURN(0);
@@ -445,7 +451,7 @@ __taskq_create(const char *name, int nthreads, pri_t pri,
                int minalloc, int maxalloc, uint_t flags)
 {
         taskq_t *tq;
-        struct task_struct *t;
+       taskq_thread_t *tqt;
         int rc = 0, i, j = 0;
         SENTRY;
 
@@ -468,14 +474,10 @@ __taskq_create(const char *name, int nthreads, pri_t pri,
         if (tq == NULL)
                 SRETURN(NULL);
 
-        tq->tq_threads = kmem_alloc(nthreads * sizeof(t), KM_SLEEP);
-        if (tq->tq_threads == NULL) {
-                kmem_free(tq, sizeof(*tq));
-                SRETURN(NULL);
-        }
-
         spin_lock_init(&tq->tq_lock);
         spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+        INIT_LIST_HEAD(&tq->tq_thread_list);
+        INIT_LIST_HEAD(&tq->tq_active_list);
         tq->tq_name      = name;
         tq->tq_nactive   = 0;
        tq->tq_nthreads  = 0;
@@ -487,7 +489,6 @@ __taskq_create(const char *name, int nthreads, pri_t pri,
        tq->tq_next_id   = 1;
        tq->tq_lowest_id = 1;
         INIT_LIST_HEAD(&tq->tq_free_list);
-        INIT_LIST_HEAD(&tq->tq_work_list);
         INIT_LIST_HEAD(&tq->tq_pend_list);
         INIT_LIST_HEAD(&tq->tq_prio_list);
         init_waitqueue_head(&tq->tq_work_waitq);
@@ -499,19 +500,26 @@ __taskq_create(const char *name, int nthreads, pri_t pri,
 
         spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
-        for (i = 0; i < nthreads; i++) {
-                t = kthread_create(taskq_thread, tq, "%s/%d", name, i);
-                if (t) {
-                        tq->tq_threads[i] = t;
-                        kthread_bind(t, i % num_online_cpus());
-                        set_user_nice(t, PRIO_TO_NICE(pri));
-                        wake_up_process(t);
+       for (i = 0; i < nthreads; i++) {
+               tqt = kmem_alloc(sizeof(*tqt), KM_SLEEP);
+               INIT_LIST_HEAD(&tqt->tqt_thread_list);
+               INIT_LIST_HEAD(&tqt->tqt_active_list);
+               tqt->tqt_tq = tq;
+               tqt->tqt_ent = NULL;
+
+               tqt->tqt_thread = kthread_create(taskq_thread, tqt,
+                                                "%s/%d", name, i);
+               if (tqt->tqt_thread) {
+                       list_add(&tqt->tqt_thread_list, &tq->tq_thread_list);
+                       kthread_bind(tqt->tqt_thread, i % num_online_cpus());
+                       set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(pri));
+                       wake_up_process(tqt->tqt_thread);
                        j++;
-                } else {
-                        tq->tq_threads[i] = NULL;
-                        rc = 1;
-                }
-        }
+               } else {
+                       kmem_free(tqt, sizeof(taskq_thread_t));
+                       rc = 1;
+               }
+       }
 
         /* Wait for all threads to be started before potential destroy */
        wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j);
@@ -528,8 +536,9 @@ EXPORT_SYMBOL(__taskq_create);
 void
 __taskq_destroy(taskq_t *tq)
 {
+       struct task_struct *thread;
+       taskq_thread_t *tqt;
        taskq_ent_t *t;
-       int i, nthreads;
        SENTRY;
 
        ASSERT(tq);
@@ -540,13 +549,25 @@ __taskq_destroy(taskq_t *tq)
        /* TQ_ACTIVE cleared prevents new tasks being added to pending */
         __taskq_wait(tq);
 
-       nthreads = tq->tq_nthreads;
-       for (i = 0; i < nthreads; i++)
-               if (tq->tq_threads[i])
-                       kthread_stop(tq->tq_threads[i]);
-
         spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
 
+       /*
+        * Signal each thread to exit and block until it does.  Each thread
+        * is responsible for removing itself from the list and freeing its
+        * taskq_thread_t.  This allows for idle threads to opt to remove
+        * themselves from the taskq.  They can be recreated as needed.
+        */
+       while (!list_empty(&tq->tq_thread_list)) {
+               tqt = list_entry(tq->tq_thread_list.next,
+                                taskq_thread_t, tqt_thread_list);
+               thread = tqt->tqt_thread;
+               spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
+               kthread_stop(thread);
+
+               spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+       }
+
         while (!list_empty(&tq->tq_free_list)) {
                t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
                list_del_init(&t->tqent_list);
@@ -555,13 +576,14 @@ __taskq_destroy(taskq_t *tq)
 
         ASSERT(tq->tq_nthreads == 0);
         ASSERT(tq->tq_nalloc == 0);
+        ASSERT(list_empty(&tq->tq_thread_list));
+        ASSERT(list_empty(&tq->tq_active_list));
         ASSERT(list_empty(&tq->tq_free_list));
-        ASSERT(list_empty(&tq->tq_work_list));
         ASSERT(list_empty(&tq->tq_pend_list));
         ASSERT(list_empty(&tq->tq_prio_list));
 
         spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
-        kmem_free(tq->tq_threads, nthreads * sizeof(taskq_ent_t *));
+
         kmem_free(tq, sizeof(taskq_t));
 
        SEXIT;