From 472a34caff3bc8b0f65e7cdb4b5960b0e2d616c2 Mon Sep 17 00:00:00 2001 From: Brian Behlendorf Date: Thu, 6 Dec 2012 12:57:42 -0800 Subject: [PATCH 01/11] taskq style, convert spaces to soft tabs Update the taskq implementation to conform with the style used throughout the rest of the code. There are no functional changes in this commit. Signed-off-by: Brian Behlendorf --- include/sys/taskq.h | 120 +++++++++-------- module/spl/spl-taskq.c | 299 ++++++++++++++++++++--------------------- 2 files changed, 210 insertions(+), 209 deletions(-) diff --git a/include/sys/taskq.h b/include/sys/taskq.h index 6b09bdf1b..8260cf935 100644 --- a/include/sys/taskq.h +++ b/include/sys/taskq.h @@ -33,71 +33,71 @@ #include #include -#define TASKQ_NAMELEN 31 +#define TASKQ_NAMELEN 31 -#define TASKQ_PREPOPULATE 0x00000001 -#define TASKQ_CPR_SAFE 0x00000002 -#define TASKQ_DYNAMIC 0x00000004 -#define TASKQ_THREADS_CPU_PCT 0x00000008 -#define TASKQ_DC_BATCH 0x00000010 +#define TASKQ_PREPOPULATE 0x00000001 +#define TASKQ_CPR_SAFE 0x00000002 +#define TASKQ_DYNAMIC 0x00000004 +#define TASKQ_THREADS_CPU_PCT 0x00000008 +#define TASKQ_DC_BATCH 0x00000010 typedef unsigned long taskqid_t; typedef void (task_func_t)(void *); typedef struct taskq_ent { - spinlock_t tqent_lock; - struct list_head tqent_list; - taskqid_t tqent_id; - task_func_t *tqent_func; - void *tqent_arg; - uintptr_t tqent_flags; + spinlock_t tqent_lock; + struct list_head tqent_list; + taskqid_t tqent_id; + task_func_t *tqent_func; + void *tqent_arg; + uintptr_t tqent_flags; } taskq_ent_t; -#define TQENT_FLAG_PREALLOC 0x1 +#define TQENT_FLAG_PREALLOC 0x1 /* * Flags for taskq_dispatch. TQ_SLEEP/TQ_NOSLEEP should be same as * KM_SLEEP/KM_NOSLEEP. TQ_NOQUEUE/TQ_NOALLOC are set particularly * large so as not to conflict with already used GFP_* defines. */ -#define TQ_SLEEP 0x00000000 -#define TQ_NOSLEEP 0x00000001 -#define TQ_PUSHPAGE 0x00000002 -#define TQ_NOQUEUE 0x01000000 -#define TQ_NOALLOC 0x02000000 -#define TQ_NEW 0x04000000 -#define TQ_FRONT 0x08000000 -#define TQ_ACTIVE 0x80000000 +#define TQ_SLEEP 0x00000000 +#define TQ_NOSLEEP 0x00000001 +#define TQ_PUSHPAGE 0x00000002 +#define TQ_NOQUEUE 0x01000000 +#define TQ_NOALLOC 0x02000000 +#define TQ_NEW 0x04000000 +#define TQ_FRONT 0x08000000 +#define TQ_ACTIVE 0x80000000 typedef struct taskq { - spinlock_t tq_lock; /* protects taskq_t */ - unsigned long tq_lock_flags; /* interrupt state */ - const char *tq_name; /* taskq name */ - struct list_head tq_thread_list;/* list of all threads */ - struct list_head tq_active_list;/* list of active threads */ - int tq_nactive; /* # of active threads */ - int tq_nthreads; /* # of total threads */ - int tq_pri; /* priority */ - int tq_minalloc; /* min task_t pool size */ - int tq_maxalloc; /* max task_t pool size */ - int tq_nalloc; /* cur task_t pool size */ - uint_t tq_flags; /* flags */ - taskqid_t tq_next_id; /* next pend/work id */ - taskqid_t tq_lowest_id; /* lowest pend/work id */ - struct list_head tq_free_list; /* free task_t's */ - struct list_head tq_pend_list; /* pending task_t's */ - struct list_head tq_prio_list; /* priority pending task_t's */ - wait_queue_head_t tq_work_waitq; /* new work waitq */ - wait_queue_head_t tq_wait_waitq; /* wait waitq */ + spinlock_t tq_lock; /* protects taskq_t */ + unsigned long tq_lock_flags; /* interrupt state */ + const char *tq_name; /* taskq name */ + struct list_head tq_thread_list;/* list of all threads */ + struct list_head tq_active_list;/* list of active threads */ + int tq_nactive; /* # of active threads */ + int tq_nthreads; /* # of total threads */ + int tq_pri; /* priority */ + int tq_minalloc; /* min task_t pool size */ + int tq_maxalloc; /* max task_t pool size */ + int tq_nalloc; /* cur task_t pool size */ + uint_t tq_flags; /* flags */ + taskqid_t tq_next_id; /* next pend/work id */ + taskqid_t tq_lowest_id; /* lowest pend/work id */ + struct list_head tq_free_list; /* free task_t's */ + struct list_head tq_pend_list; /* pending task_t's */ + struct list_head tq_prio_list; /* priority pending task_t's */ + wait_queue_head_t tq_work_waitq; /* new work waitq */ + wait_queue_head_t tq_wait_waitq; /* wait waitq */ } taskq_t; typedef struct taskq_thread { - struct list_head tqt_thread_list; - struct list_head tqt_active_list; - struct task_struct *tqt_thread; - taskq_t *tqt_tq; - taskqid_t tqt_id; - uintptr_t tqt_flags; + struct list_head tqt_thread_list; + struct list_head tqt_active_list; + struct task_struct *tqt_thread; + taskq_t *tqt_tq; + taskqid_t tqt_id; + uintptr_t tqt_flags; } taskq_thread_t; /* Global system-wide dynamic task queue available for all consumers */ @@ -116,18 +116,20 @@ extern int __taskq_member(taskq_t *, void *); int spl_taskq_init(void); void spl_taskq_fini(void); -#define taskq_member(tq, t) __taskq_member(tq, t) -#define taskq_wait_id(tq, id) __taskq_wait_id(tq, id) -#define taskq_wait(tq) __taskq_wait(tq) -#define taskq_dispatch(tq, f, p, fl) __taskq_dispatch(tq, f, p, fl) -#define taskq_dispatch_ent(tq, f, p, fl, t) __taskq_dispatch_ent(tq, f, p, fl, t) -#define taskq_empty_ent(t) __taskq_empty_ent(t) -#define taskq_init_ent(t) __taskq_init_ent(t) -#define taskq_create(n, th, p, mi, ma, fl) __taskq_create(n, th, p, mi, ma, fl) -#define taskq_create_proc(n, th, p, mi, ma, pr, fl) \ - __taskq_create(n, th, p, mi, ma, fl) -#define taskq_create_sysdc(n, th, mi, ma, pr, dc, fl) \ - __taskq_create(n, th, maxclsyspri, mi, ma, fl) -#define taskq_destroy(tq) __taskq_destroy(tq) +#define taskq_member(tq, t) __taskq_member(tq, t) +#define taskq_wait_id(tq, id) __taskq_wait_id(tq, id) +#define taskq_wait(tq) __taskq_wait(tq) +#define taskq_dispatch(tq, f, p, fl) __taskq_dispatch(tq, f, p, fl) +#define taskq_dispatch_ent(tq, f, p, fl, t) \ + __taskq_dispatch_ent(tq, f, p, fl, t) +#define taskq_empty_ent(t) __taskq_empty_ent(t) +#define taskq_init_ent(t) __taskq_init_ent(t) +#define taskq_create(n, th, p, mi, ma, fl) \ + __taskq_create(n, th, p, mi, ma, fl) +#define taskq_create_proc(n, th, p, mi, ma, pr, fl) \ + __taskq_create(n, th, p, mi, ma, fl) +#define taskq_create_sysdc(n, th, mi, ma, pr, dc, fl) \ + __taskq_create(n, th, maxclsyspri, mi, ma, fl) +#define taskq_destroy(tq) __taskq_destroy(tq) #endif /* _SPL_TASKQ_H */ diff --git a/module/spl/spl-taskq.c b/module/spl/spl-taskq.c index 7ea20461b..99bd361ad 100644 --- a/module/spl/spl-taskq.c +++ b/module/spl/spl-taskq.c @@ -57,60 +57,60 @@ task_km_flags(uint_t flags) static taskq_ent_t * task_alloc(taskq_t *tq, uint_t flags) { - taskq_ent_t *t; - int count = 0; - SENTRY; + taskq_ent_t *t; + int count = 0; + SENTRY; - ASSERT(tq); - ASSERT(spin_is_locked(&tq->tq_lock)); + ASSERT(tq); + ASSERT(spin_is_locked(&tq->tq_lock)); retry: - /* Acquire taskq_ent_t's from free list if available */ - if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) { - t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list); + /* Acquire taskq_ent_t's from free list if available */ + if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) { + t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list); - ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); + ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); - list_del_init(&t->tqent_list); - SRETURN(t); - } + list_del_init(&t->tqent_list); + SRETURN(t); + } - /* Free list is empty and memory allocations are prohibited */ - if (flags & TQ_NOALLOC) - SRETURN(NULL); + /* Free list is empty and memory allocations are prohibited */ + if (flags & TQ_NOALLOC) + SRETURN(NULL); - /* Hit maximum taskq_ent_t pool size */ - if (tq->tq_nalloc >= tq->tq_maxalloc) { - if (flags & TQ_NOSLEEP) - SRETURN(NULL); + /* Hit maximum taskq_ent_t pool size */ + if (tq->tq_nalloc >= tq->tq_maxalloc) { + if (flags & TQ_NOSLEEP) + SRETURN(NULL); - /* - * Sleep periodically polling the free list for an available - * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed - * but we cannot block forever waiting for an taskq_entq_t to - * show up in the free list, otherwise a deadlock can happen. - * - * Therefore, we need to allocate a new task even if the number - * of allocated tasks is above tq->tq_maxalloc, but we still - * end up delaying the task allocation by one second, thereby - * throttling the task dispatch rate. - */ - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - schedule_timeout(HZ / 100); - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - if (count < 100) - SGOTO(retry, count++); - } + /* + * Sleep periodically polling the free list for an available + * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed + * but we cannot block forever waiting for an taskq_ent_t to + * show up in the free list, otherwise a deadlock can happen. + * + * Therefore, we need to allocate a new task even if the number + * of allocated tasks is above tq->tq_maxalloc, but we still + * end up delaying the task allocation by one second, thereby + * throttling the task dispatch rate. + */ + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + schedule_timeout(HZ / 100); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + if (count < 100) + SGOTO(retry, count++); + } - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - t = kmem_alloc(sizeof(taskq_ent_t), task_km_flags(flags)); - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + t = kmem_alloc(sizeof(taskq_ent_t), task_km_flags(flags)); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - if (t) { - taskq_init_ent(t); - tq->tq_nalloc++; - } + if (t) { + taskq_init_ent(t); + tq->tq_nalloc++; + } - SRETURN(t); + SRETURN(t); } /* @@ -120,15 +120,15 @@ retry: static void task_free(taskq_t *tq, taskq_ent_t *t) { - SENTRY; + SENTRY; - ASSERT(tq); - ASSERT(t); + ASSERT(tq); + ASSERT(t); ASSERT(spin_is_locked(&tq->tq_lock)); ASSERT(list_empty(&t->tqent_list)); - kmem_free(t, sizeof(taskq_ent_t)); - tq->tq_nalloc--; + kmem_free(t, sizeof(taskq_ent_t)); + tq->tq_nalloc--; SEXIT; } @@ -147,18 +147,18 @@ task_done(taskq_t *tq, taskq_ent_t *t) list_del_init(&t->tqent_list); - if (tq->tq_nalloc <= tq->tq_minalloc) { + if (tq->tq_nalloc <= tq->tq_minalloc) { t->tqent_id = 0; t->tqent_func = NULL; t->tqent_arg = NULL; t->tqent_flags = 0; - list_add_tail(&t->tqent_list, &tq->tq_free_list); + list_add_tail(&t->tqent_list, &tq->tq_free_list); } else { task_free(tq, t); } - SEXIT; + SEXIT; } /* @@ -236,10 +236,10 @@ __taskq_member(taskq_t *tq, void *t) { struct list_head *l; taskq_thread_t *tqt; - SENTRY; + SENTRY; ASSERT(tq); - ASSERT(t); + ASSERT(t); list_for_each(l, &tq->tq_thread_list) { tqt = list_entry(l, taskq_thread_t, tqt_thread_list); @@ -247,21 +247,21 @@ __taskq_member(taskq_t *tq, void *t) SRETURN(1); } - SRETURN(0); + SRETURN(0); } EXPORT_SYMBOL(__taskq_member); taskqid_t __taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) { - taskq_ent_t *t; + taskq_ent_t *t; taskqid_t rc = 0; - SENTRY; + SENTRY; - ASSERT(tq); - ASSERT(func); + ASSERT(tq); + ASSERT(func); - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); /* Taskq being destroyed and all tasks drained */ if (!(tq->tq_flags & TQ_ACTIVE)) @@ -272,7 +272,7 @@ __taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) SGOTO(out, rc = 0); - if ((t = task_alloc(tq, flags)) == NULL) + if ((t = task_alloc(tq, flags)) == NULL) SGOTO(out, rc = 0); spin_lock(&t->tqent_lock); @@ -285,8 +285,8 @@ __taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) t->tqent_id = rc = tq->tq_next_id; tq->tq_next_id++; - t->tqent_func = func; - t->tqent_arg = arg; + t->tqent_func = func; + t->tqent_arg = arg; ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); @@ -374,7 +374,7 @@ static taskqid_t taskq_lowest_id(taskq_t *tq) { taskqid_t lowest_id = tq->tq_next_id; - taskq_ent_t *t; + taskq_ent_t *t; taskq_thread_t *tqt; SENTRY; @@ -393,7 +393,7 @@ taskq_lowest_id(taskq_t *tq) if (!list_empty(&tq->tq_active_list)) { tqt = list_entry(tq->tq_active_list.next, taskq_thread_t, - tqt_active_list); + tqt_active_list); ASSERT(tqt->tqt_id != 0); lowest_id = MIN(lowest_id, tqt->tqt_id); } @@ -402,8 +402,7 @@ taskq_lowest_id(taskq_t *tq) } /* - * Insert a task into a list keeping the list sorted by increasing - * taskqid. + * Insert a task into a list keeping the list sorted by increasing taskqid. */ static void taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt) @@ -432,28 +431,28 @@ taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt) static int taskq_thread(void *args) { - DECLARE_WAITQUEUE(wait, current); - sigset_t blocked; + DECLARE_WAITQUEUE(wait, current); + sigset_t blocked; taskq_thread_t *tqt = args; - taskq_t *tq; - taskq_ent_t *t; + taskq_t *tq; + taskq_ent_t *t; struct list_head *pend_list; SENTRY; - ASSERT(tqt); + ASSERT(tqt); tq = tqt->tqt_tq; - current->flags |= PF_NOFREEZE; + current->flags |= PF_NOFREEZE; - sigfillset(&blocked); - sigprocmask(SIG_BLOCK, &blocked, NULL); - flush_signals(current); + sigfillset(&blocked); + sigprocmask(SIG_BLOCK, &blocked, NULL); + flush_signals(current); - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - tq->tq_nthreads++; - wake_up(&tq->tq_wait_waitq); - set_current_state(TASK_INTERRUPTIBLE); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + tq->tq_nthreads++; + wake_up(&tq->tq_wait_waitq); + set_current_state(TASK_INTERRUPTIBLE); - while (!kthread_should_stop()) { + while (!kthread_should_stop()) { if (list_empty(&tq->tq_pend_list) && list_empty(&tq->tq_prio_list)) { @@ -475,8 +474,8 @@ taskq_thread(void *args) pend_list = NULL; if (pend_list) { - t = list_entry(pend_list->next, taskq_ent_t, tqent_list); - list_del_init(&t->tqent_list); + t = list_entry(pend_list->next,taskq_ent_t,tqent_list); + list_del_init(&t->tqent_list); /* In order to support recursively dispatching a * preallocated taskq_ent_t, tqent_id must be @@ -491,14 +490,14 @@ taskq_thread(void *args) tqt->tqt_flags = t->tqent_flags; taskq_insert_in_order(tq, tqt); - tq->tq_nactive++; + tq->tq_nactive++; spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); /* Perform the requested task */ - t->tqent_func(t->tqent_arg); + t->tqent_func(t->tqent_arg); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - tq->tq_nactive--; + tq->tq_nactive--; list_del_init(&tqt->tqt_active_list); /* For prealloc'd tasks, we don't free anything. */ @@ -515,37 +514,37 @@ taskq_thread(void *args) tqt->tqt_id = 0; tqt->tqt_flags = 0; - wake_up_all(&tq->tq_wait_waitq); + wake_up_all(&tq->tq_wait_waitq); } set_current_state(TASK_INTERRUPTIBLE); - } + } __set_current_state(TASK_RUNNING); - tq->tq_nthreads--; + tq->tq_nthreads--; list_del_init(&tqt->tqt_thread_list); kmem_free(tqt, sizeof(taskq_thread_t)); - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); SRETURN(0); } taskq_t * __taskq_create(const char *name, int nthreads, pri_t pri, - int minalloc, int maxalloc, uint_t flags) + int minalloc, int maxalloc, uint_t flags) { - taskq_t *tq; + taskq_t *tq; taskq_thread_t *tqt; - int rc = 0, i, j = 0; - SENTRY; + int rc = 0, i, j = 0; + SENTRY; - ASSERT(name != NULL); - ASSERT(pri <= maxclsyspri); - ASSERT(minalloc >= 0); - ASSERT(maxalloc <= INT_MAX); - ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */ + ASSERT(name != NULL); + ASSERT(pri <= maxclsyspri); + ASSERT(minalloc >= 0); + ASSERT(maxalloc <= INT_MAX); + ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */ /* Scale the number of threads using nthreads as a percentage */ if (flags & TASKQ_THREADS_CPU_PCT) { @@ -556,35 +555,35 @@ __taskq_create(const char *name, int nthreads, pri_t pri, nthreads = MAX((num_online_cpus() * nthreads) / 100, 1); } - tq = kmem_alloc(sizeof(*tq), KM_PUSHPAGE); - if (tq == NULL) - SRETURN(NULL); + tq = kmem_alloc(sizeof(*tq), KM_PUSHPAGE); + if (tq == NULL) + SRETURN(NULL); - spin_lock_init(&tq->tq_lock); - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - INIT_LIST_HEAD(&tq->tq_thread_list); - INIT_LIST_HEAD(&tq->tq_active_list); - tq->tq_name = name; - tq->tq_nactive = 0; + spin_lock_init(&tq->tq_lock); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + INIT_LIST_HEAD(&tq->tq_thread_list); + INIT_LIST_HEAD(&tq->tq_active_list); + tq->tq_name = name; + tq->tq_nactive = 0; tq->tq_nthreads = 0; - tq->tq_pri = pri; - tq->tq_minalloc = minalloc; - tq->tq_maxalloc = maxalloc; + tq->tq_pri = pri; + tq->tq_minalloc = minalloc; + tq->tq_maxalloc = maxalloc; tq->tq_nalloc = 0; - tq->tq_flags = (flags | TQ_ACTIVE); + tq->tq_flags = (flags | TQ_ACTIVE); tq->tq_next_id = 1; tq->tq_lowest_id = 1; - INIT_LIST_HEAD(&tq->tq_free_list); - INIT_LIST_HEAD(&tq->tq_pend_list); - INIT_LIST_HEAD(&tq->tq_prio_list); - init_waitqueue_head(&tq->tq_work_waitq); - init_waitqueue_head(&tq->tq_wait_waitq); + INIT_LIST_HEAD(&tq->tq_free_list); + INIT_LIST_HEAD(&tq->tq_pend_list); + INIT_LIST_HEAD(&tq->tq_prio_list); + init_waitqueue_head(&tq->tq_work_waitq); + init_waitqueue_head(&tq->tq_wait_waitq); - if (flags & TASKQ_PREPOPULATE) - for (i = 0; i < minalloc; i++) - task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW)); + if (flags & TASKQ_PREPOPULATE) + for (i = 0; i < minalloc; i++) + task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW)); - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); for (i = 0; i < nthreads; i++) { tqt = kmem_alloc(sizeof(*tqt), KM_PUSHPAGE); @@ -594,7 +593,7 @@ __taskq_create(const char *name, int nthreads, pri_t pri, tqt->tqt_id = 0; tqt->tqt_thread = kthread_create(taskq_thread, tqt, - "%s/%d", name, i); + "%s/%d", name, i); if (tqt->tqt_thread) { list_add(&tqt->tqt_thread_list, &tq->tq_thread_list); kthread_bind(tqt->tqt_thread, i % num_online_cpus()); @@ -607,15 +606,15 @@ __taskq_create(const char *name, int nthreads, pri_t pri, } } - /* Wait for all threads to be started before potential destroy */ + /* Wait for all threads to be started before potential destroy */ wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j); - if (rc) { - __taskq_destroy(tq); - tq = NULL; - } + if (rc) { + __taskq_destroy(tq); + tq = NULL; + } - SRETURN(tq); + SRETURN(tq); } EXPORT_SYMBOL(__taskq_create); @@ -629,13 +628,13 @@ __taskq_destroy(taskq_t *tq) ASSERT(tq); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - tq->tq_flags &= ~TQ_ACTIVE; + tq->tq_flags &= ~TQ_ACTIVE; spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); /* TQ_ACTIVE cleared prevents new tasks being added to pending */ - __taskq_wait(tq); + __taskq_wait(tq); - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); /* * Signal each thread to exit and block until it does. Each thread @@ -651,29 +650,29 @@ __taskq_destroy(taskq_t *tq) kthread_stop(thread); - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); } - while (!list_empty(&tq->tq_free_list)) { + while (!list_empty(&tq->tq_free_list)) { t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list); ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); - list_del_init(&t->tqent_list); - task_free(tq, t); - } + list_del_init(&t->tqent_list); + task_free(tq, t); + } - ASSERT(tq->tq_nthreads == 0); - ASSERT(tq->tq_nalloc == 0); - ASSERT(list_empty(&tq->tq_thread_list)); - ASSERT(list_empty(&tq->tq_active_list)); - ASSERT(list_empty(&tq->tq_free_list)); - ASSERT(list_empty(&tq->tq_pend_list)); - ASSERT(list_empty(&tq->tq_prio_list)); + ASSERT(tq->tq_nthreads == 0); + ASSERT(tq->tq_nalloc == 0); + ASSERT(list_empty(&tq->tq_thread_list)); + ASSERT(list_empty(&tq->tq_active_list)); + ASSERT(list_empty(&tq->tq_free_list)); + ASSERT(list_empty(&tq->tq_pend_list)); + ASSERT(list_empty(&tq->tq_prio_list)); - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - kmem_free(tq, sizeof(taskq_t)); + kmem_free(tq, sizeof(taskq_t)); SEXIT; } @@ -682,22 +681,22 @@ EXPORT_SYMBOL(__taskq_destroy); int spl_taskq_init(void) { - SENTRY; + SENTRY; /* Solaris creates a dynamic taskq of up to 64 threads, however in * a Linux environment 1 thread per-core is usually about right */ - system_taskq = taskq_create("spl_system_taskq", num_online_cpus(), + system_taskq = taskq_create("spl_system_taskq", num_online_cpus(), minclsyspri, 4, 512, TASKQ_PREPOPULATE); if (system_taskq == NULL) SRETURN(1); - SRETURN(0); + SRETURN(0); } void spl_taskq_fini(void) { - SENTRY; + SENTRY; taskq_destroy(system_taskq); - SEXIT; + SEXIT; } From aed8671cb0bfc18f6cd034ecad2e9cf49536d965 Mon Sep 17 00:00:00 2001 From: Brian Behlendorf Date: Thu, 6 Dec 2012 13:04:27 -0800 Subject: [PATCH 02/11] taskq style, remove #define wrappers When the taskq implementation was originally written I wrapped all the API functions in #define's. This was done as a preventative measure to ensure that a taskq symbol never conflicted with an existing kernel symbol. However, in practice the taskq symbols never conflicted. The only major conflicts occured with the kmem cache API. Since this added layer of obfuscation never bought us anything for the taskq's I'm removing it. Signed-off-by: Brian Behlendorf --- include/sys/taskq.h | 40 +++++++++++++++------------------------- module/spl/spl-taskq.c | 42 +++++++++++++++++++++--------------------- 2 files changed, 36 insertions(+), 46 deletions(-) diff --git a/include/sys/taskq.h b/include/sys/taskq.h index 8260cf935..84b563208 100644 --- a/include/sys/taskq.h +++ b/include/sys/taskq.h @@ -103,33 +103,23 @@ typedef struct taskq_thread { /* Global system-wide dynamic task queue available for all consumers */ extern taskq_t *system_taskq; -extern taskqid_t __taskq_dispatch(taskq_t *, task_func_t, void *, uint_t); -extern void __taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t, taskq_ent_t *); -extern int __taskq_empty_ent(taskq_ent_t *); -extern void __taskq_init_ent(taskq_ent_t *); -extern taskq_t *__taskq_create(const char *, int, pri_t, int, int, uint_t); -extern void __taskq_destroy(taskq_t *); -extern void __taskq_wait_id(taskq_t *, taskqid_t); -extern void __taskq_wait(taskq_t *); -extern int __taskq_member(taskq_t *, void *); +extern taskqid_t taskq_dispatch(taskq_t *, task_func_t, void *, uint_t); +extern void taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t, + taskq_ent_t *); +extern int taskq_empty_ent(taskq_ent_t *); +extern void taskq_init_ent(taskq_ent_t *); +extern taskq_t *taskq_create(const char *, int, pri_t, int, int, uint_t); +extern void taskq_destroy(taskq_t *); +extern void taskq_wait_id(taskq_t *, taskqid_t); +extern void taskq_wait(taskq_t *); +extern int taskq_member(taskq_t *, void *); + +#define taskq_create_proc(name, nthreads, pri, min, max, proc, flags) \ + taskq_create(name, nthreads, pri, min, max, flags) +#define taskq_create_sysdc(name, nthreads, min, max, proc, dc, flags) \ + taskq_create(name, nthreads, maxclsyspri, min, max, flags) int spl_taskq_init(void); void spl_taskq_fini(void); -#define taskq_member(tq, t) __taskq_member(tq, t) -#define taskq_wait_id(tq, id) __taskq_wait_id(tq, id) -#define taskq_wait(tq) __taskq_wait(tq) -#define taskq_dispatch(tq, f, p, fl) __taskq_dispatch(tq, f, p, fl) -#define taskq_dispatch_ent(tq, f, p, fl, t) \ - __taskq_dispatch_ent(tq, f, p, fl, t) -#define taskq_empty_ent(t) __taskq_empty_ent(t) -#define taskq_init_ent(t) __taskq_init_ent(t) -#define taskq_create(n, th, p, mi, ma, fl) \ - __taskq_create(n, th, p, mi, ma, fl) -#define taskq_create_proc(n, th, p, mi, ma, pr, fl) \ - __taskq_create(n, th, p, mi, ma, fl) -#define taskq_create_sysdc(n, th, mi, ma, pr, dc, fl) \ - __taskq_create(n, th, maxclsyspri, mi, ma, fl) -#define taskq_destroy(tq) __taskq_destroy(tq) - #endif /* _SPL_TASKQ_H */ diff --git a/module/spl/spl-taskq.c b/module/spl/spl-taskq.c index 99bd361ad..2007cf084 100644 --- a/module/spl/spl-taskq.c +++ b/module/spl/spl-taskq.c @@ -201,7 +201,7 @@ taskq_wait_check(taskq_t *tq, taskqid_t id) } void -__taskq_wait_id(taskq_t *tq, taskqid_t id) +taskq_wait_id(taskq_t *tq, taskqid_t id) { SENTRY; ASSERT(tq); @@ -210,10 +210,10 @@ __taskq_wait_id(taskq_t *tq, taskqid_t id) SEXIT; } -EXPORT_SYMBOL(__taskq_wait_id); +EXPORT_SYMBOL(taskq_wait_id); void -__taskq_wait(taskq_t *tq) +taskq_wait(taskq_t *tq) { taskqid_t id; SENTRY; @@ -224,15 +224,15 @@ __taskq_wait(taskq_t *tq) id = tq->tq_next_id - 1; spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - __taskq_wait_id(tq, id); + taskq_wait_id(tq, id); SEXIT; } -EXPORT_SYMBOL(__taskq_wait); +EXPORT_SYMBOL(taskq_wait); int -__taskq_member(taskq_t *tq, void *t) +taskq_member(taskq_t *tq, void *t) { struct list_head *l; taskq_thread_t *tqt; @@ -249,10 +249,10 @@ __taskq_member(taskq_t *tq, void *t) SRETURN(0); } -EXPORT_SYMBOL(__taskq_member); +EXPORT_SYMBOL(taskq_member); taskqid_t -__taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) +taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) { taskq_ent_t *t; taskqid_t rc = 0; @@ -297,10 +297,10 @@ out: spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); SRETURN(rc); } -EXPORT_SYMBOL(__taskq_dispatch); +EXPORT_SYMBOL(taskq_dispatch); void -__taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, +taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, taskq_ent_t *t) { SENTRY; @@ -343,17 +343,17 @@ out: spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); SEXIT; } -EXPORT_SYMBOL(__taskq_dispatch_ent); +EXPORT_SYMBOL(taskq_dispatch_ent); int -__taskq_empty_ent(taskq_ent_t *t) +taskq_empty_ent(taskq_ent_t *t) { return list_empty(&t->tqent_list); } -EXPORT_SYMBOL(__taskq_empty_ent); +EXPORT_SYMBOL(taskq_empty_ent); void -__taskq_init_ent(taskq_ent_t *t) +taskq_init_ent(taskq_ent_t *t) { spin_lock_init(&t->tqent_lock); INIT_LIST_HEAD(&t->tqent_list); @@ -362,7 +362,7 @@ __taskq_init_ent(taskq_ent_t *t) t->tqent_arg = NULL; t->tqent_flags = 0; } -EXPORT_SYMBOL(__taskq_init_ent); +EXPORT_SYMBOL(taskq_init_ent); /* * Returns the lowest incomplete taskqid_t. The taskqid_t may @@ -532,7 +532,7 @@ taskq_thread(void *args) } taskq_t * -__taskq_create(const char *name, int nthreads, pri_t pri, +taskq_create(const char *name, int nthreads, pri_t pri, int minalloc, int maxalloc, uint_t flags) { taskq_t *tq; @@ -610,16 +610,16 @@ __taskq_create(const char *name, int nthreads, pri_t pri, wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j); if (rc) { - __taskq_destroy(tq); + taskq_destroy(tq); tq = NULL; } SRETURN(tq); } -EXPORT_SYMBOL(__taskq_create); +EXPORT_SYMBOL(taskq_create); void -__taskq_destroy(taskq_t *tq) +taskq_destroy(taskq_t *tq) { struct task_struct *thread; taskq_thread_t *tqt; @@ -632,7 +632,7 @@ __taskq_destroy(taskq_t *tq) spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); /* TQ_ACTIVE cleared prevents new tasks being added to pending */ - __taskq_wait(tq); + taskq_wait(tq); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); @@ -676,7 +676,7 @@ __taskq_destroy(taskq_t *tq) SEXIT; } -EXPORT_SYMBOL(__taskq_destroy); +EXPORT_SYMBOL(taskq_destroy); int spl_taskq_init(void) From d9acd930b52503582425c6398fc8dbc1d7d4a01b Mon Sep 17 00:00:00 2001 From: Brian Behlendorf Date: Thu, 6 Dec 2012 12:38:19 -0800 Subject: [PATCH 03/11] taskq delay/cancel functionality Add the ability to dispatch a delayed task to a taskq. The desired behavior is for the task to be queued but not executed by a worker thread until the expiration time is reached. To achieve this two new functions were added. * taskq_dispatch_delay() - This function behaves exactly like taskq_dispatch() however it takes a third 'expire_time' argument. The caller should pass the desired time the task should be executed as an absolute value in jiffies. The task is guarenteed not to run before this time, it may run slightly latter if all the worker threads are busy. * taskq_cancel_id() - Given a task id attempt to cancel the task before it gets executed. This is primarily useful for canceling delay tasks but can be used for canceling any previously dispatched task. There are three possible return values. 0 - The task was found and canceled before it was executed. ENOENT - The task was not found, either it was already run or an invalid task id was supplied by the caller. EBUSY - The task is currently executing any may not be canceled. This function will block until the task has been completed. * taskq_wait_all() - The taskq_wait_id() function was renamed taskq_wait_all() to more clearly reflect its actual behavior. It is only curreny used by the splat taskq regression tests. * taskq_wait_id() - Historically, the only difference between this function and taskq_wait() was that you passed the task id. In both functions you would block until ALL lower task ids which executed. This was semantically correct but could be very slow particularly if there were delay tasks submitted. To better accomidate the delay tasks this function was reimplemnted. It will now only block until the passed task id has been completed. This is actually a fairly low risk change for a few reasons. * Only new ZFS callers will make use of the new interfaces and very little common code was changed to support the new functions. * The existing taskq_wait() implementation was not changed just slightly refactored. * The newly optimized taskq_wait_id() implementation was never used by ZFS we can't accidentally introduce a new bug there. NOTE: This functionality does not exist in the Illumos taskqs. Signed-off-by: Brian Behlendorf --- include/sys/taskq.h | 38 ++- module/spl/spl-taskq.c | 640 ++++++++++++++++++++++++++----------- module/splat/splat-taskq.c | 18 +- 3 files changed, 483 insertions(+), 213 deletions(-) diff --git a/include/sys/taskq.h b/include/sys/taskq.h index 84b563208..3839de288 100644 --- a/include/sys/taskq.h +++ b/include/sys/taskq.h @@ -41,20 +41,6 @@ #define TASKQ_THREADS_CPU_PCT 0x00000008 #define TASKQ_DC_BATCH 0x00000010 -typedef unsigned long taskqid_t; -typedef void (task_func_t)(void *); - -typedef struct taskq_ent { - spinlock_t tqent_lock; - struct list_head tqent_list; - taskqid_t tqent_id; - task_func_t *tqent_func; - void *tqent_arg; - uintptr_t tqent_flags; -} taskq_ent_t; - -#define TQENT_FLAG_PREALLOC 0x1 - /* * Flags for taskq_dispatch. TQ_SLEEP/TQ_NOSLEEP should be same as * KM_SLEEP/KM_NOSLEEP. TQ_NOQUEUE/TQ_NOALLOC are set particularly @@ -69,6 +55,9 @@ typedef struct taskq_ent { #define TQ_FRONT 0x08000000 #define TQ_ACTIVE 0x80000000 +typedef unsigned long taskqid_t; +typedef void (task_func_t)(void *); + typedef struct taskq { spinlock_t tq_lock; /* protects taskq_t */ unsigned long tq_lock_flags; /* interrupt state */ @@ -87,16 +76,33 @@ typedef struct taskq { struct list_head tq_free_list; /* free task_t's */ struct list_head tq_pend_list; /* pending task_t's */ struct list_head tq_prio_list; /* priority pending task_t's */ + struct list_head tq_delay_list; /* delayed task_t's */ wait_queue_head_t tq_work_waitq; /* new work waitq */ wait_queue_head_t tq_wait_waitq; /* wait waitq */ } taskq_t; +typedef struct taskq_ent { + spinlock_t tqent_lock; + wait_queue_head_t tqent_waitq; + struct timer_list tqent_timer; + struct list_head tqent_list; + taskqid_t tqent_id; + task_func_t *tqent_func; + void *tqent_arg; + taskq_t *tqent_taskq; + uintptr_t tqent_flags; +} taskq_ent_t; + +#define TQENT_FLAG_PREALLOC 0x1 +#define TQENT_FLAG_CANCEL 0x2 + typedef struct taskq_thread { struct list_head tqt_thread_list; struct list_head tqt_active_list; struct task_struct *tqt_thread; taskq_t *tqt_tq; taskqid_t tqt_id; + taskq_ent_t *tqt_task; uintptr_t tqt_flags; } taskq_thread_t; @@ -104,6 +110,8 @@ typedef struct taskq_thread { extern taskq_t *system_taskq; extern taskqid_t taskq_dispatch(taskq_t *, task_func_t, void *, uint_t); +extern taskqid_t taskq_dispatch_delay(taskq_t *, task_func_t, void *, + uint_t, clock_t); extern void taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t, taskq_ent_t *); extern int taskq_empty_ent(taskq_ent_t *); @@ -111,7 +119,9 @@ extern void taskq_init_ent(taskq_ent_t *); extern taskq_t *taskq_create(const char *, int, pri_t, int, int, uint_t); extern void taskq_destroy(taskq_t *); extern void taskq_wait_id(taskq_t *, taskqid_t); +extern void taskq_wait_all(taskq_t *, taskqid_t); extern void taskq_wait(taskq_t *); +extern int taskq_cancel_id(taskq_t *, taskqid_t); extern int taskq_member(taskq_t *, void *); #define taskq_create_proc(name, nthreads, pri, min, max, proc, flags) \ diff --git a/module/spl/spl-taskq.c b/module/spl/spl-taskq.c index 2007cf084..c9ae0a50b 100644 --- a/module/spl/spl-taskq.c +++ b/module/spl/spl-taskq.c @@ -69,6 +69,8 @@ retry: t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list); ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); + ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL)); + ASSERT(!timer_pending(&t->tqent_timer)); list_del_init(&t->tqent_list); SRETURN(t); @@ -126,6 +128,7 @@ task_free(taskq_t *tq, taskq_ent_t *t) ASSERT(t); ASSERT(spin_is_locked(&tq->tq_lock)); ASSERT(list_empty(&t->tqent_list)); + ASSERT(!timer_pending(&t->tqent_timer)); kmem_free(t, sizeof(taskq_ent_t)); tq->tq_nalloc--; @@ -145,6 +148,9 @@ task_done(taskq_t *tq, taskq_ent_t *t) ASSERT(t); ASSERT(spin_is_locked(&tq->tq_lock)); + /* Wake tasks blocked in taskq_wait_id() */ + wake_up_all(&t->tqent_waitq); + list_del_init(&t->tqent_list); if (tq->tq_nalloc <= tq->tq_minalloc) { @@ -162,213 +168,49 @@ task_done(taskq_t *tq, taskq_ent_t *t) } /* - * As tasks are submitted to the task queue they are assigned a - * monotonically increasing taskqid and added to the tail of the pending - * list. As worker threads become available the tasks are removed from - * the head of the pending or priority list, giving preference to the - * priority list. The tasks are then removed from their respective - * list, and the taskq_thread servicing the task is added to the active - * list, preserving the order using the serviced task's taskqid. - * Finally, as tasks complete the taskq_thread servicing the task is - * removed from the active list. This means that the pending task and - * active taskq_thread lists are always kept sorted by taskqid. Thus the - * lowest outstanding incomplete taskqid can be determined simply by - * checking the min taskqid for each head item on the pending, priority, - * and active taskq_thread list. This value is stored in - * tq->tq_lowest_id and only updated to the new lowest id when the - * previous lowest id completes. All taskqids lower than - * tq->tq_lowest_id must have completed. It is also possible larger - * taskqid's have completed because they may be processed in parallel by - * several worker threads. However, this is not a problem because the - * behavior of taskq_wait_id() is to block until all previously - * submitted taskqid's have completed. - * - * XXX: Taskqid_t wrapping is not handled. However, taskqid_t's are - * 64-bit values so even if a taskq is processing 2^24 (16,777,216) - * taskqid_ts per second it will still take 2^40 seconds, 34,865 years, - * before the wrap occurs. I can live with that for now. + * When a delayed task timer expires remove it from the delay list and + * add it to the priority list in order for immediate processing. */ -static int -taskq_wait_check(taskq_t *tq, taskqid_t id) -{ - int rc; - - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - rc = (id < tq->tq_lowest_id); - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - - SRETURN(rc); -} - -void -taskq_wait_id(taskq_t *tq, taskqid_t id) -{ - SENTRY; - ASSERT(tq); - - wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id)); - - SEXIT; -} -EXPORT_SYMBOL(taskq_wait_id); - -void -taskq_wait(taskq_t *tq) -{ - taskqid_t id; - SENTRY; - ASSERT(tq); - - /* Wait for the largest outstanding taskqid */ - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - id = tq->tq_next_id - 1; - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - - taskq_wait_id(tq, id); - - SEXIT; - -} -EXPORT_SYMBOL(taskq_wait); - -int -taskq_member(taskq_t *tq, void *t) +static void +task_expire(unsigned long data) { + taskq_ent_t *w, *t = (taskq_ent_t *)data; + taskq_t *tq = t->tqent_taskq; struct list_head *l; - taskq_thread_t *tqt; - SENTRY; - - ASSERT(tq); - ASSERT(t); - - list_for_each(l, &tq->tq_thread_list) { - tqt = list_entry(l, taskq_thread_t, tqt_thread_list); - if (tqt->tqt_thread == (struct task_struct *)t) - SRETURN(1); - } - - SRETURN(0); -} -EXPORT_SYMBOL(taskq_member); - -taskqid_t -taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) -{ - taskq_ent_t *t; - taskqid_t rc = 0; - SENTRY; - - ASSERT(tq); - ASSERT(func); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - /* Taskq being destroyed and all tasks drained */ - if (!(tq->tq_flags & TQ_ACTIVE)) - SGOTO(out, rc = 0); - - /* Do not queue the task unless there is idle thread for it */ - ASSERT(tq->tq_nactive <= tq->tq_nthreads); - if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) - SGOTO(out, rc = 0); - - if ((t = task_alloc(tq, flags)) == NULL) - SGOTO(out, rc = 0); - - spin_lock(&t->tqent_lock); - - /* Queue to the priority list instead of the pending list */ - if (flags & TQ_FRONT) - list_add_tail(&t->tqent_list, &tq->tq_prio_list); - else - list_add_tail(&t->tqent_list, &tq->tq_pend_list); - - t->tqent_id = rc = tq->tq_next_id; - tq->tq_next_id++; - t->tqent_func = func; - t->tqent_arg = arg; - - ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); - - spin_unlock(&t->tqent_lock); - - wake_up(&tq->tq_work_waitq); -out: - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - SRETURN(rc); -} -EXPORT_SYMBOL(taskq_dispatch); - -void -taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, - taskq_ent_t *t) -{ - SENTRY; - - ASSERT(tq); - ASSERT(func); - ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); - - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - - /* Taskq being destroyed and all tasks drained */ - if (!(tq->tq_flags & TQ_ACTIVE)) { - t->tqent_id = 0; - goto out; + if (t->tqent_flags & TQENT_FLAG_CANCEL) { + ASSERT(list_empty(&t->tqent_list)); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + return; } - spin_lock(&t->tqent_lock); - /* - * Mark it as a prealloc'd task. This is important - * to ensure that we don't free it later. + * The priority list must be maintained in strict task id order + * from lowest to highest for lowest_id to be easily calculable. */ - t->tqent_flags |= TQENT_FLAG_PREALLOC; + list_del(&t->tqent_list); + list_for_each_prev(l, &tq->tq_prio_list) { + w = list_entry(l, taskq_ent_t, tqent_list); + if (w->tqent_id < t->tqent_id) { + list_add(&t->tqent_list, l); + break; + } + } + if (l == &tq->tq_prio_list) + list_add(&t->tqent_list, &tq->tq_prio_list); - /* Queue to the priority list instead of the pending list */ - if (flags & TQ_FRONT) - list_add_tail(&t->tqent_list, &tq->tq_prio_list); - else - list_add_tail(&t->tqent_list, &tq->tq_pend_list); - - t->tqent_id = tq->tq_next_id; - tq->tq_next_id++; - t->tqent_func = func; - t->tqent_arg = arg; - - spin_unlock(&t->tqent_lock); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); wake_up(&tq->tq_work_waitq); -out: - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - SEXIT; } -EXPORT_SYMBOL(taskq_dispatch_ent); - -int -taskq_empty_ent(taskq_ent_t *t) -{ - return list_empty(&t->tqent_list); -} -EXPORT_SYMBOL(taskq_empty_ent); - -void -taskq_init_ent(taskq_ent_t *t) -{ - spin_lock_init(&t->tqent_lock); - INIT_LIST_HEAD(&t->tqent_list); - t->tqent_id = 0; - t->tqent_func = NULL; - t->tqent_arg = NULL; - t->tqent_flags = 0; -} -EXPORT_SYMBOL(taskq_init_ent); /* * Returns the lowest incomplete taskqid_t. The taskqid_t may - * be queued on the pending list, on the priority list, or on - * the work list currently being handled, but it is not 100% - * complete yet. + * be queued on the pending list, on the priority list, on the + * delay list, or on the work list currently being handled, but + * it is not 100% complete yet. */ static taskqid_t taskq_lowest_id(taskq_t *tq) @@ -391,6 +233,11 @@ taskq_lowest_id(taskq_t *tq) lowest_id = MIN(lowest_id, t->tqent_id); } + if (!list_empty(&tq->tq_delay_list)) { + t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list); + lowest_id = MIN(lowest_id, t->tqent_id); + } + if (!list_empty(&tq->tq_active_list)) { tqt = list_entry(tq->tq_active_list.next, taskq_thread_t, tqt_active_list); @@ -428,6 +275,415 @@ taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt) SEXIT; } +/* + * Find and return a task from the given list if it exists. The list + * must be in lowest to highest task id order. + */ +static taskq_ent_t * +taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id) +{ + struct list_head *l; + taskq_ent_t *t; + SENTRY; + + ASSERT(spin_is_locked(&tq->tq_lock)); + + list_for_each(l, lh) { + t = list_entry(l, taskq_ent_t, tqent_list); + + if (t->tqent_id == id) + SRETURN(t); + + if (t->tqent_id > id) + break; + } + + SRETURN(NULL); +} + +/* + * Find an already dispatched task given the task id regardless of what + * state it is in. If a task is still pending or executing it will be + * returned and 'active' set appropriately. If the task has already + * been run then NULL is returned. + */ +static taskq_ent_t * +taskq_find(taskq_t *tq, taskqid_t id, int *active) +{ + taskq_thread_t *tqt; + struct list_head *l; + taskq_ent_t *t; + SENTRY; + + ASSERT(spin_is_locked(&tq->tq_lock)); + *active = 0; + + t = taskq_find_list(tq, &tq->tq_delay_list, id); + if (t) + SRETURN(t); + + t = taskq_find_list(tq, &tq->tq_prio_list, id); + if (t) + SRETURN(t); + + t = taskq_find_list(tq, &tq->tq_pend_list, id); + if (t) + SRETURN(t); + + list_for_each(l, &tq->tq_active_list) { + tqt = list_entry(l, taskq_thread_t, tqt_active_list); + if (tqt->tqt_id == id) { + t = tqt->tqt_task; + *active = 1; + SRETURN(t); + } + } + + SRETURN(NULL); +} + +/* + * The taskq_wait_id() function blocks until the passed task id completes. + * This does not guarantee that all lower task id's have completed. + */ +void +taskq_wait_id(taskq_t *tq, taskqid_t id) +{ + DEFINE_WAIT(wait); + taskq_ent_t *t; + int active = 0; + SENTRY; + + ASSERT(tq); + ASSERT(id > 0); + + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + t = taskq_find(tq, id, &active); + if (t) + prepare_to_wait(&t->tqent_waitq, &wait, TASK_UNINTERRUPTIBLE); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + + /* + * We rely on the kernels autoremove_wake_function() function to + * remove us from the wait queue in the context of wake_up(). + * Once woken the taskq_ent_t pointer must never be accessed. + */ + if (t) { + t = NULL; + schedule(); + __set_current_state(TASK_RUNNING); + } + + SEXIT; +} +EXPORT_SYMBOL(taskq_wait_id); + +/* + * The taskq_wait() function will block until all previously submitted + * tasks have been completed. A previously submitted task is defined as + * a task with a lower task id than the current task queue id. Note that + * all task id's are assigned monotonically at dispatch time. + * + * Waiting for all previous tasks to complete is accomplished by tracking + * the lowest outstanding task id. As tasks are dispatched they are added + * added to the tail of the pending, priority, or delay lists. And as + * worker threads become available the tasks are removed from the heads + * of these lists and linked to the worker threads. This ensures the + * lists are kept in lowest to highest task id order. + * + * Therefore the lowest outstanding task id can be quickly determined by + * checking the head item from all of these lists. This value is stored + * with the task queue as the lowest id. It only needs to be recalculated + * when either the task with the current lowest id completes or is canceled. + * + * By blocking until the lowest task id exceeds the current task id when + * the function was called we ensure all previous tasks have completed. + * + * NOTE: When there are multiple worked threads it is possible for larger + * task ids to complete before smaller ones. Conversely when the task + * queue contains delay tasks with small task ids, you may block for a + * considerable length of time waiting for them to expire and execute. + */ +static int +taskq_wait_check(taskq_t *tq, taskqid_t id) +{ + int rc; + + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + rc = (id < tq->tq_lowest_id); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + + SRETURN(rc); +} + +void +taskq_wait_all(taskq_t *tq, taskqid_t id) +{ + wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id)); +} +EXPORT_SYMBOL(taskq_wait_all); + +void +taskq_wait(taskq_t *tq) +{ + taskqid_t id; + SENTRY; + ASSERT(tq); + + /* Wait for the largest outstanding taskqid */ + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + id = tq->tq_next_id - 1; + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + + taskq_wait_all(tq, id); + + SEXIT; + +} +EXPORT_SYMBOL(taskq_wait); + +int +taskq_member(taskq_t *tq, void *t) +{ + struct list_head *l; + taskq_thread_t *tqt; + SENTRY; + + ASSERT(tq); + ASSERT(t); + + list_for_each(l, &tq->tq_thread_list) { + tqt = list_entry(l, taskq_thread_t, tqt_thread_list); + if (tqt->tqt_thread == (struct task_struct *)t) + SRETURN(1); + } + + SRETURN(0); +} +EXPORT_SYMBOL(taskq_member); + +/* + * Cancel an already dispatched task given the task id. Still pending tasks + * will be immediately canceled, and if the task is active the function will + * block until it completes. Preallocated tasks which are canceled must be + * freed by the caller. + */ +int +taskq_cancel_id(taskq_t *tq, taskqid_t id) +{ + taskq_ent_t *t; + int active = 0; + int rc = ENOENT; + SENTRY; + + ASSERT(tq); + + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + t = taskq_find(tq, id, &active); + if (t && !active) { + list_del_init(&t->tqent_list); + t->tqent_flags |= TQENT_FLAG_CANCEL; + + /* + * When canceling the lowest outstanding task id we + * must recalculate the new lowest outstanding id. + */ + if (tq->tq_lowest_id == t->tqent_id) { + tq->tq_lowest_id = taskq_lowest_id(tq); + ASSERT3S(tq->tq_lowest_id, >, t->tqent_id); + } + + /* + * The task_expire() function takes the tq->tq_lock so drop + * drop the lock before synchronously cancelling the timer. + */ + if (timer_pending(&t->tqent_timer)) { + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + del_timer_sync(&t->tqent_timer); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + } + + if (!(t->tqent_flags & TQENT_FLAG_PREALLOC)) + task_done(tq, t); + + rc = 0; + } + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + + if (active) { + taskq_wait_id(tq, id); + rc = EBUSY; + } + + SRETURN(rc); +} +EXPORT_SYMBOL(taskq_cancel_id); + +taskqid_t +taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) +{ + taskq_ent_t *t; + taskqid_t rc = 0; + SENTRY; + + ASSERT(tq); + ASSERT(func); + + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + + /* Taskq being destroyed and all tasks drained */ + if (!(tq->tq_flags & TQ_ACTIVE)) + SGOTO(out, rc = 0); + + /* Do not queue the task unless there is idle thread for it */ + ASSERT(tq->tq_nactive <= tq->tq_nthreads); + if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) + SGOTO(out, rc = 0); + + if ((t = task_alloc(tq, flags)) == NULL) + SGOTO(out, rc = 0); + + spin_lock(&t->tqent_lock); + + /* Queue to the priority list instead of the pending list */ + if (flags & TQ_FRONT) + list_add_tail(&t->tqent_list, &tq->tq_prio_list); + else + list_add_tail(&t->tqent_list, &tq->tq_pend_list); + + t->tqent_id = rc = tq->tq_next_id; + tq->tq_next_id++; + t->tqent_func = func; + t->tqent_arg = arg; + t->tqent_taskq = tq; + t->tqent_timer.data = 0; + t->tqent_timer.function = NULL; + t->tqent_timer.expires = 0; + + ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); + + spin_unlock(&t->tqent_lock); + + wake_up(&tq->tq_work_waitq); +out: + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + SRETURN(rc); +} +EXPORT_SYMBOL(taskq_dispatch); + +taskqid_t +taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg, + uint_t flags, clock_t expire_time) +{ + taskq_ent_t *t; + taskqid_t rc = 0; + SENTRY; + + ASSERT(tq); + ASSERT(func); + + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + + /* Taskq being destroyed and all tasks drained */ + if (!(tq->tq_flags & TQ_ACTIVE)) + SGOTO(out, rc = 0); + + if ((t = task_alloc(tq, flags)) == NULL) + SGOTO(out, rc = 0); + + spin_lock(&t->tqent_lock); + + /* Queue to the delay list for subsequent execution */ + list_add_tail(&t->tqent_list, &tq->tq_delay_list); + + t->tqent_id = rc = tq->tq_next_id; + tq->tq_next_id++; + t->tqent_func = func; + t->tqent_arg = arg; + t->tqent_taskq = tq; + t->tqent_timer.data = (unsigned long)t; + t->tqent_timer.function = task_expire; + t->tqent_timer.expires = (unsigned long)expire_time; + add_timer(&t->tqent_timer); + + ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); + + spin_unlock(&t->tqent_lock); +out: + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + SRETURN(rc); +} +EXPORT_SYMBOL(taskq_dispatch_delay); + +void +taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, + taskq_ent_t *t) +{ + SENTRY; + + ASSERT(tq); + ASSERT(func); + ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); + + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + + /* Taskq being destroyed and all tasks drained */ + if (!(tq->tq_flags & TQ_ACTIVE)) { + t->tqent_id = 0; + goto out; + } + + spin_lock(&t->tqent_lock); + + /* + * Mark it as a prealloc'd task. This is important + * to ensure that we don't free it later. + */ + t->tqent_flags |= TQENT_FLAG_PREALLOC; + + /* Queue to the priority list instead of the pending list */ + if (flags & TQ_FRONT) + list_add_tail(&t->tqent_list, &tq->tq_prio_list); + else + list_add_tail(&t->tqent_list, &tq->tq_pend_list); + + t->tqent_id = tq->tq_next_id; + tq->tq_next_id++; + t->tqent_func = func; + t->tqent_arg = arg; + t->tqent_taskq = tq; + + spin_unlock(&t->tqent_lock); + + wake_up(&tq->tq_work_waitq); +out: + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + SEXIT; +} +EXPORT_SYMBOL(taskq_dispatch_ent); + +int +taskq_empty_ent(taskq_ent_t *t) +{ + return list_empty(&t->tqent_list); +} +EXPORT_SYMBOL(taskq_empty_ent); + +void +taskq_init_ent(taskq_ent_t *t) +{ + spin_lock_init(&t->tqent_lock); + init_waitqueue_head(&t->tqent_waitq); + init_timer(&t->tqent_timer); + INIT_LIST_HEAD(&t->tqent_list); + t->tqent_id = 0; + t->tqent_func = NULL; + t->tqent_arg = NULL; + t->tqent_flags = 0; + t->tqent_taskq = NULL; +} +EXPORT_SYMBOL(taskq_init_ent); + static int taskq_thread(void *args) { @@ -481,6 +737,7 @@ taskq_thread(void *args) * preallocated taskq_ent_t, tqent_id must be * stored prior to executing tqent_func. */ tqt->tqt_id = t->tqent_id; + tqt->tqt_task = t; /* We must store a copy of the flags prior to * servicing the task (servicing a prealloc'd task @@ -499,6 +756,7 @@ taskq_thread(void *args) spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); tq->tq_nactive--; list_del_init(&tqt->tqt_active_list); + tqt->tqt_task = NULL; /* For prealloc'd tasks, we don't free anything. */ if ((tq->tq_flags & TASKQ_DYNAMIC) || @@ -576,6 +834,7 @@ taskq_create(const char *name, int nthreads, pri_t pri, INIT_LIST_HEAD(&tq->tq_free_list); INIT_LIST_HEAD(&tq->tq_pend_list); INIT_LIST_HEAD(&tq->tq_prio_list); + INIT_LIST_HEAD(&tq->tq_delay_list); init_waitqueue_head(&tq->tq_work_waitq); init_waitqueue_head(&tq->tq_wait_waitq); @@ -669,6 +928,7 @@ taskq_destroy(taskq_t *tq) ASSERT(list_empty(&tq->tq_free_list)); ASSERT(list_empty(&tq->tq_pend_list)); ASSERT(list_empty(&tq->tq_prio_list)); + ASSERT(list_empty(&tq->tq_delay_list)); spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); diff --git a/module/splat/splat-taskq.c b/module/splat/splat-taskq.c index b94930cc9..7fad4627e 100644 --- a/module/splat/splat-taskq.c +++ b/module/splat/splat-taskq.c @@ -548,10 +548,10 @@ splat_taskq_test4(struct file *file, void *arg) * next pending task as soon as it completes its current task. This * means that tasks do not strictly complete in order in which they * were dispatched (increasing task id). This is fine but we need to - * verify that taskq_wait_id() blocks until the passed task id and all + * verify that taskq_wait_all() blocks until the passed task id and all * lower task ids complete. We do this by dispatching the following * specific sequence of tasks each of which block for N time units. - * We then use taskq_wait_id() to unblock at specific task id and + * We then use taskq_wait_all() to unblock at specific task id and * verify the only the expected task ids have completed and in the * correct order. The two cases of interest are: * @@ -562,17 +562,17 @@ splat_taskq_test4(struct file *file, void *arg) * * The following table shows each task id and how they will be * scheduled. Each rows represent one time unit and each column - * one of the three worker threads. The places taskq_wait_id() + * one of the three worker threads. The places taskq_wait_all() * must unblock for a specific id are identified as well as the * task ids which must have completed and their order. * - * +-----+ <--- taskq_wait_id(tq, 8) unblocks + * +-----+ <--- taskq_wait_all(tq, 8) unblocks * | | Required Completion Order: 1,2,4,5,3,8,6,7 * +-----+ | * | | | * | | +-----+ * | | | 8 | - * | | +-----+ <--- taskq_wait_id(tq, 3) unblocks + * | | +-----+ <--- taskq_wait_all(tq, 3) unblocks * | | 7 | | Required Completion Order: 1,2,4,5,3 * | +-----+ | * | 6 | | | @@ -712,13 +712,13 @@ splat_taskq_test5_impl(struct file *file, void *arg, boolean_t prealloc) splat_vprint(file, SPLAT_TASKQ_TEST5_NAME, "Taskq '%s' " "waiting for taskqid %d completion\n", tq_arg.name, 3); - taskq_wait_id(tq, 3); + taskq_wait_all(tq, 3); if ((rc = splat_taskq_test_order(&tq_arg, order1))) goto out; splat_vprint(file, SPLAT_TASKQ_TEST5_NAME, "Taskq '%s' " "waiting for taskqid %d completion\n", tq_arg.name, 8); - taskq_wait_id(tq, 8); + taskq_wait_all(tq, 8); rc = splat_taskq_test_order(&tq_arg, order2); out: @@ -874,7 +874,7 @@ splat_taskq_test6_impl(struct file *file, void *arg, boolean_t prealloc) splat_vprint(file, SPLAT_TASKQ_TEST6_NAME, "Taskq '%s' " "waiting for taskqid %d completion\n", tq_arg.name, SPLAT_TASKQ_ORDER_MAX); - taskq_wait_id(tq, SPLAT_TASKQ_ORDER_MAX); + taskq_wait_all(tq, SPLAT_TASKQ_ORDER_MAX); rc = splat_taskq_test_order(&tq_arg, order); out: @@ -975,7 +975,7 @@ splat_taskq_test7_impl(struct file *file, void *arg, boolean_t prealloc) if (tq_arg.flag == 0) { splat_vprint(file, SPLAT_TASKQ_TEST7_NAME, "Taskq '%s' waiting\n", tq_arg.name); - taskq_wait_id(tq, SPLAT_TASKQ_DEPTH_MAX); + taskq_wait_all(tq, SPLAT_TASKQ_DEPTH_MAX); } splat_vprint(file, SPLAT_TASKQ_TEST7_NAME, From 2f357826208085bacce9e0a29972c2e5728d5420 Mon Sep 17 00:00:00 2001 From: Brian Behlendorf Date: Thu, 6 Dec 2012 14:52:35 -0800 Subject: [PATCH 04/11] splat taskq:delay: Add test case Add a test case for taskq_dispatch_delay() to verify it is working properly. The test dispatchs 100 tasks to a taskq with random expiration times spread over 5 seconds. As each task expires and gets executed by a worker thread it verifies that it was run at the correct time. Once all the delayed tasks have been executed we double check that all the dispatched tasks were successful. Signed-off-by: Brian Behlendorf --- module/splat/splat-taskq.c | 124 +++++++++++++++++++++++++++++++++---- 1 file changed, 113 insertions(+), 11 deletions(-) diff --git a/module/splat/splat-taskq.c b/module/splat/splat-taskq.c index 7fad4627e..272d334c0 100644 --- a/module/splat/splat-taskq.c +++ b/module/splat/splat-taskq.c @@ -25,6 +25,7 @@ \*****************************************************************************/ #include +#include #include #include "splat-internal.h" @@ -63,6 +64,10 @@ #define SPLAT_TASKQ_TEST8_NAME "contention" #define SPLAT_TASKQ_TEST8_DESC "1 queue, 100 threads, 131072 tasks" +#define SPLAT_TASKQ_TEST9_ID 0x0209 +#define SPLAT_TASKQ_TEST9_NAME "delay" +#define SPLAT_TASKQ_TEST9_DESC "Delayed task execution" + #define SPLAT_TASKQ_ORDER_MAX 8 #define SPLAT_TASKQ_DEPTH_MAX 16 @@ -70,9 +75,10 @@ typedef struct splat_taskq_arg { int flag; int id; - atomic_t count; + atomic_t *count; int order[SPLAT_TASKQ_ORDER_MAX]; unsigned int depth; + unsigned long expire; taskq_t *tq; taskq_ent_t *tqe; spinlock_t lock; @@ -415,8 +421,8 @@ splat_taskq_test3(struct file *file, void *arg) /* * Create a taskq and dispatch a large number of tasks to the queue. * Then use taskq_wait() to block until all the tasks complete, then - * cross check that all the tasks ran by checking tg_arg->count which - * is incremented in the task function. Finally cleanup the taskq. + * cross check that all the tasks ran by checking the shared atomic + * counter which is incremented in the task function. * * First we try with a large 'maxalloc' value, then we try with a small one. * We should not drop tasks when TQ_SLEEP is used in taskq_dispatch(), even @@ -428,7 +434,7 @@ splat_taskq_test4_func(void *arg) splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg; ASSERT(tq_arg); - atomic_inc(&tq_arg->count); + atomic_inc(tq_arg->count); } static int @@ -439,6 +445,7 @@ splat_taskq_test4_common(struct file *file, void *arg, int minalloc, taskqid_t id; splat_taskq_arg_t tq_arg; taskq_ent_t *tqes; + atomic_t count; int i, j, rc = 0; tqes = kmalloc(sizeof(*tqes) * nr_tasks, GFP_KERNEL); @@ -461,9 +468,10 @@ splat_taskq_test4_common(struct file *file, void *arg, int minalloc, tq_arg.file = file; tq_arg.name = SPLAT_TASKQ_TEST4_NAME; + tq_arg.count = &count; for (i = 1; i <= nr_tasks; i *= 2) { - atomic_set(&tq_arg.count, 0); + atomic_set(tq_arg.count, 0); splat_vprint(file, SPLAT_TASKQ_TEST4_NAME, "Taskq '%s' function '%s' dispatched %d times\n", tq_arg.name, sym2str(splat_taskq_test4_func), i); @@ -495,8 +503,8 @@ splat_taskq_test4_common(struct file *file, void *arg, int minalloc, taskq_wait(tq); splat_vprint(file, SPLAT_TASKQ_TEST4_NAME, "Taskq '%s' " "%d/%d dispatches finished\n", tq_arg.name, - atomic_read(&tq_arg.count), i); - if (atomic_read(&tq_arg.count) != i) { + atomic_read(&count), i); + if (atomic_read(&count) != i) { rc = -ERANGE; goto out; @@ -1011,7 +1019,7 @@ splat_taskq_test8_func(void *arg) splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg; ASSERT(tq_arg); - atomic_inc(&tq_arg->count); + atomic_inc(tq_arg->count); } #define TEST8_NUM_TASKS 0x20000 @@ -1025,6 +1033,7 @@ splat_taskq_test8_common(struct file *file, void *arg, int minalloc, taskqid_t id; splat_taskq_arg_t tq_arg; taskq_ent_t **tqes; + atomic_t count; int i, j, rc = 0; tqes = vmalloc(sizeof(*tqes) * TEST8_NUM_TASKS); @@ -1048,8 +1057,9 @@ splat_taskq_test8_common(struct file *file, void *arg, int minalloc, tq_arg.file = file; tq_arg.name = SPLAT_TASKQ_TEST8_NAME; + tq_arg.count = &count; + atomic_set(tq_arg.count, 0); - atomic_set(&tq_arg.count, 0); for (i = 0; i < TEST8_NUM_TASKS; i++) { tqes[i] = kmalloc(sizeof(taskq_ent_t), GFP_KERNEL); if (tqes[i] == NULL) { @@ -1079,9 +1089,9 @@ splat_taskq_test8_common(struct file *file, void *arg, int minalloc, taskq_wait(tq); splat_vprint(file, SPLAT_TASKQ_TEST8_NAME, "Taskq '%s' " "%d/%d dispatches finished\n", tq_arg.name, - atomic_read(&tq_arg.count), TEST8_NUM_TASKS); + atomic_read(tq_arg.count), TEST8_NUM_TASKS); - if (atomic_read(&tq_arg.count) != TEST8_NUM_TASKS) + if (atomic_read(tq_arg.count) != TEST8_NUM_TASKS) rc = -ERANGE; out: @@ -1106,6 +1116,95 @@ splat_taskq_test8(struct file *file, void *arg) return rc; } +/* + * Create a taskq and dispatch a number of delayed tasks to the queue. + * For each task verify that it was run no early than requested. + */ +static void +splat_taskq_test9_func(void *arg) +{ + splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg; + ASSERT(tq_arg); + + if (ddi_get_lbolt() >= tq_arg->expire) + atomic_inc(tq_arg->count); + + kmem_free(tq_arg, sizeof(splat_taskq_arg_t)); +} + +static int +splat_taskq_test9(struct file *file, void *arg) +{ + taskq_t *tq; + atomic_t count; + int i, rc = 0; + int minalloc = 1; + int maxalloc = 10; + int nr_tasks = 100; + + splat_vprint(file, SPLAT_TASKQ_TEST9_NAME, + "Taskq '%s' creating (%s dispatch) (%d/%d/%d)\n", + SPLAT_TASKQ_TEST9_NAME, "delay", minalloc, maxalloc, nr_tasks); + if ((tq = taskq_create(SPLAT_TASKQ_TEST9_NAME, 3, maxclsyspri, + minalloc, maxalloc, TASKQ_PREPOPULATE)) == NULL) { + splat_vprint(file, SPLAT_TASKQ_TEST9_NAME, + "Taskq '%s' create failed\n", SPLAT_TASKQ_TEST9_NAME); + return -EINVAL; + } + + atomic_set(&count, 0); + + for (i = 1; i <= nr_tasks; i++) { + splat_taskq_arg_t *tq_arg; + taskqid_t id; + uint32_t rnd; + + /* A random timeout in jiffies of at most 5 seconds */ + get_random_bytes((void *)&rnd, 4); + rnd = rnd % (5 * HZ); + + tq_arg = kmem_alloc(sizeof(splat_taskq_arg_t), KM_SLEEP); + tq_arg->file = file; + tq_arg->name = SPLAT_TASKQ_TEST9_NAME; + tq_arg->expire = ddi_get_lbolt() + rnd; + tq_arg->count = &count; + + splat_vprint(file, SPLAT_TASKQ_TEST9_NAME, + "Taskq '%s' delay dispatch %u jiffies\n", + SPLAT_TASKQ_TEST9_NAME, rnd); + + id = taskq_dispatch_delay(tq, splat_taskq_test9_func, + tq_arg, TQ_SLEEP, ddi_get_lbolt() + rnd); + + if (id == 0) { + splat_vprint(file, SPLAT_TASKQ_TEST9_NAME, + "Taskq '%s' delay dispatch failed\n", + SPLAT_TASKQ_TEST9_NAME); + kmem_free(tq_arg, sizeof(splat_taskq_arg_t)); + taskq_wait(tq); + rc = -EINVAL; + goto out; + } + } + + splat_vprint(file, SPLAT_TASKQ_TEST9_NAME, "Taskq '%s' waiting for " + "%d delay dispatches\n", SPLAT_TASKQ_TEST9_NAME, nr_tasks); + + taskq_wait(tq); + if (atomic_read(&count) != nr_tasks) + rc = -ERANGE; + + splat_vprint(file, SPLAT_TASKQ_TEST9_NAME, "Taskq '%s' %d/%d delay " + "dispatches finished on time\n", SPLAT_TASKQ_TEST9_NAME, + atomic_read(&count), nr_tasks); + splat_vprint(file, SPLAT_TASKQ_TEST9_NAME, "Taskq '%s' destroying\n", + SPLAT_TASKQ_TEST9_NAME); +out: + taskq_destroy(tq); + + return rc; +} + splat_subsystem_t * splat_taskq_init(void) { @@ -1139,6 +1238,8 @@ splat_taskq_init(void) SPLAT_TASKQ_TEST7_ID, splat_taskq_test7); SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST8_NAME, SPLAT_TASKQ_TEST8_DESC, SPLAT_TASKQ_TEST8_ID, splat_taskq_test8); + SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST9_NAME, SPLAT_TASKQ_TEST9_DESC, + SPLAT_TASKQ_TEST9_ID, splat_taskq_test9); return sub; } @@ -1147,6 +1248,7 @@ void splat_taskq_fini(splat_subsystem_t *sub) { ASSERT(sub); + SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST9_ID); SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST8_ID); SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST7_ID); SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST6_ID); From 3238e717631c68f6f44907b46733f4ae70452d3b Mon Sep 17 00:00:00 2001 From: Brian Behlendorf Date: Thu, 6 Dec 2012 15:42:32 -0800 Subject: [PATCH 05/11] splat taskq:cancel: Add test case Add a test case for taskq_cancel_id() to verify it is working properly. Just like taskq:delay we start by dispatching 100 tasks. However this time 1/3 of the tasks use taskq_dispatch() and will be run immediately, and 2/3 use taskq_dispatch_delay(). The idea is to create a busy taskq with both active, pending, and delayed tasks. After all the items have been successfully dispatched the test begins randomly canceling known task ids. It will do this for 5 seconds randomly canceling a task id and then sleeping for a few milliseconds. The task being canceled may have already run, still be on the pending list, or may be currently being executed by a worker thread. The idea is to ensure we catch any subtle race conditions. Once all the non-canceled tasks have completed we cross check the number of tasks which ran with the number of tasks which were successfully canceled. Additionally, we verify that the taskq_cancel_id() function never blocks longer than needed. This time is bounded by the longest run time of the task which was dispatched. Signed-off-by: Brian Behlendorf --- module/splat/splat-taskq.c | 183 +++++++++++++++++++++++++++++++++++++ 1 file changed, 183 insertions(+) diff --git a/module/splat/splat-taskq.c b/module/splat/splat-taskq.c index 272d334c0..673f4bd33 100644 --- a/module/splat/splat-taskq.c +++ b/module/splat/splat-taskq.c @@ -68,6 +68,10 @@ #define SPLAT_TASKQ_TEST9_NAME "delay" #define SPLAT_TASKQ_TEST9_DESC "Delayed task execution" +#define SPLAT_TASKQ_TEST10_ID 0x020a +#define SPLAT_TASKQ_TEST10_NAME "cancel" +#define SPLAT_TASKQ_TEST10_DESC "Cancel task execution" + #define SPLAT_TASKQ_ORDER_MAX 8 #define SPLAT_TASKQ_DEPTH_MAX 16 @@ -1205,6 +1209,182 @@ out: return rc; } +/* + * Create a taskq and dispatch then cancel tasks in the queue. + */ +static void +splat_taskq_test10_func(void *arg) +{ + splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg; + uint8_t rnd; + + if (ddi_get_lbolt() >= tq_arg->expire) + atomic_inc(tq_arg->count); + + /* Randomly sleep to further perturb the system */ + get_random_bytes((void *)&rnd, 1); + msleep(1 + (rnd % 9)); +} + +static int +splat_taskq_test10(struct file *file, void *arg) +{ + taskq_t *tq; + splat_taskq_arg_t **tqas; + atomic_t count; + int i, j, rc = 0; + int minalloc = 1; + int maxalloc = 10; + int nr_tasks = 100; + int canceled = 0; + int completed = 0; + int blocked = 0; + unsigned long start, cancel; + + tqas = vmalloc(sizeof(*tqas) * nr_tasks); + if (tqas == NULL) + return -ENOMEM; + memset(tqas, 0, sizeof(*tqas) * nr_tasks); + + splat_vprint(file, SPLAT_TASKQ_TEST10_NAME, + "Taskq '%s' creating (%s dispatch) (%d/%d/%d)\n", + SPLAT_TASKQ_TEST10_NAME, "delay", minalloc, maxalloc, nr_tasks); + if ((tq = taskq_create(SPLAT_TASKQ_TEST10_NAME, 3, maxclsyspri, + minalloc, maxalloc, TASKQ_PREPOPULATE)) == NULL) { + splat_vprint(file, SPLAT_TASKQ_TEST10_NAME, + "Taskq '%s' create failed\n", SPLAT_TASKQ_TEST10_NAME); + rc = -EINVAL; + goto out_free; + } + + atomic_set(&count, 0); + + for (i = 0; i < nr_tasks; i++) { + splat_taskq_arg_t *tq_arg; + uint32_t rnd; + + /* A random timeout in jiffies of at most 5 seconds */ + get_random_bytes((void *)&rnd, 4); + rnd = rnd % (5 * HZ); + + tq_arg = kmem_alloc(sizeof(splat_taskq_arg_t), KM_SLEEP); + tq_arg->file = file; + tq_arg->name = SPLAT_TASKQ_TEST10_NAME; + tq_arg->count = &count; + tqas[i] = tq_arg; + + /* + * Dispatch every 1/3 one immediately to mix it up, the cancel + * code is inherently racy and we want to try and provoke any + * subtle concurrently issues. + */ + if ((i % 3) == 0) { + tq_arg->expire = ddi_get_lbolt(); + tq_arg->id = taskq_dispatch(tq, splat_taskq_test10_func, + tq_arg, TQ_SLEEP); + } else { + tq_arg->expire = ddi_get_lbolt() + rnd; + tq_arg->id = taskq_dispatch_delay(tq, + splat_taskq_test10_func, + tq_arg, TQ_SLEEP, ddi_get_lbolt() + rnd); + } + + if (tq_arg->id == 0) { + splat_vprint(file, SPLAT_TASKQ_TEST10_NAME, + "Taskq '%s' dispatch failed\n", + SPLAT_TASKQ_TEST10_NAME); + kmem_free(tq_arg, sizeof(splat_taskq_arg_t)); + taskq_wait(tq); + rc = -EINVAL; + goto out; + } else { + splat_vprint(file, SPLAT_TASKQ_TEST10_NAME, + "Taskq '%s' dispatch %lu in %lu jiffies\n", + SPLAT_TASKQ_TEST10_NAME, (unsigned long)tq_arg->id, + !(i % 3) ? 0 : tq_arg->expire - ddi_get_lbolt()); + } + } + + /* + * Start randomly canceling tasks for the duration of the test. We + * happen to know the valid task id's will be in the range 1..nr_tasks + * because the taskq is private and was just created. However, we + * have no idea of a particular task has already executed or not. + */ + splat_vprint(file, SPLAT_TASKQ_TEST10_NAME, "Taskq '%s' randomly " + "canceling task ids\n", SPLAT_TASKQ_TEST10_NAME); + + start = ddi_get_lbolt(); + i = 0; + + while (ddi_get_lbolt() < start + 5 * HZ) { + taskqid_t id; + uint32_t rnd; + + i++; + cancel = ddi_get_lbolt(); + get_random_bytes((void *)&rnd, 4); + id = 1 + (rnd % nr_tasks); + rc = taskq_cancel_id(tq, id); + + /* + * Keep track of the results of the random cancels. + */ + if (rc == 0) { + canceled++; + } else if (rc == ENOENT) { + completed++; + } else if (rc == EBUSY) { + blocked++; + } else { + rc = -EINVAL; + break; + } + + /* + * Verify we never get blocked to long in taskq_cancel_id(). + * The worst case is 10ms if we happen to cancel the task + * which is currently executing. We allow a factor of 2x. + */ + if (ddi_get_lbolt() - cancel > HZ / 50) { + splat_vprint(file, SPLAT_TASKQ_TEST10_NAME, + "Taskq '%s' cancel for %lu took %lu\n", + SPLAT_TASKQ_TEST10_NAME, (unsigned long)id, + ddi_get_lbolt() - cancel); + rc = -ETIMEDOUT; + break; + } + + get_random_bytes((void *)&rnd, 4); + msleep(1 + (rnd % 100)); + rc = 0; + } + + taskq_wait(tq); + + /* + * Cross check the results of taskq_cancel_id() with the number of + * times the dispatched function actually ran successfully. + */ + if ((rc == 0) && (nr_tasks - canceled != atomic_read(&count))) + rc = -EDOM; + + splat_vprint(file, SPLAT_TASKQ_TEST10_NAME, "Taskq '%s' %d attempts, " + "%d canceled, %d completed, %d blocked, %d/%d tasks run\n", + SPLAT_TASKQ_TEST10_NAME, i, canceled, completed, blocked, + atomic_read(&count), nr_tasks); + splat_vprint(file, SPLAT_TASKQ_TEST10_NAME, "Taskq '%s' destroying %d\n", + SPLAT_TASKQ_TEST10_NAME, rc); +out: + taskq_destroy(tq); +out_free: + for (j = 0; j < nr_tasks && tqas[j] != NULL; j++) + kmem_free(tqas[j], sizeof(splat_taskq_arg_t)); + vfree(tqas); + + return rc; +} + splat_subsystem_t * splat_taskq_init(void) { @@ -1240,6 +1420,8 @@ splat_taskq_init(void) SPLAT_TASKQ_TEST8_ID, splat_taskq_test8); SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST9_NAME, SPLAT_TASKQ_TEST9_DESC, SPLAT_TASKQ_TEST9_ID, splat_taskq_test9); + SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST10_NAME, SPLAT_TASKQ_TEST10_DESC, + SPLAT_TASKQ_TEST10_ID, splat_taskq_test10); return sub; } @@ -1248,6 +1430,7 @@ void splat_taskq_fini(splat_subsystem_t *sub) { ASSERT(sub); + SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST10_ID); SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST9_ID); SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST8_ID); SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST7_ID); From 94ff5d38e30e5c79a45099413ac642c94b55a619 Mon Sep 17 00:00:00 2001 From: Brian Behlendorf Date: Mon, 10 Dec 2012 15:24:39 -0800 Subject: [PATCH 06/11] splat taskq:order: Reduce stack frame The slightly increased size of the taskq_ent_t when debugging is enabled has pushed the taskq:order splat test over frame size limit. To resolve this dynamically allocate the taskq_ent_t structures so they are part of the heap instead of the stack. In function 'splat_taskq_test5_impl' error: the frame size of 1680 bytes is larger than 1024 bytes Signed-off-by: Brian Behlendorf --- module/splat/splat-taskq.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/module/splat/splat-taskq.c b/module/splat/splat-taskq.c index 673f4bd33..f05d1c0eb 100644 --- a/module/splat/splat-taskq.c +++ b/module/splat/splat-taskq.c @@ -669,9 +669,12 @@ splat_taskq_test5_impl(struct file *file, void *arg, boolean_t prealloc) splat_taskq_arg_t tq_arg; int order1[SPLAT_TASKQ_ORDER_MAX] = { 1,2,4,5,3,0,0,0 }; int order2[SPLAT_TASKQ_ORDER_MAX] = { 1,2,4,5,3,8,6,7 }; - taskq_ent_t tqes[SPLAT_TASKQ_ORDER_MAX]; + taskq_ent_t *tqes; int i, rc = 0; + tqes = kmem_alloc(sizeof(*tqes) * SPLAT_TASKQ_ORDER_MAX, KM_SLEEP); + memset(tqes, 0, sizeof(*tqes) * SPLAT_TASKQ_ORDER_MAX); + splat_vprint(file, SPLAT_TASKQ_TEST5_NAME, "Taskq '%s' creating (%s dispatch)\n", SPLAT_TASKQ_TEST5_NAME, @@ -738,6 +741,8 @@ out: "Taskq '%s' destroying\n", tq_arg.name); taskq_destroy(tq); + kmem_free(tqes, sizeof(*tqes) * SPLAT_TASKQ_ORDER_MAX); + return rc; } From a5a98e72605c071f94b9fdc4bf1811f8ed8d7f32 Mon Sep 17 00:00:00 2001 From: Brian Behlendorf Date: Mon, 10 Dec 2012 15:27:05 -0800 Subject: [PATCH 07/11] splat taskq:front: Reduce stack frame The slightly increased size of the taskq_ent_t when debugging is enabled has pushed the taskq:front splat test over frame size limit. To resolve this dynamically allocate the taskq_ent_t structures so they are part of the heap instead of the stack. In function 'splat_taskq_test6_impl' error: the frame size of 1648 bytes is larger than 1024 bytes Signed-off-by: Brian Behlendorf --- module/splat/splat-taskq.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/module/splat/splat-taskq.c b/module/splat/splat-taskq.c index f05d1c0eb..5a9681e21 100644 --- a/module/splat/splat-taskq.c +++ b/module/splat/splat-taskq.c @@ -828,10 +828,13 @@ splat_taskq_test6_impl(struct file *file, void *arg, boolean_t prealloc) splat_taskq_id_t tq_id[SPLAT_TASKQ_ORDER_MAX]; splat_taskq_arg_t tq_arg; int order[SPLAT_TASKQ_ORDER_MAX] = { 1,2,3,6,7,8,4,5 }; - taskq_ent_t tqes[SPLAT_TASKQ_ORDER_MAX]; + taskq_ent_t *tqes; int i, rc = 0; uint_t tflags; + tqes = kmem_alloc(sizeof(*tqes) * SPLAT_TASKQ_ORDER_MAX, KM_SLEEP); + memset(tqes, 0, sizeof(*tqes) * SPLAT_TASKQ_ORDER_MAX); + splat_vprint(file, SPLAT_TASKQ_TEST6_NAME, "Taskq '%s' creating (%s dispatch)\n", SPLAT_TASKQ_TEST6_NAME, @@ -899,6 +902,8 @@ out: "Taskq '%s' destroying\n", tq_arg.name); taskq_destroy(tq); + kmem_free(tqes, sizeof(*tqes) * SPLAT_TASKQ_ORDER_MAX); + return rc; } From 296a8e596dac344cf3af5e7f2dff5be12c979d80 Mon Sep 17 00:00:00 2001 From: Brian Behlendorf Date: Mon, 10 Dec 2012 11:01:08 -0800 Subject: [PATCH 08/11] kmem-cache: spl_kmem_cache_create() may always sleep When this code was originally written I went overboard and allowed for the possibility of creating a cache in an atomic context. In practice there are no callers which ever do this. This makes sense since a cache is by design a long lived data structure. To prevent abuse of this function going forward I'm removing the code which is supported to handle an atomic context. All allocators have been updated to use KM_SLEEP and the might_sleep() debug macro has been added to immediately detect atomic callers. Signed-off-by: Brian Behlendorf --- module/spl/spl-kmem.c | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/module/spl/spl-kmem.c b/module/spl/spl-kmem.c index b171d446a..f78f820aa 100644 --- a/module/spl/spl-kmem.c +++ b/module/spl/spl-kmem.c @@ -1482,7 +1482,7 @@ spl_kmem_cache_create(char *name, size_t size, size_t align, void *priv, void *vmp, int flags) { spl_kmem_cache_t *skc; - int rc, kmem_flags = KM_SLEEP; + int rc; SENTRY; ASSERTF(!(flags & KMC_NOMAGAZINE), "Bad KMC_NOMAGAZINE (%x)\n", flags); @@ -1490,25 +1490,22 @@ spl_kmem_cache_create(char *name, size_t size, size_t align, ASSERTF(!(flags & KMC_QCACHE), "Bad KMC_QCACHE (%x)\n", flags); ASSERT(vmp == NULL); - /* We may be called when there is a non-zero preempt_count or - * interrupts are disabled is which case we must not sleep. - */ - if (current_thread_info()->preempt_count || irqs_disabled()) - kmem_flags = KM_NOSLEEP; + might_sleep(); - /* Allocate memory for a new cache an initialize it. Unfortunately, + /* + * Allocate memory for a new cache an initialize it. Unfortunately, * this usually ends up being a large allocation of ~32k because * we need to allocate enough memory for the worst case number of * cpus in the magazine, skc_mag[NR_CPUS]. Because of this we - * explicitly pass KM_NODEBUG to suppress the kmem warning */ - skc = (spl_kmem_cache_t *)kmem_zalloc(sizeof(*skc), - kmem_flags | KM_NODEBUG); + * explicitly pass KM_NODEBUG to suppress the kmem warning + */ + skc = kmem_zalloc(sizeof(*skc), KM_SLEEP| KM_NODEBUG); if (skc == NULL) SRETURN(NULL); skc->skc_magic = SKC_MAGIC; skc->skc_name_size = strlen(name) + 1; - skc->skc_name = (char *)kmem_alloc(skc->skc_name_size, kmem_flags); + skc->skc_name = (char *)kmem_alloc(skc->skc_name_size, KM_SLEEP); if (skc->skc_name == NULL) { kmem_free(skc, sizeof(*skc)); SRETURN(NULL); From a10287e00d13c4c4dbbff14f42b00b03da363fcb Mon Sep 17 00:00:00 2001 From: Brian Behlendorf Date: Mon, 10 Dec 2012 10:53:46 -0800 Subject: [PATCH 09/11] kmem-cache: Use taskqs for ageing Shift the cache and magazine ageing functionality over to the new delayed taskq interfaces. This allows us to abandon the kernels delayed work queue interface and all the compatibility code it requires. However, the delayed taskq interface does not allow us to schedule a task for a specfic cpu so the ageing code was slightly reworked. The magazine ageing delay has been directly linked to the cache ageing function. The spl_cache_age() function invokes on_each_cpu() in order to run spl_magazine_age() on each cpu. It then blocks waiting for them to complete and promptly reclaims any free slabs. When restructing the code wasn't the primary goal I think the new code is far more understable and maintainable. It also should help minimize magazine thrashing because free slabs are immediately released after the magazine is aged. Signed-off-by: Brian Behlendorf --- include/sys/kmem.h | 4 +- module/spl/spl-kmem.c | 91 ++++++++++++++++++++++++------------------- 2 files changed, 52 insertions(+), 43 deletions(-) diff --git a/include/sys/kmem.h b/include/sys/kmem.h index 83adc8d2a..e189922ef 100644 --- a/include/sys/kmem.h +++ b/include/sys/kmem.h @@ -37,6 +37,7 @@ #include #include #include +#include /* * Memory allocation interfaces @@ -406,7 +407,6 @@ typedef struct spl_kmem_magazine { uint32_t skm_size; /* Magazine size */ uint32_t skm_refill; /* Batch refill size */ struct spl_kmem_cache *skm_cache; /* Owned by cache */ - struct delayed_work skm_work; /* Magazine reclaim work */ unsigned long skm_age; /* Last cache access */ unsigned int skm_cpu; /* Owned by cpu */ void *skm_objs[0]; /* Object pointers */ @@ -460,7 +460,7 @@ typedef struct spl_kmem_cache { uint32_t skc_delay; /* Slab reclaim interval */ uint32_t skc_reap; /* Slab reclaim count */ atomic_t skc_ref; /* Ref count callers */ - struct delayed_work skc_work; /* Slab reclaim work */ + taskqid_t skc_taskqid; /* Slab reclaim task */ struct list_head skc_list; /* List of caches linkage */ struct list_head skc_complete_list;/* Completely alloc'ed */ struct list_head skc_partial_list; /* Partially alloc'ed */ diff --git a/module/spl/spl-kmem.c b/module/spl/spl-kmem.c index f78f820aa..3900c9cf0 100644 --- a/module/spl/spl-kmem.c +++ b/module/spl/spl-kmem.c @@ -825,6 +825,7 @@ EXPORT_SYMBOL(vmem_free_debug); struct list_head spl_kmem_cache_list; /* List of caches */ struct rw_semaphore spl_kmem_cache_sem; /* Cache list lock */ +taskq_t *spl_kmem_cache_taskq; /* Task queue for ageing / reclaim */ static int spl_cache_flush(spl_kmem_cache_t *skc, spl_kmem_magazine_t *skm, int flush); @@ -1243,50 +1244,59 @@ spl_emergency_free(spl_kmem_cache_t *skc, void *obj) SRETURN(0); } -/* - * Called regularly on all caches to age objects out of the magazines - * which have not been access in skc->skc_delay seconds. This prevents - * idle magazines from holding memory which might be better used by - * other caches or parts of the system. The delay is present to - * prevent thrashing the magazine. - */ static void spl_magazine_age(void *data) { - spl_kmem_magazine_t *skm = - spl_get_work_data(data, spl_kmem_magazine_t, skm_work.work); - spl_kmem_cache_t *skc = skm->skm_cache; + spl_kmem_cache_t *skc = (spl_kmem_cache_t *)data; + spl_kmem_magazine_t *skm = skc->skc_mag[smp_processor_id()]; ASSERT(skm->skm_magic == SKM_MAGIC); - ASSERT(skc->skc_magic == SKC_MAGIC); - ASSERT(skc->skc_mag[skm->skm_cpu] == skm); + ASSERT(skm->skm_cpu == smp_processor_id()); - if (skm->skm_avail > 0 && - time_after(jiffies, skm->skm_age + skc->skc_delay * HZ)) - (void)spl_cache_flush(skc, skm, skm->skm_refill); - - if (!test_bit(KMC_BIT_DESTROY, &skc->skc_flags)) - schedule_delayed_work_on(skm->skm_cpu, &skm->skm_work, - skc->skc_delay / 3 * HZ); + if (skm->skm_avail > 0) + if (time_after(jiffies, skm->skm_age + skc->skc_delay * HZ)) + (void) spl_cache_flush(skc, skm, skm->skm_refill); } /* - * Called regularly to keep a downward pressure on the size of idle - * magazines and to release free slabs from the cache. This function - * never calls the registered reclaim function, that only occurs - * under memory pressure or with a direct call to spl_kmem_reap(). + * Called regularly to keep a downward pressure on the cache. + * + * Objects older than skc->skc_delay seconds in the per-cpu magazines will + * be returned to the caches. This is done to prevent idle magazines from + * holding memory which could be better used elsewhere. The delay is + * present to prevent thrashing the magazine. + * + * The newly released objects may result in empty partial slabs. Those + * slabs should be released to the system. Otherwise moving the objects + * out of the magazines is just wasted work. */ static void spl_cache_age(void *data) { - spl_kmem_cache_t *skc = - spl_get_work_data(data, spl_kmem_cache_t, skc_work.work); + spl_kmem_cache_t *skc = (spl_kmem_cache_t *)data; + taskqid_t id = 0; ASSERT(skc->skc_magic == SKC_MAGIC); + + atomic_inc(&skc->skc_ref); + spl_on_each_cpu(spl_magazine_age, skc, 1); spl_slab_reclaim(skc, skc->skc_reap, 0); - if (!test_bit(KMC_BIT_DESTROY, &skc->skc_flags)) - schedule_delayed_work(&skc->skc_work, skc->skc_delay / 3 * HZ); + while (!test_bit(KMC_BIT_DESTROY, &skc->skc_flags) && !id) { + id = taskq_dispatch_delay( + spl_kmem_cache_taskq, spl_cache_age, skc, TQ_SLEEP, + ddi_get_lbolt() + skc->skc_delay / 3 * HZ); + + /* Destroy issued after dispatch immediately cancel it */ + if (test_bit(KMC_BIT_DESTROY, &skc->skc_flags) && id) + taskq_cancel_id(spl_kmem_cache_taskq, id); + } + + spin_lock(&skc->skc_lock); + skc->skc_taskqid = id; + spin_unlock(&skc->skc_lock); + + atomic_dec(&skc->skc_ref); } /* @@ -1380,7 +1390,6 @@ spl_magazine_alloc(spl_kmem_cache_t *skc, int cpu) skm->skm_size = skc->skc_mag_size; skm->skm_refill = skc->skc_mag_refill; skm->skm_cache = skc; - spl_init_delayed_work(&skm->skm_work, spl_magazine_age, skm); skm->skm_age = jiffies; skm->skm_cpu = cpu; } @@ -1427,11 +1436,6 @@ spl_magazine_create(spl_kmem_cache_t *skc) } } - /* Only after everything is allocated schedule magazine work */ - for_each_online_cpu(i) - schedule_delayed_work_on(i, &skc->skc_mag[i]->skm_work, - skc->skc_delay / 3 * HZ); - SRETURN(0); } @@ -1566,8 +1570,9 @@ spl_kmem_cache_create(char *name, size_t size, size_t align, if (rc) SGOTO(out, rc); - spl_init_delayed_work(&skc->skc_work, spl_cache_age, skc); - schedule_delayed_work(&skc->skc_work, skc->skc_delay / 3 * HZ); + skc->skc_taskqid = taskq_dispatch_delay(spl_kmem_cache_taskq, + spl_cache_age, skc, TQ_SLEEP, + ddi_get_lbolt() + skc->skc_delay / 3 * HZ); down_write(&spl_kmem_cache_sem); list_add_tail(&skc->skc_list, &spl_kmem_cache_list); @@ -1600,7 +1605,7 @@ void spl_kmem_cache_destroy(spl_kmem_cache_t *skc) { DECLARE_WAIT_QUEUE_HEAD(wq); - int i; + taskqid_t id; SENTRY; ASSERT(skc->skc_magic == SKC_MAGIC); @@ -1609,13 +1614,14 @@ spl_kmem_cache_destroy(spl_kmem_cache_t *skc) list_del_init(&skc->skc_list); up_write(&spl_kmem_cache_sem); - /* Cancel any and wait for any pending delayed work */ + /* Cancel any and wait for any pending delayed tasks */ VERIFY(!test_and_set_bit(KMC_BIT_DESTROY, &skc->skc_flags)); - cancel_delayed_work_sync(&skc->skc_work); - for_each_online_cpu(i) - cancel_delayed_work_sync(&skc->skc_mag[i]->skm_work); - flush_scheduled_work(); + spin_lock(&skc->skc_lock); + id = skc->skc_taskqid; + spin_unlock(&skc->skc_lock); + + taskq_cancel_id(spl_kmem_cache_taskq, id); /* Wait until all current callers complete, this is mainly * to catch the case where a low memory situation triggers a @@ -2394,6 +2400,8 @@ spl_kmem_init(void) init_rwsem(&spl_kmem_cache_sem); INIT_LIST_HEAD(&spl_kmem_cache_list); + spl_kmem_cache_taskq = taskq_create("spl_kmem_cache", + 1, maxclsyspri, 1, 32, TASKQ_PREPOPULATE); spl_register_shrinker(&spl_kmem_cache_shrinker); @@ -2432,6 +2440,7 @@ spl_kmem_fini(void) SENTRY; spl_unregister_shrinker(&spl_kmem_cache_shrinker); + taskq_destroy(spl_kmem_cache_taskq); SEXIT; } From 33e94ef1dd2678e28a5fbdb80f4ce35fd8c85974 Mon Sep 17 00:00:00 2001 From: Brian Behlendorf Date: Mon, 10 Dec 2012 13:40:03 -0800 Subject: [PATCH 10/11] kmem-cache: Use a taskq for async allocations Shift the asynchronous allocations over to use the taskq interfaces. This allows us to abandon the kernels delayed work queue interface and all the compatibility code it requires. This code never actually used the delay functionality it was just done this way to leverage the existing compatibility code. All that is required is a thread context to perform the allocation in. The only thing clever in this change is that we take advantage of the preallocated task queue entries to avoid a memory allocation. Signed-off-by: Brian Behlendorf --- include/sys/kmem.h | 2 +- module/spl/spl-kmem.c | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/include/sys/kmem.h b/include/sys/kmem.h index e189922ef..6904bec3f 100644 --- a/include/sys/kmem.h +++ b/include/sys/kmem.h @@ -432,7 +432,7 @@ typedef struct spl_kmem_slab { typedef struct spl_kmem_alloc { struct spl_kmem_cache *ska_cache; /* Owned by cache */ int ska_flags; /* Allocation flags */ - struct delayed_work ska_work; /* Allocation work */ + taskq_ent_t ska_tqe; /* Task queue entry */ } spl_kmem_alloc_t; typedef struct spl_kmem_emergency { diff --git a/module/spl/spl-kmem.c b/module/spl/spl-kmem.c index 3900c9cf0..bc08a5598 100644 --- a/module/spl/spl-kmem.c +++ b/module/spl/spl-kmem.c @@ -1697,8 +1697,7 @@ spl_cache_obj(spl_kmem_cache_t *skc, spl_kmem_slab_t *sks) static void spl_cache_grow_work(void *data) { - spl_kmem_alloc_t *ska = - spl_get_work_data(data, spl_kmem_alloc_t, ska_work.work); + spl_kmem_alloc_t *ska = (spl_kmem_alloc_t *)data; spl_kmem_cache_t *skc = ska->ska_cache; spl_kmem_slab_t *sks; @@ -1777,8 +1776,9 @@ spl_cache_grow(spl_kmem_cache_t *skc, int flags, void **obj) atomic_inc(&skc->skc_ref); ska->ska_cache = skc; ska->ska_flags = flags & ~__GFP_FS; - spl_init_delayed_work(&ska->ska_work, spl_cache_grow_work, ska); - schedule_delayed_work(&ska->ska_work, 0); + taskq_init_ent(&ska->ska_tqe); + taskq_dispatch_ent(spl_kmem_cache_taskq, + spl_cache_grow_work, ska, 0, &ska->ska_tqe); } /* From eb0be2ed46d3f0eb01378458f421a88798608592 Mon Sep 17 00:00:00 2001 From: Brian Behlendorf Date: Mon, 10 Dec 2012 13:53:25 -0800 Subject: [PATCH 11/11] Removed SPL_AC_3ARGS_INIT_WORK check All consumers of the kernel delayed work queues have been shifted over to rely on the taskq implementation. This compatibility code can now be removed. Any new callers which need this functionality should use the taskq interfaces for delayed work items. Signed-off-by: Brian Behlendorf --- config/spl-build.m4 | 21 -------------- include/linux/workqueue_compat.h | 49 -------------------------------- include/sys/types.h | 1 - 3 files changed, 71 deletions(-) delete mode 100644 include/linux/workqueue_compat.h diff --git a/config/spl-build.m4 b/config/spl-build.m4 index e8ecbc654..ea25e206f 100644 --- a/config/spl-build.m4 +++ b/config/spl-build.m4 @@ -26,7 +26,6 @@ AC_DEFUN([SPL_AC_CONFIG_KERNEL], [ SPL_AC_TYPE_ATOMIC64_CMPXCHG SPL_AC_TYPE_ATOMIC64_XCHG SPL_AC_TYPE_UINTPTR_T - SPL_AC_3ARGS_INIT_WORK SPL_AC_2ARGS_REGISTER_SYSCTL SPL_AC_SET_SHRINKER SPL_AC_3ARGS_SHRINKER_CALLBACK @@ -870,26 +869,6 @@ AC_DEFUN([SPL_AC_TYPE_UINTPTR_T], ]) ]) -dnl # -dnl # 2.6.20 API change, -dnl # INIT_WORK use 2 args and not store data inside -dnl # -AC_DEFUN([SPL_AC_3ARGS_INIT_WORK], - [AC_MSG_CHECKING([whether INIT_WORK wants 3 args]) - SPL_LINUX_TRY_COMPILE([ - #include - ],[ - struct work_struct work __attribute__ ((unused)); - INIT_WORK(&work, NULL, NULL); - ],[ - AC_MSG_RESULT(yes) - AC_DEFINE(HAVE_3ARGS_INIT_WORK, 1, - [INIT_WORK wants 3 args]) - ],[ - AC_MSG_RESULT(no) - ]) -]) - dnl # dnl # 2.6.21 API change, dnl # 'register_sysctl_table' use only one argument instead of two diff --git a/include/linux/workqueue_compat.h b/include/linux/workqueue_compat.h deleted file mode 100644 index a92800ce5..000000000 --- a/include/linux/workqueue_compat.h +++ /dev/null @@ -1,49 +0,0 @@ -/*****************************************************************************\ - * Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC. - * Copyright (C) 2007 The Regents of the University of California. - * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). - * Written by Brian Behlendorf . - * UCRL-CODE-235197 - * - * This file is part of the SPL, Solaris Porting Layer. - * For details, see . - * - * The SPL is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * The SPL is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * for more details. - * - * You should have received a copy of the GNU General Public License along - * with the SPL. If not, see . -\*****************************************************************************/ - -#ifndef _SPL_WORKQUEUE_COMPAT_H -#define _SPL_WORKQUEUE_COMPAT_H - -#include -#include - -#ifdef HAVE_3ARGS_INIT_WORK - -#define delayed_work work_struct - -#define spl_init_work(wq, cb, d) INIT_WORK((wq), (void *)(cb), \ - (void *)(d)) -#define spl_init_delayed_work(wq,cb,d) INIT_WORK((wq), (void *)(cb), \ - (void *)(d)) -#define spl_get_work_data(d, t, f) (t *)(d) - -#else - -#define spl_init_work(wq, cb, d) INIT_WORK((wq), (void *)(cb)); -#define spl_init_delayed_work(wq,cb,d) INIT_DELAYED_WORK((wq), (void *)(cb)); -#define spl_get_work_data(d, t, f) (t *)container_of(d, t, f) - -#endif /* HAVE_3ARGS_INIT_WORK */ - -#endif /* _SPL_WORKQUEUE_COMPAT_H */ diff --git a/include/sys/types.h b/include/sys/types.h index 35905eb97..b867be111 100644 --- a/include/sys/types.h +++ b/include/sys/types.h @@ -34,7 +34,6 @@ #include #include #include -#include #include #include #include