Replace tq_work_list and tq_threads in taskq_t

To lay the groundwork for introducing the taskq_dispatch_prealloc()
interface, the tq_work_list and tq_threads fields had to be replaced
with new alternatives in the taskq_t structure.

The tq_threads field was replaced with tq_thread_list. Rather than
storing pointers to the taskq's kernel threads in an array, they are
now stored in a linked list. In addition to laying the groundwork for
the taskq_dispatch_prealloc() interface, this change could also enable
taskq threads to be created and destroyed dynamically, since threads
can now be added to and removed from the list relatively easily.
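
As a rough sketch of what this enables (a hypothetical helper, not part
of this commit, reusing the taskq_thread_t fields and locking
conventions introduced below), growing the thread pool of a live taskq
reduces to allocating a taskq_thread_t and doing a list_add() under
tq_lock:

/*
 * Hypothetical illustration only -- not part of this commit.  With
 * tq_thread_list a linked list, adding a thread to a live taskq is
 * just an allocation plus a list_add() under tq_lock; removal is the
 * mirror image with list_del_init().
 */
static taskq_thread_t *
taskq_thread_add(taskq_t *tq, const char *name, int nr)
{
        taskq_thread_t *tqt;

        tqt = kmem_alloc(sizeof(*tqt), KM_SLEEP);
        INIT_LIST_HEAD(&tqt->tqt_thread_list);
        INIT_LIST_HEAD(&tqt->tqt_active_list);
        tqt->tqt_tq = tq;
        tqt->tqt_ent = NULL;

        tqt->tqt_thread = kthread_create(taskq_thread, tqt,
            "%s/%d", name, nr);
        if (!tqt->tqt_thread) {
                kmem_free(tqt, sizeof(taskq_thread_t));
                return (NULL);
        }

        spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
        list_add(&tqt->tqt_thread_list, &tq->tq_thread_list);
        spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);

        wake_up_process(tqt->tqt_thread);

        return (tqt);
}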

The tq_work_list field was replaced with tq_active_list. Instead of
keeping a list of the taskq_ent_t's currently being serviced, the taskq
now keeps a list of the taskq_thread_t's currently servicing a
taskq_ent_t. This frees up a taskq_ent_t's tqent_list field while it is
being serviced (i.e., now when a taskq_ent_t is being serviced, its
tqent_list field will be empty).
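
That empty-while-serviced property is exactly what a preallocated-entry
interface wants: with tq_lock held, a single list_empty() test reveals
whether a caller-owned taskq_ent_t is still linked on one of the taskq
lists. A minimal sketch (the helper is hypothetical, not part of this
commit):

/*
 * Hypothetical helper, not part of this commit.  Threads dequeue an
 * entry with list_del_init(), so a taskq_ent_t that is being serviced
 * has an empty tqent_list, while an entry on the free, pending, or
 * priority list does not.  Call with tq->tq_lock held for a stable
 * answer.
 */
static int
taskq_ent_on_list(taskq_ent_t *t)
{
        return (!list_empty(&t->tqent_list));
}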

Signed-off-by: Prakash Surya <surya1@llnl.gov>
Signed-off-by: Brian Behlendorf <behlendorf1@llnl.gov>
Issue #65
commit 2c02b71b14 (parent 046a70c93b)
Author:    Prakash Surya <surya1@llnl.gov>, 2011-12-05 17:32:48 -08:00
Committer: Brian Behlendorf
2 changed files with 95 additions and 57 deletions

diff --git a/include/sys/taskq.h b/include/sys/taskq.h

@@ -45,6 +45,14 @@
 typedef unsigned long taskqid_t;
 typedef void (task_func_t)(void *);
 
+typedef struct taskq_ent {
+        spinlock_t              tqent_lock;
+        struct list_head        tqent_list;
+        taskqid_t               tqent_id;
+        task_func_t             *tqent_func;
+        void                    *tqent_arg;
+} taskq_ent_t;
+
 /*
  * Flags for taskq_dispatch. TQ_SLEEP/TQ_NOSLEEP should be same as
  * KM_SLEEP/KM_NOSLEEP.  TQ_NOQUEUE/TQ_NOALLOC are set particularly
@@ -61,8 +69,9 @@ typedef void (task_func_t)(void *);
 typedef struct taskq {
         spinlock_t              tq_lock;        /* protects taskq_t */
         unsigned long           tq_lock_flags;  /* interrupt state */
-        struct task_struct      **tq_threads;   /* thread pointers */
         const char              *tq_name;       /* taskq name */
+        struct list_head        tq_thread_list; /* list of all threads */
+        struct list_head        tq_active_list; /* list of active threads */
         int                     tq_nactive;     /* # of active threads */
         int                     tq_nthreads;    /* # of total threads */
         int                     tq_pri;         /* priority */
@@ -73,13 +82,20 @@ typedef struct taskq {
         taskqid_t               tq_next_id;     /* next pend/work id */
         taskqid_t               tq_lowest_id;   /* lowest pend/work id */
         struct list_head        tq_free_list;   /* free task_t's */
-        struct list_head        tq_work_list;   /* work task_t's */
         struct list_head        tq_pend_list;   /* pending task_t's */
         struct list_head        tq_prio_list;   /* priority pending task_t's */
         wait_queue_head_t       tq_work_waitq;  /* new work waitq */
         wait_queue_head_t       tq_wait_waitq;  /* wait waitq */
 } taskq_t;
 
+typedef struct taskq_thread {
+        struct list_head        tqt_thread_list;
+        struct list_head        tqt_active_list;
+        struct task_struct      *tqt_thread;
+        taskq_t                 *tqt_tq;
+        taskq_ent_t             *tqt_ent;
+} taskq_thread_t;
+
 /* Global system-wide dynamic task queue available for all consumers */
 extern taskq_t *system_taskq;

diff --git a/module/spl/spl-taskq.c b/module/spl/spl-taskq.c

@@ -38,14 +38,6 @@
 taskq_t *system_taskq;
 EXPORT_SYMBOL(system_taskq);
 
-typedef struct taskq_ent {
-        spinlock_t              tqent_lock;
-        struct list_head        tqent_list;
-        taskqid_t               tqent_id;
-        task_func_t             *tqent_func;
-        void                    *tqent_arg;
-} taskq_ent_t;
-
 /*
  * NOTE: Must be called with tq->tq_lock held, returns a list_t which
  * is not attached to the free, work, or pending taskq lists.
@@ -228,15 +220,18 @@ EXPORT_SYMBOL(__taskq_wait);
 int
 __taskq_member(taskq_t *tq, void *t)
 {
-        int i;
+        struct list_head *l;
+        taskq_thread_t *tqt;
         SENTRY;
 
         ASSERT(tq);
         ASSERT(t);
 
-        for (i = 0; i < tq->tq_nthreads; i++)
-                if (tq->tq_threads[i] == (struct task_struct *)t)
-                        SRETURN(1);
+        list_for_each(l, &tq->tq_thread_list) {
+                tqt = list_entry(l, taskq_thread_t, tqt_thread_list);
+                if (tqt->tqt_thread == (struct task_struct *)t)
+                        SRETURN(1);
+        }
 
         SRETURN(0);
 }
@@ -305,6 +300,7 @@ taskq_lowest_id(taskq_t *tq)
 {
         taskqid_t lowest_id = tq->tq_next_id;
         taskq_ent_t *t;
+        taskq_thread_t *tqt;
         SENTRY;
 
         ASSERT(tq);
@@ -320,9 +316,11 @@ taskq_lowest_id(taskq_t *tq)
                 lowest_id = MIN(lowest_id, t->tqent_id);
         }
 
-        if (!list_empty(&tq->tq_work_list)) {
-                t = list_entry(tq->tq_work_list.next, taskq_ent_t, tqent_list);
-                lowest_id = MIN(lowest_id, t->tqent_id);
+        if (!list_empty(&tq->tq_active_list)) {
+                tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
+                    tqt_active_list);
+                ASSERT(tqt->tqt_ent != NULL);
+                lowest_id = MIN(lowest_id, tqt->tqt_ent->tqent_id);
         }
 
         SRETURN(lowest_id);
@@ -333,25 +331,25 @@ taskq_lowest_id(taskq_t *tq)
  * taskqid.
  */
 static void
-taskq_insert_in_order(taskq_t *tq, taskq_ent_t *t)
+taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
 {
-        taskq_ent_t *w;
+        taskq_thread_t *w;
         struct list_head *l;
         SENTRY;
 
         ASSERT(tq);
-        ASSERT(t);
+        ASSERT(tqt);
         ASSERT(spin_is_locked(&tq->tq_lock));
 
-        list_for_each_prev(l, &tq->tq_work_list) {
-                w = list_entry(l, taskq_ent_t, tqent_list);
-                if (w->tqent_id < t->tqent_id) {
-                        list_add(&t->tqent_list, l);
+        list_for_each_prev(l, &tq->tq_active_list) {
+                w = list_entry(l, taskq_thread_t, tqt_active_list);
+                if (w->tqt_ent->tqent_id < tqt->tqt_ent->tqent_id) {
+                        list_add(&tqt->tqt_active_list, l);
                         break;
                 }
         }
-        if (l == &tq->tq_work_list)
-                list_add(&t->tqent_list, &tq->tq_work_list);
+        if (l == &tq->tq_active_list)
+                list_add(&tqt->tqt_active_list, &tq->tq_active_list);
 
         SEXIT;
 }
@@ -362,12 +360,14 @@ taskq_thread(void *args)
         DECLARE_WAITQUEUE(wait, current);
         sigset_t blocked;
         taskqid_t id;
-        taskq_t *tq = args;
+        taskq_thread_t *tqt = args;
+        taskq_t *tq;
         taskq_ent_t *t;
         struct list_head *pend_list;
         SENTRY;
 
-        ASSERT(tq);
+        ASSERT(tqt);
+        tq = tqt->tqt_tq;
         current->flags |= PF_NOFREEZE;
 
         /* Disable the direct memory reclaim path */
@@ -407,7 +407,8 @@ taskq_thread(void *args)
                 if (pend_list) {
                         t = list_entry(pend_list->next, taskq_ent_t, tqent_list);
                         list_del_init(&t->tqent_list);
-                        taskq_insert_in_order(tq, t);
+                        tqt->tqt_ent = t;
+                        taskq_insert_in_order(tq, tqt);
                         tq->tq_nactive++;
                         spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
@@ -416,6 +417,8 @@ taskq_thread(void *args)
                         spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
                         tq->tq_nactive--;
+                        list_del_init(&tqt->tqt_active_list);
+                        tqt->tqt_ent = NULL;
                         id = t->tqent_id;
                         task_done(tq, t);
@@ -435,6 +438,9 @@ taskq_thread(void *args)
         __set_current_state(TASK_RUNNING);
         tq->tq_nthreads--;
+        list_del_init(&tqt->tqt_thread_list);
+        kmem_free(tqt, sizeof(taskq_thread_t));
+
         spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
         SRETURN(0);
@@ -445,7 +451,7 @@ __taskq_create(const char *name, int nthreads, pri_t pri,
         int minalloc, int maxalloc, uint_t flags)
 {
         taskq_t *tq;
-        struct task_struct *t;
+        taskq_thread_t *tqt;
         int rc = 0, i, j = 0;
         SENTRY;
@@ -468,14 +474,10 @@ __taskq_create(const char *name, int nthreads, pri_t pri,
         if (tq == NULL)
                 SRETURN(NULL);
 
-        tq->tq_threads = kmem_alloc(nthreads * sizeof(t), KM_SLEEP);
-        if (tq->tq_threads == NULL) {
-                kmem_free(tq, sizeof(*tq));
-                SRETURN(NULL);
-        }
-
         spin_lock_init(&tq->tq_lock);
         spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+        INIT_LIST_HEAD(&tq->tq_thread_list);
+        INIT_LIST_HEAD(&tq->tq_active_list);
         tq->tq_name = name;
         tq->tq_nactive = 0;
         tq->tq_nthreads = 0;
@@ -487,7 +489,6 @@ __taskq_create(const char *name, int nthreads, pri_t pri,
         tq->tq_next_id = 1;
         tq->tq_lowest_id = 1;
         INIT_LIST_HEAD(&tq->tq_free_list);
-        INIT_LIST_HEAD(&tq->tq_work_list);
         INIT_LIST_HEAD(&tq->tq_pend_list);
         INIT_LIST_HEAD(&tq->tq_prio_list);
         init_waitqueue_head(&tq->tq_work_waitq);
@@ -499,19 +500,26 @@ __taskq_create(const char *name, int nthreads, pri_t pri,
         spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
         for (i = 0; i < nthreads; i++) {
-                t = kthread_create(taskq_thread, tq, "%s/%d", name, i);
-                if (t) {
-                        tq->tq_threads[i] = t;
-                        kthread_bind(t, i % num_online_cpus());
-                        set_user_nice(t, PRIO_TO_NICE(pri));
-                        wake_up_process(t);
+                tqt = kmem_alloc(sizeof(*tqt), KM_SLEEP);
+                INIT_LIST_HEAD(&tqt->tqt_thread_list);
+                INIT_LIST_HEAD(&tqt->tqt_active_list);
+                tqt->tqt_tq = tq;
+                tqt->tqt_ent = NULL;
+
+                tqt->tqt_thread = kthread_create(taskq_thread, tqt,
+                    "%s/%d", name, i);
+                if (tqt->tqt_thread) {
+                        list_add(&tqt->tqt_thread_list, &tq->tq_thread_list);
+                        kthread_bind(tqt->tqt_thread, i % num_online_cpus());
+                        set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(pri));
+                        wake_up_process(tqt->tqt_thread);
                         j++;
                 } else {
-                        tq->tq_threads[i] = NULL;
+                        kmem_free(tqt, sizeof(taskq_thread_t));
                         rc = 1;
                 }
         }
 
         /* Wait for all threads to be started before potential destroy */
         wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j);
@@ -528,8 +536,9 @@ EXPORT_SYMBOL(__taskq_create);
 void
 __taskq_destroy(taskq_t *tq)
 {
+        struct task_struct *thread;
+        taskq_thread_t *tqt;
         taskq_ent_t *t;
-        int i, nthreads;
         SENTRY;
 
         ASSERT(tq);
@@ -540,13 +549,25 @@ __taskq_destroy(taskq_t *tq)
         /* TQ_ACTIVE cleared prevents new tasks being added to pending */
         __taskq_wait(tq);
 
-        nthreads = tq->tq_nthreads;
-        for (i = 0; i < nthreads; i++)
-                if (tq->tq_threads[i])
-                        kthread_stop(tq->tq_threads[i]);
-
         spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
 
+        /*
+         * Signal each thread to exit and block until it does.  Each thread
+         * is responsible for removing itself from the list and freeing its
+         * taskq_thread_t.  This allows for idle threads to opt to remove
+         * themselves from the taskq.  They can be recreated as needed.
+         */
+        while (!list_empty(&tq->tq_thread_list)) {
+                tqt = list_entry(tq->tq_thread_list.next,
+                    taskq_thread_t, tqt_thread_list);
+                thread = tqt->tqt_thread;
+                spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
+                kthread_stop(thread);
+
+                spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+        }
+
         while (!list_empty(&tq->tq_free_list)) {
                 t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
                 list_del_init(&t->tqent_list);
@@ -555,13 +576,14 @@ __taskq_destroy(taskq_t *tq)
         ASSERT(tq->tq_nthreads == 0);
         ASSERT(tq->tq_nalloc == 0);
+        ASSERT(list_empty(&tq->tq_thread_list));
+        ASSERT(list_empty(&tq->tq_active_list));
         ASSERT(list_empty(&tq->tq_free_list));
-        ASSERT(list_empty(&tq->tq_work_list));
         ASSERT(list_empty(&tq->tq_pend_list));
         ASSERT(list_empty(&tq->tq_prio_list));
 
         spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
-        kmem_free(tq->tq_threads, nthreads * sizeof(taskq_ent_t *));
         kmem_free(tq, sizeof(taskq_t));
 
         SEXIT;
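
A design note for readers of taskq_lowest_id() above: taskq_insert_in_order()
keeps tq_active_list sorted by ascending tqent_id, so the lowest in-flight id
is always at the head of the list and the check stays O(1). A hypothetical
debug helper (not part of this commit) that asserts this invariant:

/*
 * Hypothetical debug check, not part of this commit.  Verifies the
 * invariant taskq_lowest_id() relies on: tq_active_list is ordered by
 * ascending tqent_id, so only the list head ever needs inspection.
 * Must be called with tq->tq_lock held.
 */
static void
taskq_check_active_order(taskq_t *tq)
{
        struct list_head *l;
        taskq_thread_t *tqt;
        taskqid_t last_id = 0;

        list_for_each(l, &tq->tq_active_list) {
                tqt = list_entry(l, taskq_thread_t, tqt_active_list);
                ASSERT(tqt->tqt_ent != NULL);
                ASSERT(tqt->tqt_ent->tqent_id >= last_id);
                last_id = tqt->tqt_ent->tqent_id;
        }
}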