typedef unsigned long taskqid_t;
typedef void (task_func_t)(void *);
+typedef struct taskq_ent {
+ spinlock_t tqent_lock;
+ struct list_head tqent_list;
+ taskqid_t tqent_id;
+ task_func_t *tqent_func;
+ void *tqent_arg;
+} taskq_ent_t;
+
/*
* Flags for taskq_dispatch. TQ_SLEEP/TQ_NOSLEEP should be same as
* KM_SLEEP/KM_NOSLEEP. TQ_NOQUEUE/TQ_NOALLOC are set particularly
typedef struct taskq {
spinlock_t tq_lock; /* protects taskq_t */
unsigned long tq_lock_flags; /* interrupt state */
- struct task_struct **tq_threads; /* thread pointers */
const char *tq_name; /* taskq name */
+ struct list_head tq_thread_list;/* list of all threads */
+ struct list_head tq_active_list;/* list of active threads */
int tq_nactive; /* # of active threads */
int tq_nthreads; /* # of total threads */
int tq_pri; /* priority */
taskqid_t tq_next_id; /* next pend/work id */
taskqid_t tq_lowest_id; /* lowest pend/work id */
struct list_head tq_free_list; /* free task_t's */
- struct list_head tq_work_list; /* work task_t's */
struct list_head tq_pend_list; /* pending task_t's */
struct list_head tq_prio_list; /* priority pending task_t's */
wait_queue_head_t tq_work_waitq; /* new work waitq */
wait_queue_head_t tq_wait_waitq; /* wait waitq */
} taskq_t;
+typedef struct taskq_thread {
+ struct list_head tqt_thread_list;
+ struct list_head tqt_active_list;
+ struct task_struct *tqt_thread;
+ taskq_t *tqt_tq;
+ taskq_ent_t *tqt_ent;
+} taskq_thread_t;
+
/* Global system-wide dynamic task queue available for all consumers */
extern taskq_t *system_taskq;
taskq_t *system_taskq;
EXPORT_SYMBOL(system_taskq);
-typedef struct taskq_ent {
- spinlock_t tqent_lock;
- struct list_head tqent_list;
- taskqid_t tqent_id;
- task_func_t *tqent_func;
- void *tqent_arg;
-} taskq_ent_t;
-
/*
* NOTE: Must be called with tq->tq_lock held, returns a list_t which
* is not attached to the free, work, or pending taskq lists.
int
__taskq_member(taskq_t *tq, void *t)
{
- int i;
+ struct list_head *l;
+ taskq_thread_t *tqt;
SENTRY;
ASSERT(tq);
ASSERT(t);
- for (i = 0; i < tq->tq_nthreads; i++)
- if (tq->tq_threads[i] == (struct task_struct *)t)
- SRETURN(1);
+ list_for_each(l, &tq->tq_thread_list) {
+ tqt = list_entry(l, taskq_thread_t, tqt_thread_list);
+ if (tqt->tqt_thread == (struct task_struct *)t)
+ SRETURN(1);
+ }
SRETURN(0);
}
{
taskqid_t lowest_id = tq->tq_next_id;
taskq_ent_t *t;
+ taskq_thread_t *tqt;
SENTRY;
ASSERT(tq);
lowest_id = MIN(lowest_id, t->tqent_id);
}
- if (!list_empty(&tq->tq_work_list)) {
- t = list_entry(tq->tq_work_list.next, taskq_ent_t, tqent_list);
- lowest_id = MIN(lowest_id, t->tqent_id);
+ if (!list_empty(&tq->tq_active_list)) {
+ tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
+ tqt_active_list);
+ ASSERT(tqt->tqt_ent != NULL);
+ lowest_id = MIN(lowest_id, tqt->tqt_ent->tqent_id);
}
SRETURN(lowest_id);
* taskqid.
*/
static void
-taskq_insert_in_order(taskq_t *tq, taskq_ent_t *t)
+taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
{
- taskq_ent_t *w;
+ taskq_thread_t *w;
struct list_head *l;
SENTRY;
ASSERT(tq);
- ASSERT(t);
+ ASSERT(tqt);
ASSERT(spin_is_locked(&tq->tq_lock));
- list_for_each_prev(l, &tq->tq_work_list) {
- w = list_entry(l, taskq_ent_t, tqent_list);
- if (w->tqent_id < t->tqent_id) {
- list_add(&t->tqent_list, l);
+ list_for_each_prev(l, &tq->tq_active_list) {
+ w = list_entry(l, taskq_thread_t, tqt_active_list);
+ if (w->tqt_ent->tqent_id < tqt->tqt_ent->tqent_id) {
+ list_add(&tqt->tqt_active_list, l);
break;
}
}
- if (l == &tq->tq_work_list)
- list_add(&t->tqent_list, &tq->tq_work_list);
+ if (l == &tq->tq_active_list)
+ list_add(&tqt->tqt_active_list, &tq->tq_active_list);
SEXIT;
}
DECLARE_WAITQUEUE(wait, current);
sigset_t blocked;
taskqid_t id;
- taskq_t *tq = args;
+ taskq_thread_t *tqt = args;
+ taskq_t *tq;
taskq_ent_t *t;
struct list_head *pend_list;
SENTRY;
- ASSERT(tq);
+ ASSERT(tqt);
+ tq = tqt->tqt_tq;
current->flags |= PF_NOFREEZE;
/* Disable the direct memory reclaim path */
if (pend_list) {
t = list_entry(pend_list->next, taskq_ent_t, tqent_list);
list_del_init(&t->tqent_list);
- taskq_insert_in_order(tq, t);
+ tqt->tqt_ent = t;
+ taskq_insert_in_order(tq, tqt);
tq->tq_nactive++;
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
tq->tq_nactive--;
+ list_del_init(&tqt->tqt_active_list);
+ tqt->tqt_ent = NULL;
id = t->tqent_id;
task_done(tq, t);
__set_current_state(TASK_RUNNING);
tq->tq_nthreads--;
+ list_del_init(&tqt->tqt_thread_list);
+ kmem_free(tqt, sizeof(taskq_thread_t));
+
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
SRETURN(0);
int minalloc, int maxalloc, uint_t flags)
{
taskq_t *tq;
- struct task_struct *t;
+ taskq_thread_t *tqt;
int rc = 0, i, j = 0;
SENTRY;
if (tq == NULL)
SRETURN(NULL);
- tq->tq_threads = kmem_alloc(nthreads * sizeof(t), KM_SLEEP);
- if (tq->tq_threads == NULL) {
- kmem_free(tq, sizeof(*tq));
- SRETURN(NULL);
- }
-
spin_lock_init(&tq->tq_lock);
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+ INIT_LIST_HEAD(&tq->tq_thread_list);
+ INIT_LIST_HEAD(&tq->tq_active_list);
tq->tq_name = name;
tq->tq_nactive = 0;
tq->tq_nthreads = 0;
tq->tq_next_id = 1;
tq->tq_lowest_id = 1;
INIT_LIST_HEAD(&tq->tq_free_list);
- INIT_LIST_HEAD(&tq->tq_work_list);
INIT_LIST_HEAD(&tq->tq_pend_list);
INIT_LIST_HEAD(&tq->tq_prio_list);
init_waitqueue_head(&tq->tq_work_waitq);
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
- for (i = 0; i < nthreads; i++) {
- t = kthread_create(taskq_thread, tq, "%s/%d", name, i);
- if (t) {
- tq->tq_threads[i] = t;
- kthread_bind(t, i % num_online_cpus());
- set_user_nice(t, PRIO_TO_NICE(pri));
- wake_up_process(t);
+ for (i = 0; i < nthreads; i++) {
+ tqt = kmem_alloc(sizeof(*tqt), KM_SLEEP);
+ INIT_LIST_HEAD(&tqt->tqt_thread_list);
+ INIT_LIST_HEAD(&tqt->tqt_active_list);
+ tqt->tqt_tq = tq;
+ tqt->tqt_ent = NULL;
+
+ tqt->tqt_thread = kthread_create(taskq_thread, tqt,
+ "%s/%d", name, i);
+ if (tqt->tqt_thread) {
+ list_add(&tqt->tqt_thread_list, &tq->tq_thread_list);
+ kthread_bind(tqt->tqt_thread, i % num_online_cpus());
+ set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(pri));
+ wake_up_process(tqt->tqt_thread);
j++;
- } else {
- tq->tq_threads[i] = NULL;
- rc = 1;
- }
- }
+ } else {
+ kmem_free(tqt, sizeof(taskq_thread_t));
+ rc = 1;
+ }
+ }
/* Wait for all threads to be started before potential destroy */
wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j);
void
__taskq_destroy(taskq_t *tq)
{
+ struct task_struct *thread;
+ taskq_thread_t *tqt;
taskq_ent_t *t;
- int i, nthreads;
SENTRY;
ASSERT(tq);
/* TQ_ACTIVE cleared prevents new tasks being added to pending */
__taskq_wait(tq);
- nthreads = tq->tq_nthreads;
- for (i = 0; i < nthreads; i++)
- if (tq->tq_threads[i])
- kthread_stop(tq->tq_threads[i]);
-
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+ /*
+ * Signal each thread to exit and block until it does. Each thread
+ * is responsible for removing itself from the list and freeing its
+ * taskq_thread_t. This allows for idle threads to opt to remove
+ * themselves from the taskq. They can be recreated as needed.
+ */
+ while (!list_empty(&tq->tq_thread_list)) {
+ tqt = list_entry(tq->tq_thread_list.next,
+ taskq_thread_t, tqt_thread_list);
+ thread = tqt->tqt_thread;
+ spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
+ kthread_stop(thread);
+
+ spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+ }
+
while (!list_empty(&tq->tq_free_list)) {
t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
list_del_init(&t->tqent_list);
ASSERT(tq->tq_nthreads == 0);
ASSERT(tq->tq_nalloc == 0);
+ ASSERT(list_empty(&tq->tq_thread_list));
+ ASSERT(list_empty(&tq->tq_active_list));
ASSERT(list_empty(&tq->tq_free_list));
- ASSERT(list_empty(&tq->tq_work_list));
ASSERT(list_empty(&tq->tq_pend_list));
ASSERT(list_empty(&tq->tq_prio_list));
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
- kmem_free(tq->tq_threads, nthreads * sizeof(taskq_ent_t *));
+
kmem_free(tq, sizeof(taskq_t));
SEXIT;