Create a new thread during recursive taskq dispatch if necessary

When dynamic taskq is enabled and all threads for a taskq are occupied,
a recursive dispatch can cause a deadlock if the calling thread depends on
the recursively-dispatched task for its return condition.

This patch attempts to create a new thread for recursive dispatch when
none are available.

Signed-off-by: Tim Chase <tim@chase2k.com>
Signed-off-by: Brian Behlendorf <behlendorf1@llnl.gov>
Closes #472
This commit is contained in:
Tim Chase 2015-08-27 11:13:20 -05:00 committed by Brian Behlendorf
parent ae89cf0f34
commit 076821eaff

View File

@@ -53,6 +53,7 @@ EXPORT_SYMBOL(system_taskq);
 /* Private dedicated taskq for creating new taskq threads on demand. */
 static taskq_t *dynamic_taskq;
 static taskq_thread_t *taskq_thread_create(taskq_t *);
+static int taskq_thread_spawn(taskq_t *tq, int seq_tasks);
 
 static int
 task_km_flags(uint_t flags)
@@ -533,6 +534,7 @@ taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
 {
 	taskq_ent_t *t;
 	taskqid_t rc = 0;
+	boolean_t threadlimit = B_FALSE;
 
 	ASSERT(tq);
 	ASSERT(func);
@@ -574,7 +576,13 @@ taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
 	wake_up(&tq->tq_work_waitq);
 out:
+	threadlimit = (tq->tq_nactive == tq->tq_nthreads);
 	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
+	/* Spawn additional taskq threads if required. */
+	if (threadlimit && taskq_member(tq, current))
+		(void) taskq_thread_spawn(tq, spl_taskq_thread_sequential + 1);
+
 	return (rc);
 }
 EXPORT_SYMBOL(taskq_dispatch);
@@ -585,6 +593,7 @@ taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
 {
 	taskqid_t rc = 0;
 	taskq_ent_t *t;
+	boolean_t threadlimit = B_FALSE;
 
 	ASSERT(tq);
 	ASSERT(func);
@@ -617,7 +626,13 @@ taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
 	spin_unlock(&t->tqent_lock);
 out:
+	threadlimit = (tq->tq_nactive == tq->tq_nthreads);
 	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
+	/* Spawn additional taskq threads if required. */
+	if (threadlimit && taskq_member(tq, current))
+		(void) taskq_thread_spawn(tq, spl_taskq_thread_sequential + 1);
+
 	return (rc);
 }
 EXPORT_SYMBOL(taskq_dispatch_delay);
@@ -626,6 +641,8 @@ void
 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
     taskq_ent_t *t)
 {
+	boolean_t threadlimit = B_FALSE;
+
 	ASSERT(tq);
 	ASSERT(func);
@@ -661,7 +678,12 @@ taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
 	wake_up(&tq->tq_work_waitq);
 out:
+	threadlimit = (tq->tq_nactive == tq->tq_nthreads);
 	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
+	/* Spawn additional taskq threads if required. */
+	if (threadlimit && taskq_member(tq, current))
+		(void) taskq_thread_spawn(tq, spl_taskq_thread_sequential + 1);
 }
 EXPORT_SYMBOL(taskq_dispatch_ent);