diff --git a/include/sys/taskq.h b/include/sys/taskq.h index 4e51d98dd..c83409d49 100644 --- a/include/sys/taskq.h +++ b/include/sys/taskq.h @@ -74,6 +74,7 @@ typedef struct taskq { struct list_head tq_free_list; /* free task_t's */ struct list_head tq_work_list; /* work task_t's */ struct list_head tq_pend_list; /* pending task_t's */ + struct list_head tq_prio_list; /* priority pending task_t's */ wait_queue_head_t tq_work_waitq; /* new work waitq */ wait_queue_head_t tq_wait_waitq; /* wait waitq */ } taskq_t; diff --git a/module/spl/spl-taskq.c b/module/spl/spl-taskq.c index 805749a14..9aca699c7 100644 --- a/module/spl/spl-taskq.c +++ b/module/spl/spl-taskq.c @@ -158,21 +158,22 @@ task_done(taskq_t *tq, spl_task_t *t) /* * As tasks are submitted to the task queue they are assigned a - * monotonically increasing taskqid and added to the tail of the - * pending list. As worker threads become available the tasks are - * removed from the head of the pending list and added to the tail - * of the work list. Finally, as tasks complete they are removed - * from the work list. This means that the pending and work lists - * are always kept sorted by taskqid. Thus the lowest outstanding + * monotonically increasing taskqid and added to the tail of the pending + * list. As worker threads become available the tasks are removed from + * the head of the pending or priority list, giving preference to the + * priority list. The tasks are then added to the work list, preserving + * the ordering by taskqid. Finally, as tasks complete they are removed + * from the work list. This means that the pending and work lists are + * always kept sorted by taskqid. Thus the lowest outstanding * incomplete taskqid can be determined simply by checking the min - * taskqid for each head item on the pending and work list. This - * value is stored in tq->tq_lowest_id and only updated to the new - * lowest id when the previous lowest id completes. All taskqids - * lower than tq->tq_lowest_id must have completed. It is also - * possible larger taskqid's have completed because they may be - * processed in parallel by several worker threads. However, this - * is not a problem because the behavior of taskq_wait_id() is to - * block until all previously submitted taskqid's have completed. + * taskqid for each head item on the pending, priority, and work list. + * This value is stored in tq->tq_lowest_id and only updated to the new + * lowest id when the previous lowest id completes. All taskqids lower + * than tq->tq_lowest_id must have completed. It is also possible + * larger taskqid's have completed because they may be processed in + * parallel by several worker threads. However, this is not a problem + * because the behavior of taskq_wait_id() is to block until all + * previously submitted taskqid's have completed. * * XXX: Taskqid_t wrapping is not handled. However, taskqid_t's are * 64-bit values so even if a taskq is processing 2^24 (16,777,216) @@ -274,7 +275,13 @@ __taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) GOTO(out, rc = 0); spin_lock(&t->t_lock); - list_add_tail(&t->t_list, &tq->tq_pend_list); + + /* Queue to the priority list instead of the pending list */ + if (flags & TQ_FRONT) + list_add_tail(&t->t_list, &tq->tq_prio_list); + else + list_add_tail(&t->t_list, &tq->tq_pend_list); + t->t_id = rc = tq->tq_next_id; tq->tq_next_id++; t->t_func = func; @@ -290,8 +297,9 @@ EXPORT_SYMBOL(__taskq_dispatch); /* * Returns the lowest incomplete taskqid_t. The taskqid_t may - * be queued on the pending list or may be on the work list - * currently being handled, but it is not 100% complete yet. + * be queued on the pending list, on the priority list, or on + * the work list currently being handled, but it is not 100% + * complete yet. */ static taskqid_t taskq_lowest_id(taskq_t *tq) @@ -308,6 +316,11 @@ taskq_lowest_id(taskq_t *tq) lowest_id = MIN(lowest_id, t->t_id); } + if (!list_empty(&tq->tq_prio_list)) { + t = list_entry(tq->tq_prio_list.next, spl_task_t, t_list); + lowest_id = MIN(lowest_id, t->t_id); + } + if (!list_empty(&tq->tq_work_list)) { t = list_entry(tq->tq_work_list.next, spl_task_t, t_list); lowest_id = MIN(lowest_id, t->t_id); @@ -316,6 +329,34 @@ taskq_lowest_id(taskq_t *tq) RETURN(lowest_id); } +/* + * Insert a task into a list keeping the list sorted by increasing + * taskqid. + */ +static void +taskq_insert_in_order(taskq_t *tq, spl_task_t *t) +{ + spl_task_t *w; + struct list_head *l; + + ENTRY; + ASSERT(tq); + ASSERT(t); + ASSERT(spin_is_locked(&tq->tq_lock)); + + list_for_each_prev(l, &tq->tq_work_list) { + w = list_entry(l, spl_task_t, t_list); + if (w->t_id < t->t_id) { + list_add(&t->t_list, l); + break; + } + } + if (l == &tq->tq_work_list) + list_add(&t->t_list, &tq->tq_work_list); + + EXIT; +} + static int taskq_thread(void *args) { @@ -324,6 +365,7 @@ taskq_thread(void *args) taskqid_t id; taskq_t *tq = args; spl_task_t *t; + struct list_head *pend_list; ENTRY; ASSERT(tq); @@ -341,7 +383,8 @@ taskq_thread(void *args) while (!kthread_should_stop()) { add_wait_queue(&tq->tq_work_waitq, &wait); - if (list_empty(&tq->tq_pend_list)) { + if (list_empty(&tq->tq_pend_list) && + list_empty(&tq->tq_prio_list)) { spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); schedule(); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); @@ -350,10 +393,18 @@ taskq_thread(void *args) } remove_wait_queue(&tq->tq_work_waitq, &wait); - if (!list_empty(&tq->tq_pend_list)) { - t = list_entry(tq->tq_pend_list.next,spl_task_t,t_list); + + if (!list_empty(&tq->tq_prio_list)) + pend_list = &tq->tq_prio_list; + else if (!list_empty(&tq->tq_pend_list)) + pend_list = &tq->tq_pend_list; + else + pend_list = NULL; + + if (pend_list) { + t = list_entry(pend_list->next, spl_task_t, t_list); list_del_init(&t->t_list); - list_add_tail(&t->t_list, &tq->tq_work_list); + taskq_insert_in_order(tq, t); tq->tq_nactive++; spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); @@ -435,6 +486,7 @@ __taskq_create(const char *name, int nthreads, pri_t pri, INIT_LIST_HEAD(&tq->tq_free_list); INIT_LIST_HEAD(&tq->tq_work_list); INIT_LIST_HEAD(&tq->tq_pend_list); + INIT_LIST_HEAD(&tq->tq_prio_list); init_waitqueue_head(&tq->tq_work_waitq); init_waitqueue_head(&tq->tq_wait_waitq); @@ -503,6 +555,7 @@ __taskq_destroy(taskq_t *tq) ASSERT(list_empty(&tq->tq_free_list)); ASSERT(list_empty(&tq->tq_work_list)); ASSERT(list_empty(&tq->tq_pend_list)); + ASSERT(list_empty(&tq->tq_prio_list)); spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); kmem_free(tq->tq_threads, nthreads * sizeof(spl_task_t *));