mirror of
https://git.proxmox.com/git/mirror_zfs.git
synced 2025-01-27 02:14:28 +03:00
taskq delay/cancel functionality
Add the ability to dispatch a delayed task to a taskq. The desired behavior is for the task to be queued but not executed by a worker thread until the expiration time is reached. To achieve this two new functions were added. * taskq_dispatch_delay() - This function behaves exactly like taskq_dispatch() however it takes a third 'expire_time' argument. The caller should pass the desired time the task should be executed as an absolute value in jiffies. The task is guarenteed not to run before this time, it may run slightly latter if all the worker threads are busy. * taskq_cancel_id() - Given a task id attempt to cancel the task before it gets executed. This is primarily useful for canceling delay tasks but can be used for canceling any previously dispatched task. There are three possible return values. 0 - The task was found and canceled before it was executed. ENOENT - The task was not found, either it was already run or an invalid task id was supplied by the caller. EBUSY - The task is currently executing any may not be canceled. This function will block until the task has been completed. * taskq_wait_all() - The taskq_wait_id() function was renamed taskq_wait_all() to more clearly reflect its actual behavior. It is only curreny used by the splat taskq regression tests. * taskq_wait_id() - Historically, the only difference between this function and taskq_wait() was that you passed the task id. In both functions you would block until ALL lower task ids which executed. This was semantically correct but could be very slow particularly if there were delay tasks submitted. To better accomidate the delay tasks this function was reimplemnted. It will now only block until the passed task id has been completed. This is actually a fairly low risk change for a few reasons. * Only new ZFS callers will make use of the new interfaces and very little common code was changed to support the new functions. * The existing taskq_wait() implementation was not changed just slightly refactored. * The newly optimized taskq_wait_id() implementation was never used by ZFS we can't accidentally introduce a new bug there. NOTE: This functionality does not exist in the Illumos taskqs. Signed-off-by: Brian Behlendorf <behlendorf1@llnl.gov>
This commit is contained in:
parent
aed8671cb0
commit
d9acd930b5
@ -41,20 +41,6 @@
|
||||
#define TASKQ_THREADS_CPU_PCT 0x00000008
|
||||
#define TASKQ_DC_BATCH 0x00000010
|
||||
|
||||
typedef unsigned long taskqid_t;
|
||||
typedef void (task_func_t)(void *);
|
||||
|
||||
typedef struct taskq_ent {
|
||||
spinlock_t tqent_lock;
|
||||
struct list_head tqent_list;
|
||||
taskqid_t tqent_id;
|
||||
task_func_t *tqent_func;
|
||||
void *tqent_arg;
|
||||
uintptr_t tqent_flags;
|
||||
} taskq_ent_t;
|
||||
|
||||
#define TQENT_FLAG_PREALLOC 0x1
|
||||
|
||||
/*
|
||||
* Flags for taskq_dispatch. TQ_SLEEP/TQ_NOSLEEP should be same as
|
||||
* KM_SLEEP/KM_NOSLEEP. TQ_NOQUEUE/TQ_NOALLOC are set particularly
|
||||
@ -69,6 +55,9 @@ typedef struct taskq_ent {
|
||||
#define TQ_FRONT 0x08000000
|
||||
#define TQ_ACTIVE 0x80000000
|
||||
|
||||
typedef unsigned long taskqid_t;
|
||||
typedef void (task_func_t)(void *);
|
||||
|
||||
typedef struct taskq {
|
||||
spinlock_t tq_lock; /* protects taskq_t */
|
||||
unsigned long tq_lock_flags; /* interrupt state */
|
||||
@ -87,16 +76,33 @@ typedef struct taskq {
|
||||
struct list_head tq_free_list; /* free task_t's */
|
||||
struct list_head tq_pend_list; /* pending task_t's */
|
||||
struct list_head tq_prio_list; /* priority pending task_t's */
|
||||
struct list_head tq_delay_list; /* delayed task_t's */
|
||||
wait_queue_head_t tq_work_waitq; /* new work waitq */
|
||||
wait_queue_head_t tq_wait_waitq; /* wait waitq */
|
||||
} taskq_t;
|
||||
|
||||
typedef struct taskq_ent {
|
||||
spinlock_t tqent_lock;
|
||||
wait_queue_head_t tqent_waitq;
|
||||
struct timer_list tqent_timer;
|
||||
struct list_head tqent_list;
|
||||
taskqid_t tqent_id;
|
||||
task_func_t *tqent_func;
|
||||
void *tqent_arg;
|
||||
taskq_t *tqent_taskq;
|
||||
uintptr_t tqent_flags;
|
||||
} taskq_ent_t;
|
||||
|
||||
#define TQENT_FLAG_PREALLOC 0x1
|
||||
#define TQENT_FLAG_CANCEL 0x2
|
||||
|
||||
typedef struct taskq_thread {
|
||||
struct list_head tqt_thread_list;
|
||||
struct list_head tqt_active_list;
|
||||
struct task_struct *tqt_thread;
|
||||
taskq_t *tqt_tq;
|
||||
taskqid_t tqt_id;
|
||||
taskq_ent_t *tqt_task;
|
||||
uintptr_t tqt_flags;
|
||||
} taskq_thread_t;
|
||||
|
||||
@ -104,6 +110,8 @@ typedef struct taskq_thread {
|
||||
extern taskq_t *system_taskq;
|
||||
|
||||
extern taskqid_t taskq_dispatch(taskq_t *, task_func_t, void *, uint_t);
|
||||
extern taskqid_t taskq_dispatch_delay(taskq_t *, task_func_t, void *,
|
||||
uint_t, clock_t);
|
||||
extern void taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t,
|
||||
taskq_ent_t *);
|
||||
extern int taskq_empty_ent(taskq_ent_t *);
|
||||
@ -111,7 +119,9 @@ extern void taskq_init_ent(taskq_ent_t *);
|
||||
extern taskq_t *taskq_create(const char *, int, pri_t, int, int, uint_t);
|
||||
extern void taskq_destroy(taskq_t *);
|
||||
extern void taskq_wait_id(taskq_t *, taskqid_t);
|
||||
extern void taskq_wait_all(taskq_t *, taskqid_t);
|
||||
extern void taskq_wait(taskq_t *);
|
||||
extern int taskq_cancel_id(taskq_t *, taskqid_t);
|
||||
extern int taskq_member(taskq_t *, void *);
|
||||
|
||||
#define taskq_create_proc(name, nthreads, pri, min, max, proc, flags) \
|
||||
|
@ -69,6 +69,8 @@ retry:
|
||||
t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
|
||||
|
||||
ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
|
||||
ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL));
|
||||
ASSERT(!timer_pending(&t->tqent_timer));
|
||||
|
||||
list_del_init(&t->tqent_list);
|
||||
SRETURN(t);
|
||||
@ -126,6 +128,7 @@ task_free(taskq_t *tq, taskq_ent_t *t)
|
||||
ASSERT(t);
|
||||
ASSERT(spin_is_locked(&tq->tq_lock));
|
||||
ASSERT(list_empty(&t->tqent_list));
|
||||
ASSERT(!timer_pending(&t->tqent_timer));
|
||||
|
||||
kmem_free(t, sizeof(taskq_ent_t));
|
||||
tq->tq_nalloc--;
|
||||
@ -145,6 +148,9 @@ task_done(taskq_t *tq, taskq_ent_t *t)
|
||||
ASSERT(t);
|
||||
ASSERT(spin_is_locked(&tq->tq_lock));
|
||||
|
||||
/* Wake tasks blocked in taskq_wait_id() */
|
||||
wake_up_all(&t->tqent_waitq);
|
||||
|
||||
list_del_init(&t->tqent_list);
|
||||
|
||||
if (tq->tq_nalloc <= tq->tq_minalloc) {
|
||||
@ -162,213 +168,49 @@ task_done(taskq_t *tq, taskq_ent_t *t)
|
||||
}
|
||||
|
||||
/*
|
||||
* As tasks are submitted to the task queue they are assigned a
|
||||
* monotonically increasing taskqid and added to the tail of the pending
|
||||
* list. As worker threads become available the tasks are removed from
|
||||
* the head of the pending or priority list, giving preference to the
|
||||
* priority list. The tasks are then removed from their respective
|
||||
* list, and the taskq_thread servicing the task is added to the active
|
||||
* list, preserving the order using the serviced task's taskqid.
|
||||
* Finally, as tasks complete the taskq_thread servicing the task is
|
||||
* removed from the active list. This means that the pending task and
|
||||
* active taskq_thread lists are always kept sorted by taskqid. Thus the
|
||||
* lowest outstanding incomplete taskqid can be determined simply by
|
||||
* checking the min taskqid for each head item on the pending, priority,
|
||||
* and active taskq_thread list. This value is stored in
|
||||
* tq->tq_lowest_id and only updated to the new lowest id when the
|
||||
* previous lowest id completes. All taskqids lower than
|
||||
* tq->tq_lowest_id must have completed. It is also possible larger
|
||||
* taskqid's have completed because they may be processed in parallel by
|
||||
* several worker threads. However, this is not a problem because the
|
||||
* behavior of taskq_wait_id() is to block until all previously
|
||||
* submitted taskqid's have completed.
|
||||
*
|
||||
* XXX: Taskqid_t wrapping is not handled. However, taskqid_t's are
|
||||
* 64-bit values so even if a taskq is processing 2^24 (16,777,216)
|
||||
* taskqid_ts per second it will still take 2^40 seconds, 34,865 years,
|
||||
* before the wrap occurs. I can live with that for now.
|
||||
* When a delayed task timer expires remove it from the delay list and
|
||||
* add it to the priority list in order for immediate processing.
|
||||
*/
|
||||
static int
|
||||
taskq_wait_check(taskq_t *tq, taskqid_t id)
|
||||
{
|
||||
int rc;
|
||||
|
||||
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
|
||||
rc = (id < tq->tq_lowest_id);
|
||||
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
|
||||
|
||||
SRETURN(rc);
|
||||
}
|
||||
|
||||
void
|
||||
taskq_wait_id(taskq_t *tq, taskqid_t id)
|
||||
{
|
||||
SENTRY;
|
||||
ASSERT(tq);
|
||||
|
||||
wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id));
|
||||
|
||||
SEXIT;
|
||||
}
|
||||
EXPORT_SYMBOL(taskq_wait_id);
|
||||
|
||||
void
|
||||
taskq_wait(taskq_t *tq)
|
||||
{
|
||||
taskqid_t id;
|
||||
SENTRY;
|
||||
ASSERT(tq);
|
||||
|
||||
/* Wait for the largest outstanding taskqid */
|
||||
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
|
||||
id = tq->tq_next_id - 1;
|
||||
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
|
||||
|
||||
taskq_wait_id(tq, id);
|
||||
|
||||
SEXIT;
|
||||
|
||||
}
|
||||
EXPORT_SYMBOL(taskq_wait);
|
||||
|
||||
int
|
||||
taskq_member(taskq_t *tq, void *t)
|
||||
static void
|
||||
task_expire(unsigned long data)
|
||||
{
|
||||
taskq_ent_t *w, *t = (taskq_ent_t *)data;
|
||||
taskq_t *tq = t->tqent_taskq;
|
||||
struct list_head *l;
|
||||
taskq_thread_t *tqt;
|
||||
SENTRY;
|
||||
|
||||
ASSERT(tq);
|
||||
ASSERT(t);
|
||||
|
||||
list_for_each(l, &tq->tq_thread_list) {
|
||||
tqt = list_entry(l, taskq_thread_t, tqt_thread_list);
|
||||
if (tqt->tqt_thread == (struct task_struct *)t)
|
||||
SRETURN(1);
|
||||
}
|
||||
|
||||
SRETURN(0);
|
||||
}
|
||||
EXPORT_SYMBOL(taskq_member);
|
||||
|
||||
taskqid_t
|
||||
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
|
||||
{
|
||||
taskq_ent_t *t;
|
||||
taskqid_t rc = 0;
|
||||
SENTRY;
|
||||
|
||||
ASSERT(tq);
|
||||
ASSERT(func);
|
||||
|
||||
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
|
||||
|
||||
/* Taskq being destroyed and all tasks drained */
|
||||
if (!(tq->tq_flags & TQ_ACTIVE))
|
||||
SGOTO(out, rc = 0);
|
||||
|
||||
/* Do not queue the task unless there is idle thread for it */
|
||||
ASSERT(tq->tq_nactive <= tq->tq_nthreads);
|
||||
if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads))
|
||||
SGOTO(out, rc = 0);
|
||||
|
||||
if ((t = task_alloc(tq, flags)) == NULL)
|
||||
SGOTO(out, rc = 0);
|
||||
|
||||
spin_lock(&t->tqent_lock);
|
||||
|
||||
/* Queue to the priority list instead of the pending list */
|
||||
if (flags & TQ_FRONT)
|
||||
list_add_tail(&t->tqent_list, &tq->tq_prio_list);
|
||||
else
|
||||
list_add_tail(&t->tqent_list, &tq->tq_pend_list);
|
||||
|
||||
t->tqent_id = rc = tq->tq_next_id;
|
||||
tq->tq_next_id++;
|
||||
t->tqent_func = func;
|
||||
t->tqent_arg = arg;
|
||||
|
||||
ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
|
||||
|
||||
spin_unlock(&t->tqent_lock);
|
||||
|
||||
wake_up(&tq->tq_work_waitq);
|
||||
out:
|
||||
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
|
||||
SRETURN(rc);
|
||||
}
|
||||
EXPORT_SYMBOL(taskq_dispatch);
|
||||
|
||||
void
|
||||
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
|
||||
taskq_ent_t *t)
|
||||
{
|
||||
SENTRY;
|
||||
|
||||
ASSERT(tq);
|
||||
ASSERT(func);
|
||||
ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
|
||||
|
||||
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
|
||||
|
||||
/* Taskq being destroyed and all tasks drained */
|
||||
if (!(tq->tq_flags & TQ_ACTIVE)) {
|
||||
t->tqent_id = 0;
|
||||
goto out;
|
||||
if (t->tqent_flags & TQENT_FLAG_CANCEL) {
|
||||
ASSERT(list_empty(&t->tqent_list));
|
||||
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
|
||||
return;
|
||||
}
|
||||
|
||||
spin_lock(&t->tqent_lock);
|
||||
|
||||
/*
|
||||
* Mark it as a prealloc'd task. This is important
|
||||
* to ensure that we don't free it later.
|
||||
* The priority list must be maintained in strict task id order
|
||||
* from lowest to highest for lowest_id to be easily calculable.
|
||||
*/
|
||||
t->tqent_flags |= TQENT_FLAG_PREALLOC;
|
||||
list_del(&t->tqent_list);
|
||||
list_for_each_prev(l, &tq->tq_prio_list) {
|
||||
w = list_entry(l, taskq_ent_t, tqent_list);
|
||||
if (w->tqent_id < t->tqent_id) {
|
||||
list_add(&t->tqent_list, l);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (l == &tq->tq_prio_list)
|
||||
list_add(&t->tqent_list, &tq->tq_prio_list);
|
||||
|
||||
/* Queue to the priority list instead of the pending list */
|
||||
if (flags & TQ_FRONT)
|
||||
list_add_tail(&t->tqent_list, &tq->tq_prio_list);
|
||||
else
|
||||
list_add_tail(&t->tqent_list, &tq->tq_pend_list);
|
||||
|
||||
t->tqent_id = tq->tq_next_id;
|
||||
tq->tq_next_id++;
|
||||
t->tqent_func = func;
|
||||
t->tqent_arg = arg;
|
||||
|
||||
spin_unlock(&t->tqent_lock);
|
||||
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
|
||||
|
||||
wake_up(&tq->tq_work_waitq);
|
||||
out:
|
||||
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
|
||||
SEXIT;
|
||||
}
|
||||
EXPORT_SYMBOL(taskq_dispatch_ent);
|
||||
|
||||
int
|
||||
taskq_empty_ent(taskq_ent_t *t)
|
||||
{
|
||||
return list_empty(&t->tqent_list);
|
||||
}
|
||||
EXPORT_SYMBOL(taskq_empty_ent);
|
||||
|
||||
void
|
||||
taskq_init_ent(taskq_ent_t *t)
|
||||
{
|
||||
spin_lock_init(&t->tqent_lock);
|
||||
INIT_LIST_HEAD(&t->tqent_list);
|
||||
t->tqent_id = 0;
|
||||
t->tqent_func = NULL;
|
||||
t->tqent_arg = NULL;
|
||||
t->tqent_flags = 0;
|
||||
}
|
||||
EXPORT_SYMBOL(taskq_init_ent);
|
||||
|
||||
/*
|
||||
* Returns the lowest incomplete taskqid_t. The taskqid_t may
|
||||
* be queued on the pending list, on the priority list, or on
|
||||
* the work list currently being handled, but it is not 100%
|
||||
* complete yet.
|
||||
* be queued on the pending list, on the priority list, on the
|
||||
* delay list, or on the work list currently being handled, but
|
||||
* it is not 100% complete yet.
|
||||
*/
|
||||
static taskqid_t
|
||||
taskq_lowest_id(taskq_t *tq)
|
||||
@ -391,6 +233,11 @@ taskq_lowest_id(taskq_t *tq)
|
||||
lowest_id = MIN(lowest_id, t->tqent_id);
|
||||
}
|
||||
|
||||
if (!list_empty(&tq->tq_delay_list)) {
|
||||
t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list);
|
||||
lowest_id = MIN(lowest_id, t->tqent_id);
|
||||
}
|
||||
|
||||
if (!list_empty(&tq->tq_active_list)) {
|
||||
tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
|
||||
tqt_active_list);
|
||||
@ -428,6 +275,415 @@ taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
|
||||
SEXIT;
|
||||
}
|
||||
|
||||
/*
|
||||
* Find and return a task from the given list if it exists. The list
|
||||
* must be in lowest to highest task id order.
|
||||
*/
|
||||
static taskq_ent_t *
|
||||
taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id)
|
||||
{
|
||||
struct list_head *l;
|
||||
taskq_ent_t *t;
|
||||
SENTRY;
|
||||
|
||||
ASSERT(spin_is_locked(&tq->tq_lock));
|
||||
|
||||
list_for_each(l, lh) {
|
||||
t = list_entry(l, taskq_ent_t, tqent_list);
|
||||
|
||||
if (t->tqent_id == id)
|
||||
SRETURN(t);
|
||||
|
||||
if (t->tqent_id > id)
|
||||
break;
|
||||
}
|
||||
|
||||
SRETURN(NULL);
|
||||
}
|
||||
|
||||
/*
|
||||
* Find an already dispatched task given the task id regardless of what
|
||||
* state it is in. If a task is still pending or executing it will be
|
||||
* returned and 'active' set appropriately. If the task has already
|
||||
* been run then NULL is returned.
|
||||
*/
|
||||
static taskq_ent_t *
|
||||
taskq_find(taskq_t *tq, taskqid_t id, int *active)
|
||||
{
|
||||
taskq_thread_t *tqt;
|
||||
struct list_head *l;
|
||||
taskq_ent_t *t;
|
||||
SENTRY;
|
||||
|
||||
ASSERT(spin_is_locked(&tq->tq_lock));
|
||||
*active = 0;
|
||||
|
||||
t = taskq_find_list(tq, &tq->tq_delay_list, id);
|
||||
if (t)
|
||||
SRETURN(t);
|
||||
|
||||
t = taskq_find_list(tq, &tq->tq_prio_list, id);
|
||||
if (t)
|
||||
SRETURN(t);
|
||||
|
||||
t = taskq_find_list(tq, &tq->tq_pend_list, id);
|
||||
if (t)
|
||||
SRETURN(t);
|
||||
|
||||
list_for_each(l, &tq->tq_active_list) {
|
||||
tqt = list_entry(l, taskq_thread_t, tqt_active_list);
|
||||
if (tqt->tqt_id == id) {
|
||||
t = tqt->tqt_task;
|
||||
*active = 1;
|
||||
SRETURN(t);
|
||||
}
|
||||
}
|
||||
|
||||
SRETURN(NULL);
|
||||
}
|
||||
|
||||
/*
|
||||
* The taskq_wait_id() function blocks until the passed task id completes.
|
||||
* This does not guarantee that all lower task id's have completed.
|
||||
*/
|
||||
void
|
||||
taskq_wait_id(taskq_t *tq, taskqid_t id)
|
||||
{
|
||||
DEFINE_WAIT(wait);
|
||||
taskq_ent_t *t;
|
||||
int active = 0;
|
||||
SENTRY;
|
||||
|
||||
ASSERT(tq);
|
||||
ASSERT(id > 0);
|
||||
|
||||
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
|
||||
t = taskq_find(tq, id, &active);
|
||||
if (t)
|
||||
prepare_to_wait(&t->tqent_waitq, &wait, TASK_UNINTERRUPTIBLE);
|
||||
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
|
||||
|
||||
/*
|
||||
* We rely on the kernels autoremove_wake_function() function to
|
||||
* remove us from the wait queue in the context of wake_up().
|
||||
* Once woken the taskq_ent_t pointer must never be accessed.
|
||||
*/
|
||||
if (t) {
|
||||
t = NULL;
|
||||
schedule();
|
||||
__set_current_state(TASK_RUNNING);
|
||||
}
|
||||
|
||||
SEXIT;
|
||||
}
|
||||
EXPORT_SYMBOL(taskq_wait_id);
|
||||
|
||||
/*
|
||||
* The taskq_wait() function will block until all previously submitted
|
||||
* tasks have been completed. A previously submitted task is defined as
|
||||
* a task with a lower task id than the current task queue id. Note that
|
||||
* all task id's are assigned monotonically at dispatch time.
|
||||
*
|
||||
* Waiting for all previous tasks to complete is accomplished by tracking
|
||||
* the lowest outstanding task id. As tasks are dispatched they are added
|
||||
* added to the tail of the pending, priority, or delay lists. And as
|
||||
* worker threads become available the tasks are removed from the heads
|
||||
* of these lists and linked to the worker threads. This ensures the
|
||||
* lists are kept in lowest to highest task id order.
|
||||
*
|
||||
* Therefore the lowest outstanding task id can be quickly determined by
|
||||
* checking the head item from all of these lists. This value is stored
|
||||
* with the task queue as the lowest id. It only needs to be recalculated
|
||||
* when either the task with the current lowest id completes or is canceled.
|
||||
*
|
||||
* By blocking until the lowest task id exceeds the current task id when
|
||||
* the function was called we ensure all previous tasks have completed.
|
||||
*
|
||||
* NOTE: When there are multiple worked threads it is possible for larger
|
||||
* task ids to complete before smaller ones. Conversely when the task
|
||||
* queue contains delay tasks with small task ids, you may block for a
|
||||
* considerable length of time waiting for them to expire and execute.
|
||||
*/
|
||||
static int
|
||||
taskq_wait_check(taskq_t *tq, taskqid_t id)
|
||||
{
|
||||
int rc;
|
||||
|
||||
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
|
||||
rc = (id < tq->tq_lowest_id);
|
||||
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
|
||||
|
||||
SRETURN(rc);
|
||||
}
|
||||
|
||||
void
|
||||
taskq_wait_all(taskq_t *tq, taskqid_t id)
|
||||
{
|
||||
wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id));
|
||||
}
|
||||
EXPORT_SYMBOL(taskq_wait_all);
|
||||
|
||||
void
|
||||
taskq_wait(taskq_t *tq)
|
||||
{
|
||||
taskqid_t id;
|
||||
SENTRY;
|
||||
ASSERT(tq);
|
||||
|
||||
/* Wait for the largest outstanding taskqid */
|
||||
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
|
||||
id = tq->tq_next_id - 1;
|
||||
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
|
||||
|
||||
taskq_wait_all(tq, id);
|
||||
|
||||
SEXIT;
|
||||
|
||||
}
|
||||
EXPORT_SYMBOL(taskq_wait);
|
||||
|
||||
int
|
||||
taskq_member(taskq_t *tq, void *t)
|
||||
{
|
||||
struct list_head *l;
|
||||
taskq_thread_t *tqt;
|
||||
SENTRY;
|
||||
|
||||
ASSERT(tq);
|
||||
ASSERT(t);
|
||||
|
||||
list_for_each(l, &tq->tq_thread_list) {
|
||||
tqt = list_entry(l, taskq_thread_t, tqt_thread_list);
|
||||
if (tqt->tqt_thread == (struct task_struct *)t)
|
||||
SRETURN(1);
|
||||
}
|
||||
|
||||
SRETURN(0);
|
||||
}
|
||||
EXPORT_SYMBOL(taskq_member);
|
||||
|
||||
/*
|
||||
* Cancel an already dispatched task given the task id. Still pending tasks
|
||||
* will be immediately canceled, and if the task is active the function will
|
||||
* block until it completes. Preallocated tasks which are canceled must be
|
||||
* freed by the caller.
|
||||
*/
|
||||
int
|
||||
taskq_cancel_id(taskq_t *tq, taskqid_t id)
|
||||
{
|
||||
taskq_ent_t *t;
|
||||
int active = 0;
|
||||
int rc = ENOENT;
|
||||
SENTRY;
|
||||
|
||||
ASSERT(tq);
|
||||
|
||||
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
|
||||
t = taskq_find(tq, id, &active);
|
||||
if (t && !active) {
|
||||
list_del_init(&t->tqent_list);
|
||||
t->tqent_flags |= TQENT_FLAG_CANCEL;
|
||||
|
||||
/*
|
||||
* When canceling the lowest outstanding task id we
|
||||
* must recalculate the new lowest outstanding id.
|
||||
*/
|
||||
if (tq->tq_lowest_id == t->tqent_id) {
|
||||
tq->tq_lowest_id = taskq_lowest_id(tq);
|
||||
ASSERT3S(tq->tq_lowest_id, >, t->tqent_id);
|
||||
}
|
||||
|
||||
/*
|
||||
* The task_expire() function takes the tq->tq_lock so drop
|
||||
* drop the lock before synchronously cancelling the timer.
|
||||
*/
|
||||
if (timer_pending(&t->tqent_timer)) {
|
||||
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
|
||||
del_timer_sync(&t->tqent_timer);
|
||||
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
|
||||
}
|
||||
|
||||
if (!(t->tqent_flags & TQENT_FLAG_PREALLOC))
|
||||
task_done(tq, t);
|
||||
|
||||
rc = 0;
|
||||
}
|
||||
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
|
||||
|
||||
if (active) {
|
||||
taskq_wait_id(tq, id);
|
||||
rc = EBUSY;
|
||||
}
|
||||
|
||||
SRETURN(rc);
|
||||
}
|
||||
EXPORT_SYMBOL(taskq_cancel_id);
|
||||
|
||||
taskqid_t
|
||||
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
|
||||
{
|
||||
taskq_ent_t *t;
|
||||
taskqid_t rc = 0;
|
||||
SENTRY;
|
||||
|
||||
ASSERT(tq);
|
||||
ASSERT(func);
|
||||
|
||||
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
|
||||
|
||||
/* Taskq being destroyed and all tasks drained */
|
||||
if (!(tq->tq_flags & TQ_ACTIVE))
|
||||
SGOTO(out, rc = 0);
|
||||
|
||||
/* Do not queue the task unless there is idle thread for it */
|
||||
ASSERT(tq->tq_nactive <= tq->tq_nthreads);
|
||||
if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads))
|
||||
SGOTO(out, rc = 0);
|
||||
|
||||
if ((t = task_alloc(tq, flags)) == NULL)
|
||||
SGOTO(out, rc = 0);
|
||||
|
||||
spin_lock(&t->tqent_lock);
|
||||
|
||||
/* Queue to the priority list instead of the pending list */
|
||||
if (flags & TQ_FRONT)
|
||||
list_add_tail(&t->tqent_list, &tq->tq_prio_list);
|
||||
else
|
||||
list_add_tail(&t->tqent_list, &tq->tq_pend_list);
|
||||
|
||||
t->tqent_id = rc = tq->tq_next_id;
|
||||
tq->tq_next_id++;
|
||||
t->tqent_func = func;
|
||||
t->tqent_arg = arg;
|
||||
t->tqent_taskq = tq;
|
||||
t->tqent_timer.data = 0;
|
||||
t->tqent_timer.function = NULL;
|
||||
t->tqent_timer.expires = 0;
|
||||
|
||||
ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
|
||||
|
||||
spin_unlock(&t->tqent_lock);
|
||||
|
||||
wake_up(&tq->tq_work_waitq);
|
||||
out:
|
||||
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
|
||||
SRETURN(rc);
|
||||
}
|
||||
EXPORT_SYMBOL(taskq_dispatch);
|
||||
|
||||
taskqid_t
|
||||
taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
|
||||
uint_t flags, clock_t expire_time)
|
||||
{
|
||||
taskq_ent_t *t;
|
||||
taskqid_t rc = 0;
|
||||
SENTRY;
|
||||
|
||||
ASSERT(tq);
|
||||
ASSERT(func);
|
||||
|
||||
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
|
||||
|
||||
/* Taskq being destroyed and all tasks drained */
|
||||
if (!(tq->tq_flags & TQ_ACTIVE))
|
||||
SGOTO(out, rc = 0);
|
||||
|
||||
if ((t = task_alloc(tq, flags)) == NULL)
|
||||
SGOTO(out, rc = 0);
|
||||
|
||||
spin_lock(&t->tqent_lock);
|
||||
|
||||
/* Queue to the delay list for subsequent execution */
|
||||
list_add_tail(&t->tqent_list, &tq->tq_delay_list);
|
||||
|
||||
t->tqent_id = rc = tq->tq_next_id;
|
||||
tq->tq_next_id++;
|
||||
t->tqent_func = func;
|
||||
t->tqent_arg = arg;
|
||||
t->tqent_taskq = tq;
|
||||
t->tqent_timer.data = (unsigned long)t;
|
||||
t->tqent_timer.function = task_expire;
|
||||
t->tqent_timer.expires = (unsigned long)expire_time;
|
||||
add_timer(&t->tqent_timer);
|
||||
|
||||
ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
|
||||
|
||||
spin_unlock(&t->tqent_lock);
|
||||
out:
|
||||
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
|
||||
SRETURN(rc);
|
||||
}
|
||||
EXPORT_SYMBOL(taskq_dispatch_delay);
|
||||
|
||||
void
|
||||
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
|
||||
taskq_ent_t *t)
|
||||
{
|
||||
SENTRY;
|
||||
|
||||
ASSERT(tq);
|
||||
ASSERT(func);
|
||||
ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
|
||||
|
||||
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
|
||||
|
||||
/* Taskq being destroyed and all tasks drained */
|
||||
if (!(tq->tq_flags & TQ_ACTIVE)) {
|
||||
t->tqent_id = 0;
|
||||
goto out;
|
||||
}
|
||||
|
||||
spin_lock(&t->tqent_lock);
|
||||
|
||||
/*
|
||||
* Mark it as a prealloc'd task. This is important
|
||||
* to ensure that we don't free it later.
|
||||
*/
|
||||
t->tqent_flags |= TQENT_FLAG_PREALLOC;
|
||||
|
||||
/* Queue to the priority list instead of the pending list */
|
||||
if (flags & TQ_FRONT)
|
||||
list_add_tail(&t->tqent_list, &tq->tq_prio_list);
|
||||
else
|
||||
list_add_tail(&t->tqent_list, &tq->tq_pend_list);
|
||||
|
||||
t->tqent_id = tq->tq_next_id;
|
||||
tq->tq_next_id++;
|
||||
t->tqent_func = func;
|
||||
t->tqent_arg = arg;
|
||||
t->tqent_taskq = tq;
|
||||
|
||||
spin_unlock(&t->tqent_lock);
|
||||
|
||||
wake_up(&tq->tq_work_waitq);
|
||||
out:
|
||||
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
|
||||
SEXIT;
|
||||
}
|
||||
EXPORT_SYMBOL(taskq_dispatch_ent);
|
||||
|
||||
int
|
||||
taskq_empty_ent(taskq_ent_t *t)
|
||||
{
|
||||
return list_empty(&t->tqent_list);
|
||||
}
|
||||
EXPORT_SYMBOL(taskq_empty_ent);
|
||||
|
||||
void
|
||||
taskq_init_ent(taskq_ent_t *t)
|
||||
{
|
||||
spin_lock_init(&t->tqent_lock);
|
||||
init_waitqueue_head(&t->tqent_waitq);
|
||||
init_timer(&t->tqent_timer);
|
||||
INIT_LIST_HEAD(&t->tqent_list);
|
||||
t->tqent_id = 0;
|
||||
t->tqent_func = NULL;
|
||||
t->tqent_arg = NULL;
|
||||
t->tqent_flags = 0;
|
||||
t->tqent_taskq = NULL;
|
||||
}
|
||||
EXPORT_SYMBOL(taskq_init_ent);
|
||||
|
||||
static int
|
||||
taskq_thread(void *args)
|
||||
{
|
||||
@ -481,6 +737,7 @@ taskq_thread(void *args)
|
||||
* preallocated taskq_ent_t, tqent_id must be
|
||||
* stored prior to executing tqent_func. */
|
||||
tqt->tqt_id = t->tqent_id;
|
||||
tqt->tqt_task = t;
|
||||
|
||||
/* We must store a copy of the flags prior to
|
||||
* servicing the task (servicing a prealloc'd task
|
||||
@ -499,6 +756,7 @@ taskq_thread(void *args)
|
||||
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
|
||||
tq->tq_nactive--;
|
||||
list_del_init(&tqt->tqt_active_list);
|
||||
tqt->tqt_task = NULL;
|
||||
|
||||
/* For prealloc'd tasks, we don't free anything. */
|
||||
if ((tq->tq_flags & TASKQ_DYNAMIC) ||
|
||||
@ -576,6 +834,7 @@ taskq_create(const char *name, int nthreads, pri_t pri,
|
||||
INIT_LIST_HEAD(&tq->tq_free_list);
|
||||
INIT_LIST_HEAD(&tq->tq_pend_list);
|
||||
INIT_LIST_HEAD(&tq->tq_prio_list);
|
||||
INIT_LIST_HEAD(&tq->tq_delay_list);
|
||||
init_waitqueue_head(&tq->tq_work_waitq);
|
||||
init_waitqueue_head(&tq->tq_wait_waitq);
|
||||
|
||||
@ -669,6 +928,7 @@ taskq_destroy(taskq_t *tq)
|
||||
ASSERT(list_empty(&tq->tq_free_list));
|
||||
ASSERT(list_empty(&tq->tq_pend_list));
|
||||
ASSERT(list_empty(&tq->tq_prio_list));
|
||||
ASSERT(list_empty(&tq->tq_delay_list));
|
||||
|
||||
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
|
||||
|
||||
|
@ -548,10 +548,10 @@ splat_taskq_test4(struct file *file, void *arg)
|
||||
* next pending task as soon as it completes its current task. This
|
||||
* means that tasks do not strictly complete in order in which they
|
||||
* were dispatched (increasing task id). This is fine but we need to
|
||||
* verify that taskq_wait_id() blocks until the passed task id and all
|
||||
* verify that taskq_wait_all() blocks until the passed task id and all
|
||||
* lower task ids complete. We do this by dispatching the following
|
||||
* specific sequence of tasks each of which block for N time units.
|
||||
* We then use taskq_wait_id() to unblock at specific task id and
|
||||
* We then use taskq_wait_all() to unblock at specific task id and
|
||||
* verify the only the expected task ids have completed and in the
|
||||
* correct order. The two cases of interest are:
|
||||
*
|
||||
@ -562,17 +562,17 @@ splat_taskq_test4(struct file *file, void *arg)
|
||||
*
|
||||
* The following table shows each task id and how they will be
|
||||
* scheduled. Each rows represent one time unit and each column
|
||||
* one of the three worker threads. The places taskq_wait_id()
|
||||
* one of the three worker threads. The places taskq_wait_all()
|
||||
* must unblock for a specific id are identified as well as the
|
||||
* task ids which must have completed and their order.
|
||||
*
|
||||
* +-----+ <--- taskq_wait_id(tq, 8) unblocks
|
||||
* +-----+ <--- taskq_wait_all(tq, 8) unblocks
|
||||
* | | Required Completion Order: 1,2,4,5,3,8,6,7
|
||||
* +-----+ |
|
||||
* | | |
|
||||
* | | +-----+
|
||||
* | | | 8 |
|
||||
* | | +-----+ <--- taskq_wait_id(tq, 3) unblocks
|
||||
* | | +-----+ <--- taskq_wait_all(tq, 3) unblocks
|
||||
* | | 7 | | Required Completion Order: 1,2,4,5,3
|
||||
* | +-----+ |
|
||||
* | 6 | | |
|
||||
@ -712,13 +712,13 @@ splat_taskq_test5_impl(struct file *file, void *arg, boolean_t prealloc)
|
||||
|
||||
splat_vprint(file, SPLAT_TASKQ_TEST5_NAME, "Taskq '%s' "
|
||||
"waiting for taskqid %d completion\n", tq_arg.name, 3);
|
||||
taskq_wait_id(tq, 3);
|
||||
taskq_wait_all(tq, 3);
|
||||
if ((rc = splat_taskq_test_order(&tq_arg, order1)))
|
||||
goto out;
|
||||
|
||||
splat_vprint(file, SPLAT_TASKQ_TEST5_NAME, "Taskq '%s' "
|
||||
"waiting for taskqid %d completion\n", tq_arg.name, 8);
|
||||
taskq_wait_id(tq, 8);
|
||||
taskq_wait_all(tq, 8);
|
||||
rc = splat_taskq_test_order(&tq_arg, order2);
|
||||
|
||||
out:
|
||||
@ -874,7 +874,7 @@ splat_taskq_test6_impl(struct file *file, void *arg, boolean_t prealloc)
|
||||
splat_vprint(file, SPLAT_TASKQ_TEST6_NAME, "Taskq '%s' "
|
||||
"waiting for taskqid %d completion\n", tq_arg.name,
|
||||
SPLAT_TASKQ_ORDER_MAX);
|
||||
taskq_wait_id(tq, SPLAT_TASKQ_ORDER_MAX);
|
||||
taskq_wait_all(tq, SPLAT_TASKQ_ORDER_MAX);
|
||||
rc = splat_taskq_test_order(&tq_arg, order);
|
||||
|
||||
out:
|
||||
@ -975,7 +975,7 @@ splat_taskq_test7_impl(struct file *file, void *arg, boolean_t prealloc)
|
||||
if (tq_arg.flag == 0) {
|
||||
splat_vprint(file, SPLAT_TASKQ_TEST7_NAME,
|
||||
"Taskq '%s' waiting\n", tq_arg.name);
|
||||
taskq_wait_id(tq, SPLAT_TASKQ_DEPTH_MAX);
|
||||
taskq_wait_all(tq, SPLAT_TASKQ_DEPTH_MAX);
|
||||
}
|
||||
|
||||
splat_vprint(file, SPLAT_TASKQ_TEST7_NAME,
|
||||
|
Loading…
Reference in New Issue
Block a user