]> granicus.if.org Git - spl/commitdiff
Implementation of the TQ_FRONT flag.
authorNed Bass <bass6@llnl.gov>
Thu, 1 Jul 2010 17:07:51 +0000 (10:07 -0700)
committerBrian Behlendorf <behlendorf1@llnl.gov>
Thu, 1 Jul 2010 17:59:38 +0000 (10:59 -0700)
Adds a task queue to receive tasks dispatched with TQ_FRONT.  Worker
threads pull tasks from this high priority queue before the default
pending queue.

Executing tasks out of FIFO order potentially breaks taskq_lowest_id()
if we do not preserve the ordering of the work list by taskqid.
Therefore, instead of always appending to the work list, we search for
the appropriate place to insert a task.  The common case is to append
to the list, so we make this operation efficient by searching the work
list in reverse order.

Signed-off-by: Brian Behlendorf <behlendorf1@llnl.gov>
include/sys/taskq.h
module/spl/spl-taskq.c

index 4e51d98ddee868d91b471c67997132400ca99f8e..c83409d4936ffc4c2d0731bc0ca297636ad35210 100644 (file)
@@ -74,6 +74,7 @@ typedef struct taskq {
        struct list_head        tq_free_list;  /* free task_t's */
        struct list_head        tq_work_list;  /* work task_t's */
        struct list_head        tq_pend_list;  /* pending task_t's */
+       struct list_head        tq_prio_list;  /* priority pending task_t's */
        wait_queue_head_t       tq_work_waitq; /* new work waitq */
        wait_queue_head_t       tq_wait_waitq; /* wait waitq */
 } taskq_t;
index 805749a14ed76e3cd80a8bebed9925df274f2655..9aca699c72317458b18bad215025018250472c11 100644 (file)
@@ -158,21 +158,22 @@ task_done(taskq_t *tq, spl_task_t *t)
 
 /*
  * As tasks are submitted to the task queue they are assigned a
- * monotonically increasing taskqid and added to the tail of the
- * pending list.  As worker threads become available the tasks are
- * removed from the head of the pending list and added to the tail
- * of the work list.  Finally, as tasks complete they are removed
- * from the work list.  This means that the pending and work lists
- * are always kept sorted by taskqid.  Thus the lowest outstanding
+ * monotonically increasing taskqid and added to the tail of the pending
+ * list.  As worker threads become available the tasks are removed from
+ * the head of the pending or priority list, giving preference to the
+ * priority list.  The tasks are then added to the work list, preserving
+ * the ordering by taskqid.  Finally, as tasks complete they are removed
+ * from the work list.  This means that the pending and work lists are
+ * always kept sorted by taskqid.  Thus the lowest outstanding
  * incomplete taskqid can be determined simply by checking the min
- * taskqid for each head item on the pending and work list.  This
- * value is stored in tq->tq_lowest_id and only updated to the new
- * lowest id when the previous lowest id completes.  All taskqids
- * lower than tq->tq_lowest_id must have completed.  It is also
- * possible larger taskqid's have completed because they may be
- * processed in parallel by several worker threads.  However, this
- * is not a problem because the behavior of taskq_wait_id() is to
- * block until all previously submitted taskqid's have completed.
+ * taskqid for each head item on the pending, priority, and work list.
+ * This value is stored in tq->tq_lowest_id and only updated to the new
+ * lowest id when the previous lowest id completes.  All taskqids lower
+ * than tq->tq_lowest_id must have completed.  It is also possible
+ * larger taskqid's have completed because they may be processed in
+ * parallel by several worker threads.  However, this is not a problem
+ * because the behavior of taskq_wait_id() is to block until all
+ * previously submitted taskqid's have completed.
  *
  * XXX: Taskqid_t wrapping is not handled.  However, taskqid_t's are
  * 64-bit values so even if a taskq is processing 2^24 (16,777,216)
@@ -274,7 +275,13 @@ __taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
                GOTO(out, rc = 0);
 
        spin_lock(&t->t_lock);
-       list_add_tail(&t->t_list, &tq->tq_pend_list);
+
+       /* Queue to the priority list instead of the pending list */
+       if (flags & TQ_FRONT)
+               list_add_tail(&t->t_list, &tq->tq_prio_list);
+       else
+               list_add_tail(&t->t_list, &tq->tq_pend_list);
+
        t->t_id = rc = tq->tq_next_id;
        tq->tq_next_id++;
         t->t_func = func;
@@ -290,8 +297,9 @@ EXPORT_SYMBOL(__taskq_dispatch);
 
 /*
  * Returns the lowest incomplete taskqid_t.  The taskqid_t may
- * be queued on the pending list or may be on the work list
- * currently being handled, but it is not 100% complete yet.
+ * be queued on the pending list, on the priority list,  or on
+ * the work list currently being handled, but it is not 100%
+ * complete yet.
  */
 static taskqid_t
 taskq_lowest_id(taskq_t *tq)
@@ -308,6 +316,11 @@ taskq_lowest_id(taskq_t *tq)
                lowest_id = MIN(lowest_id, t->t_id);
        }
 
+       if (!list_empty(&tq->tq_prio_list)) {
+               t = list_entry(tq->tq_prio_list.next, spl_task_t, t_list);
+               lowest_id = MIN(lowest_id, t->t_id);
+       }
+
        if (!list_empty(&tq->tq_work_list)) {
                t = list_entry(tq->tq_work_list.next, spl_task_t, t_list);
                lowest_id = MIN(lowest_id, t->t_id);
@@ -316,6 +329,34 @@ taskq_lowest_id(taskq_t *tq)
        RETURN(lowest_id);
 }
 
+/*
+ * Insert a task into a list keeping the list sorted by increasing
+ * taskqid.
+ */
+static void
+taskq_insert_in_order(taskq_t *tq, spl_task_t *t)
+{
+       spl_task_t *w;
+       struct list_head *l;
+
+       ENTRY;
+       ASSERT(tq);
+       ASSERT(t);
+       ASSERT(spin_is_locked(&tq->tq_lock));
+
+       list_for_each_prev(l, &tq->tq_work_list) {
+               w = list_entry(l, spl_task_t, t_list);
+               if (w->t_id < t->t_id) {
+                       list_add(&t->t_list, l);
+                       break;
+               }
+       }
+       if (l == &tq->tq_work_list)
+               list_add(&t->t_list, &tq->tq_work_list);
+
+       EXIT;
+}
+
 static int
 taskq_thread(void *args)
 {
@@ -324,6 +365,7 @@ taskq_thread(void *args)
        taskqid_t id;
         taskq_t *tq = args;
         spl_task_t *t;
+       struct list_head *pend_list;
        ENTRY;
 
         ASSERT(tq);
@@ -341,7 +383,8 @@ taskq_thread(void *args)
         while (!kthread_should_stop()) {
 
                add_wait_queue(&tq->tq_work_waitq, &wait);
-               if (list_empty(&tq->tq_pend_list)) {
+               if (list_empty(&tq->tq_pend_list) &&
+                   list_empty(&tq->tq_prio_list)) {
                        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
                        schedule();
                        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
@@ -350,10 +393,18 @@ taskq_thread(void *args)
                }
 
                remove_wait_queue(&tq->tq_work_waitq, &wait);
-                if (!list_empty(&tq->tq_pend_list)) {
-                        t = list_entry(tq->tq_pend_list.next,spl_task_t,t_list);
+
+               if (!list_empty(&tq->tq_prio_list))
+                       pend_list = &tq->tq_prio_list;
+               else if (!list_empty(&tq->tq_pend_list))
+                       pend_list = &tq->tq_pend_list;
+               else
+                       pend_list = NULL;
+
+               if (pend_list) {
+                        t = list_entry(pend_list->next, spl_task_t, t_list);
                         list_del_init(&t->t_list);
-                       list_add_tail(&t->t_list, &tq->tq_work_list);
+                       taskq_insert_in_order(tq, t);
                         tq->tq_nactive++;
                        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
@@ -435,6 +486,7 @@ __taskq_create(const char *name, int nthreads, pri_t pri,
         INIT_LIST_HEAD(&tq->tq_free_list);
         INIT_LIST_HEAD(&tq->tq_work_list);
         INIT_LIST_HEAD(&tq->tq_pend_list);
+        INIT_LIST_HEAD(&tq->tq_prio_list);
         init_waitqueue_head(&tq->tq_work_waitq);
         init_waitqueue_head(&tq->tq_wait_waitq);
 
@@ -503,6 +555,7 @@ __taskq_destroy(taskq_t *tq)
         ASSERT(list_empty(&tq->tq_free_list));
         ASSERT(list_empty(&tq->tq_work_list));
         ASSERT(list_empty(&tq->tq_pend_list));
+        ASSERT(list_empty(&tq->tq_prio_list));
 
         spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
         kmem_free(tq->tq_threads, nthreads * sizeof(spl_task_t *));