From 2c02b71b1411176905228666abf7a50a2e5f85dc Mon Sep 17 00:00:00 2001 From: Prakash Surya Date: Mon, 5 Dec 2011 17:32:48 -0800 Subject: [PATCH] Replace tq_work_list and tq_threads in taskq_t To lay the ground work for introducing the taskq_dispatch_prealloc() interface, the tq_work_list and tq_threads fields had to be replaced with new alternatives in the taskq_t structure. The tq_threads field was replaced with tq_thread_list. Rather than storing the pointers to the taskq's kernel threads in an array, they are now stored as a list. In addition to laying the ground work for the taskq_dispatch_prealloc() interface, this change could also enable taskq threads to be dynamically created and destroyed as threads can now be added and removed to this list relatively easily. The tq_work_list field was replaced with tq_active_list. Instead of keeping a list of taskq_ent_t's which are currently being serviced, a list of taskq_threads currently servicing a taskq_ent_t is kept. This frees up the taskq_ent_t's tqent_list field when it is being serviced (i.e. now when a taskq_ent_t is being serviced, it's tqent_list field will be empty). Signed-off-by: Prakash Surya Signed-off-by: Brian Behlendorf Issue #65 --- include/sys/taskq.h | 20 ++++++- module/spl/spl-taskq.c | 132 ++++++++++++++++++++++++----------------- 2 files changed, 95 insertions(+), 57 deletions(-) diff --git a/include/sys/taskq.h b/include/sys/taskq.h index 57f8b1c..4ea29cb 100644 --- a/include/sys/taskq.h +++ b/include/sys/taskq.h @@ -45,6 +45,14 @@ typedef unsigned long taskqid_t; typedef void (task_func_t)(void *); +typedef struct taskq_ent { + spinlock_t tqent_lock; + struct list_head tqent_list; + taskqid_t tqent_id; + task_func_t *tqent_func; + void *tqent_arg; +} taskq_ent_t; + /* * Flags for taskq_dispatch. TQ_SLEEP/TQ_NOSLEEP should be same as * KM_SLEEP/KM_NOSLEEP. TQ_NOQUEUE/TQ_NOALLOC are set particularly @@ -61,8 +69,9 @@ typedef void (task_func_t)(void *); typedef struct taskq { spinlock_t tq_lock; /* protects taskq_t */ unsigned long tq_lock_flags; /* interrupt state */ - struct task_struct **tq_threads; /* thread pointers */ const char *tq_name; /* taskq name */ + struct list_head tq_thread_list;/* list of all threads */ + struct list_head tq_active_list;/* list of active threads */ int tq_nactive; /* # of active threads */ int tq_nthreads; /* # of total threads */ int tq_pri; /* priority */ @@ -73,13 +82,20 @@ typedef struct taskq { taskqid_t tq_next_id; /* next pend/work id */ taskqid_t tq_lowest_id; /* lowest pend/work id */ struct list_head tq_free_list; /* free task_t's */ - struct list_head tq_work_list; /* work task_t's */ struct list_head tq_pend_list; /* pending task_t's */ struct list_head tq_prio_list; /* priority pending task_t's */ wait_queue_head_t tq_work_waitq; /* new work waitq */ wait_queue_head_t tq_wait_waitq; /* wait waitq */ } taskq_t; +typedef struct taskq_thread { + struct list_head tqt_thread_list; + struct list_head tqt_active_list; + struct task_struct *tqt_thread; + taskq_t *tqt_tq; + taskq_ent_t *tqt_ent; +} taskq_thread_t; + /* Global system-wide dynamic task queue available for all consumers */ extern taskq_t *system_taskq; diff --git a/module/spl/spl-taskq.c b/module/spl/spl-taskq.c index 50d32e0..5c22544 100644 --- a/module/spl/spl-taskq.c +++ b/module/spl/spl-taskq.c @@ -38,14 +38,6 @@ taskq_t *system_taskq; EXPORT_SYMBOL(system_taskq); -typedef struct taskq_ent { - spinlock_t tqent_lock; - struct list_head tqent_list; - taskqid_t tqent_id; - task_func_t *tqent_func; - void *tqent_arg; -} taskq_ent_t; - /* * NOTE: Must be called with tq->tq_lock held, returns a list_t which * is not attached to the free, work, or pending taskq lists. @@ -228,15 +220,18 @@ EXPORT_SYMBOL(__taskq_wait); int __taskq_member(taskq_t *tq, void *t) { - int i; + struct list_head *l; + taskq_thread_t *tqt; SENTRY; ASSERT(tq); ASSERT(t); - for (i = 0; i < tq->tq_nthreads; i++) - if (tq->tq_threads[i] == (struct task_struct *)t) - SRETURN(1); + list_for_each(l, &tq->tq_thread_list) { + tqt = list_entry(l, taskq_thread_t, tqt_thread_list); + if (tqt->tqt_thread == (struct task_struct *)t) + SRETURN(1); + } SRETURN(0); } @@ -305,6 +300,7 @@ taskq_lowest_id(taskq_t *tq) { taskqid_t lowest_id = tq->tq_next_id; taskq_ent_t *t; + taskq_thread_t *tqt; SENTRY; ASSERT(tq); @@ -320,9 +316,11 @@ taskq_lowest_id(taskq_t *tq) lowest_id = MIN(lowest_id, t->tqent_id); } - if (!list_empty(&tq->tq_work_list)) { - t = list_entry(tq->tq_work_list.next, taskq_ent_t, tqent_list); - lowest_id = MIN(lowest_id, t->tqent_id); + if (!list_empty(&tq->tq_active_list)) { + tqt = list_entry(tq->tq_active_list.next, taskq_thread_t, + tqt_active_list); + ASSERT(tqt->tqt_ent != NULL); + lowest_id = MIN(lowest_id, tqt->tqt_ent->tqent_id); } SRETURN(lowest_id); @@ -333,25 +331,25 @@ taskq_lowest_id(taskq_t *tq) * taskqid. */ static void -taskq_insert_in_order(taskq_t *tq, taskq_ent_t *t) +taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt) { - taskq_ent_t *w; + taskq_thread_t *w; struct list_head *l; SENTRY; ASSERT(tq); - ASSERT(t); + ASSERT(tqt); ASSERT(spin_is_locked(&tq->tq_lock)); - list_for_each_prev(l, &tq->tq_work_list) { - w = list_entry(l, taskq_ent_t, tqent_list); - if (w->tqent_id < t->tqent_id) { - list_add(&t->tqent_list, l); + list_for_each_prev(l, &tq->tq_active_list) { + w = list_entry(l, taskq_thread_t, tqt_active_list); + if (w->tqt_ent->tqent_id < tqt->tqt_ent->tqent_id) { + list_add(&tqt->tqt_active_list, l); break; } } - if (l == &tq->tq_work_list) - list_add(&t->tqent_list, &tq->tq_work_list); + if (l == &tq->tq_active_list) + list_add(&tqt->tqt_active_list, &tq->tq_active_list); SEXIT; } @@ -362,12 +360,14 @@ taskq_thread(void *args) DECLARE_WAITQUEUE(wait, current); sigset_t blocked; taskqid_t id; - taskq_t *tq = args; + taskq_thread_t *tqt = args; + taskq_t *tq; taskq_ent_t *t; struct list_head *pend_list; SENTRY; - ASSERT(tq); + ASSERT(tqt); + tq = tqt->tqt_tq; current->flags |= PF_NOFREEZE; /* Disable the direct memory reclaim path */ @@ -407,7 +407,8 @@ taskq_thread(void *args) if (pend_list) { t = list_entry(pend_list->next, taskq_ent_t, tqent_list); list_del_init(&t->tqent_list); - taskq_insert_in_order(tq, t); + tqt->tqt_ent = t; + taskq_insert_in_order(tq, tqt); tq->tq_nactive++; spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); @@ -416,6 +417,8 @@ taskq_thread(void *args) spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); tq->tq_nactive--; + list_del_init(&tqt->tqt_active_list); + tqt->tqt_ent = NULL; id = t->tqent_id; task_done(tq, t); @@ -435,6 +438,9 @@ taskq_thread(void *args) __set_current_state(TASK_RUNNING); tq->tq_nthreads--; + list_del_init(&tqt->tqt_thread_list); + kmem_free(tqt, sizeof(taskq_thread_t)); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); SRETURN(0); @@ -445,7 +451,7 @@ __taskq_create(const char *name, int nthreads, pri_t pri, int minalloc, int maxalloc, uint_t flags) { taskq_t *tq; - struct task_struct *t; + taskq_thread_t *tqt; int rc = 0, i, j = 0; SENTRY; @@ -468,14 +474,10 @@ __taskq_create(const char *name, int nthreads, pri_t pri, if (tq == NULL) SRETURN(NULL); - tq->tq_threads = kmem_alloc(nthreads * sizeof(t), KM_SLEEP); - if (tq->tq_threads == NULL) { - kmem_free(tq, sizeof(*tq)); - SRETURN(NULL); - } - spin_lock_init(&tq->tq_lock); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + INIT_LIST_HEAD(&tq->tq_thread_list); + INIT_LIST_HEAD(&tq->tq_active_list); tq->tq_name = name; tq->tq_nactive = 0; tq->tq_nthreads = 0; @@ -487,7 +489,6 @@ __taskq_create(const char *name, int nthreads, pri_t pri, tq->tq_next_id = 1; tq->tq_lowest_id = 1; INIT_LIST_HEAD(&tq->tq_free_list); - INIT_LIST_HEAD(&tq->tq_work_list); INIT_LIST_HEAD(&tq->tq_pend_list); INIT_LIST_HEAD(&tq->tq_prio_list); init_waitqueue_head(&tq->tq_work_waitq); @@ -499,19 +500,26 @@ __taskq_create(const char *name, int nthreads, pri_t pri, spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - for (i = 0; i < nthreads; i++) { - t = kthread_create(taskq_thread, tq, "%s/%d", name, i); - if (t) { - tq->tq_threads[i] = t; - kthread_bind(t, i % num_online_cpus()); - set_user_nice(t, PRIO_TO_NICE(pri)); - wake_up_process(t); + for (i = 0; i < nthreads; i++) { + tqt = kmem_alloc(sizeof(*tqt), KM_SLEEP); + INIT_LIST_HEAD(&tqt->tqt_thread_list); + INIT_LIST_HEAD(&tqt->tqt_active_list); + tqt->tqt_tq = tq; + tqt->tqt_ent = NULL; + + tqt->tqt_thread = kthread_create(taskq_thread, tqt, + "%s/%d", name, i); + if (tqt->tqt_thread) { + list_add(&tqt->tqt_thread_list, &tq->tq_thread_list); + kthread_bind(tqt->tqt_thread, i % num_online_cpus()); + set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(pri)); + wake_up_process(tqt->tqt_thread); j++; - } else { - tq->tq_threads[i] = NULL; - rc = 1; - } - } + } else { + kmem_free(tqt, sizeof(taskq_thread_t)); + rc = 1; + } + } /* Wait for all threads to be started before potential destroy */ wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j); @@ -528,8 +536,9 @@ EXPORT_SYMBOL(__taskq_create); void __taskq_destroy(taskq_t *tq) { + struct task_struct *thread; + taskq_thread_t *tqt; taskq_ent_t *t; - int i, nthreads; SENTRY; ASSERT(tq); @@ -540,13 +549,25 @@ __taskq_destroy(taskq_t *tq) /* TQ_ACTIVE cleared prevents new tasks being added to pending */ __taskq_wait(tq); - nthreads = tq->tq_nthreads; - for (i = 0; i < nthreads; i++) - if (tq->tq_threads[i]) - kthread_stop(tq->tq_threads[i]); - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + /* + * Signal each thread to exit and block until it does. Each thread + * is responsible for removing itself from the list and freeing its + * taskq_thread_t. This allows for idle threads to opt to remove + * themselves from the taskq. They can be recreated as needed. + */ + while (!list_empty(&tq->tq_thread_list)) { + tqt = list_entry(tq->tq_thread_list.next, + taskq_thread_t, tqt_thread_list); + thread = tqt->tqt_thread; + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + + kthread_stop(thread); + + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + } + while (!list_empty(&tq->tq_free_list)) { t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list); list_del_init(&t->tqent_list); @@ -555,13 +576,14 @@ __taskq_destroy(taskq_t *tq) ASSERT(tq->tq_nthreads == 0); ASSERT(tq->tq_nalloc == 0); + ASSERT(list_empty(&tq->tq_thread_list)); + ASSERT(list_empty(&tq->tq_active_list)); ASSERT(list_empty(&tq->tq_free_list)); - ASSERT(list_empty(&tq->tq_work_list)); ASSERT(list_empty(&tq->tq_pend_list)); ASSERT(list_empty(&tq->tq_prio_list)); spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - kmem_free(tq->tq_threads, nthreads * sizeof(taskq_ent_t *)); + kmem_free(tq, sizeof(taskq_t)); SEXIT; -- 2.40.0