New an improved taskq implementation for the SPL. It allows a

configurable number of threads like the Solaris version and almost
all of the options are supported.  Unfortunately, it appears to have
made absolutely no difference to our performance numbers.  I need
to keep looking for where we are bottle necking.



git-svn-id: https://outreach.scidac.gov/svn/spl/trunk@93 7e1ea52c-4ff2-0310-8f11-9dd32ca42a1c
This commit is contained in:
behlendo
2008-04-25 22:10:47 +00:00
parent 839d8b438e
commit bcd68186d8
5 changed files with 483 additions and 138 deletions
+16 -2
View File
@@ -28,6 +28,7 @@ typedef enum { CV_DEFAULT=0, CV_DRIVER } kcv_type_t;
static __inline__ void
cv_init(kcondvar_t *cvp, char *name, kcv_type_t type, void *arg)
{
ENTRY;
ASSERT(cvp);
ASSERT(type == CV_DEFAULT);
ASSERT(arg == NULL);
@@ -44,11 +45,14 @@ cv_init(kcondvar_t *cvp, char *name, kcv_type_t type, void *arg)
if (cvp->cv_name)
strcpy(cvp->cv_name, name);
}
EXIT;
}
static __inline__ void
cv_destroy(kcondvar_t *cvp)
{
ENTRY;
ASSERT(cvp);
ASSERT(cvp->cv_magic == CV_MAGIC);
spin_lock(&cvp->cv_lock);
@@ -60,12 +64,14 @@ cv_destroy(kcondvar_t *cvp)
memset(cvp, CV_POISON, sizeof(*cvp));
spin_unlock(&cvp->cv_lock);
EXIT;
}
static __inline__ void
cv_wait(kcondvar_t *cvp, kmutex_t *mtx)
{
DEFINE_WAIT(wait);
ENTRY;
ASSERT(cvp);
ASSERT(mtx);
@@ -93,6 +99,7 @@ cv_wait(kcondvar_t *cvp, kmutex_t *mtx)
atomic_dec(&cvp->cv_waiters);
finish_wait(&cvp->cv_event, &wait);
EXIT;
}
/* 'expire_time' argument is an absolute wall clock time in jiffies.
@@ -103,6 +110,7 @@ cv_timedwait(kcondvar_t *cvp, kmutex_t *mtx, clock_t expire_time)
{
DEFINE_WAIT(wait);
clock_t time_left;
ENTRY;
ASSERT(cvp);
ASSERT(mtx);
@@ -120,7 +128,7 @@ cv_timedwait(kcondvar_t *cvp, kmutex_t *mtx, clock_t expire_time)
/* XXX - Does not handle jiffie wrap properly */
time_left = expire_time - jiffies;
if (time_left <= 0)
return -1;
RETURN(-1);
prepare_to_wait_exclusive(&cvp->cv_event, &wait,
TASK_UNINTERRUPTIBLE);
@@ -136,12 +144,13 @@ cv_timedwait(kcondvar_t *cvp, kmutex_t *mtx, clock_t expire_time)
atomic_dec(&cvp->cv_waiters);
finish_wait(&cvp->cv_event, &wait);
return (time_left > 0 ? time_left : -1);
RETURN(time_left > 0 ? time_left : -1);
}
static __inline__ void
cv_signal(kcondvar_t *cvp)
{
ENTRY;
ASSERT(cvp);
ASSERT(cvp->cv_magic == CV_MAGIC);
@@ -151,6 +160,8 @@ cv_signal(kcondvar_t *cvp)
* the wait queue to ensure we don't race waking up processes. */
if (atomic_read(&cvp->cv_waiters) > 0)
wake_up(&cvp->cv_event);
EXIT;
}
static __inline__ void
@@ -158,10 +169,13 @@ cv_broadcast(kcondvar_t *cvp)
{
ASSERT(cvp);
ASSERT(cvp->cv_magic == CV_MAGIC);
ENTRY;
/* Wake_up_all() will wake up all waiters even those which
* have the WQ_FLAG_EXCLUSIVE flag set. */
if (atomic_read(&cvp->cv_waiters) > 0)
wake_up_all(&cvp->cv_event);
EXIT;
}
#endif /* _SPL_CONDVAR_H */
+16 -4
View File
@@ -36,6 +36,7 @@ typedef struct {
static __inline__ void
mutex_init(kmutex_t *mp, char *name, int type, void *ibc)
{
ENTRY;
ASSERT(mp);
ASSERT(ibc == NULL); /* XXX - Spin mutexes not needed */
ASSERT(type == MUTEX_DEFAULT); /* XXX - Only default type supported */
@@ -51,12 +52,14 @@ mutex_init(kmutex_t *mp, char *name, int type, void *ibc)
if (mp->km_name)
strcpy(mp->km_name, name);
}
EXIT;
}
#undef mutex_destroy
static __inline__ void
mutex_destroy(kmutex_t *mp)
{
ENTRY;
ASSERT(mp);
ASSERT(mp->km_magic == KM_MAGIC);
spin_lock(&mp->km_lock);
@@ -66,11 +69,13 @@ mutex_destroy(kmutex_t *mp)
memset(mp, KM_POISON, sizeof(*mp));
spin_unlock(&mp->km_lock);
EXIT;
}
static __inline__ void
mutex_enter(kmutex_t *mp)
{
ENTRY;
ASSERT(mp);
ASSERT(mp->km_magic == KM_MAGIC);
spin_lock(&mp->km_lock);
@@ -91,6 +96,7 @@ mutex_enter(kmutex_t *mp)
ASSERT(mp->km_owner == NULL);
mp->km_owner = current;
spin_unlock(&mp->km_lock);
EXIT;
}
/* Return 1 if we acquired the mutex, else zero. */
@@ -98,6 +104,7 @@ static __inline__ int
mutex_tryenter(kmutex_t *mp)
{
int rc;
ENTRY;
ASSERT(mp);
ASSERT(mp->km_magic == KM_MAGIC);
@@ -118,14 +125,16 @@ mutex_tryenter(kmutex_t *mp)
ASSERT(mp->km_owner == NULL);
mp->km_owner = current;
spin_unlock(&mp->km_lock);
return 1;
RETURN(1);
}
return 0;
RETURN(0);
}
static __inline__ void
mutex_exit(kmutex_t *mp)
{
ENTRY;
ASSERT(mp);
ASSERT(mp->km_magic == KM_MAGIC);
spin_lock(&mp->km_lock);
@@ -134,6 +143,7 @@ mutex_exit(kmutex_t *mp)
mp->km_owner = NULL;
spin_unlock(&mp->km_lock);
up(&mp->km_sem);
EXIT;
}
/* Return 1 if mutex is held by current process, else zero. */
@@ -141,6 +151,7 @@ static __inline__ int
mutex_owned(kmutex_t *mp)
{
int rc;
ENTRY;
ASSERT(mp);
ASSERT(mp->km_magic == KM_MAGIC);
@@ -148,7 +159,7 @@ mutex_owned(kmutex_t *mp)
rc = (mp->km_owner == current);
spin_unlock(&mp->km_lock);
return rc;
RETURN(rc);
}
/* Return owner if mutex is owned, else NULL. */
@@ -156,6 +167,7 @@ static __inline__ kthread_t *
mutex_owner(kmutex_t *mp)
{
kthread_t *thr;
ENTRY;
ASSERT(mp);
ASSERT(mp->km_magic == KM_MAGIC);
@@ -163,7 +175,7 @@ mutex_owner(kmutex_t *mp)
thr = mp->km_owner;
spin_unlock(&mp->km_lock);
return thr;
RETURN(thr);
}
#ifdef __cplusplus
+48 -55
View File
@@ -5,82 +5,75 @@
extern "C" {
#endif
/*
* Task Queues - As of linux 2.6.x task queues have been replaced by a
* similar construct called work queues. The big difference on the linux
* side is that functions called from work queues run in process context
* and not interrupt context.
*
* One nice feature of Solaris which does not exist in linux work
* queues in the notion of a dynamic work queue. Rather than implementing
* this in the shim layer I'm hardcoding one-thread per work queue.
*
* XXX - This may end up being a significant performance penalty which
* forces us to implement dynamic workqueues. Which is all very doable
* with a little effort.
*/
#include <linux/module.h>
#include <linux/workqueue.h>
#include <linux/gfp.h>
#include <linux/slab.h>
#include <linux/interrupt.h>
#include <linux/kthread.h>
#include <sys/types.h>
#include <sys/kmem.h>
#undef DEBUG_TASKQ_UNIMPLEMENTED
#define TASKQ_NAMELEN 31
#define TASKQ_NAMELEN 31
#define taskq_t workq_t
#define TASKQ_PREPOPULATE 0x00000001
#define TASKQ_CPR_SAFE 0x00000002
#define TASKQ_DYNAMIC 0x00000004
typedef struct workqueue_struct workq_t;
typedef unsigned long taskqid_t;
typedef void (*task_func_t)(void *);
/*
* Public flags for taskq_create(): bit range 0-15
*/
#define TASKQ_PREPOPULATE 0x0000 /* XXX - Workqueues fully populate */
#define TASKQ_CPR_SAFE 0x0000 /* XXX - No analog */
#define TASKQ_DYNAMIC 0x0000 /* XXX - Worksqueues not dynamic */
typedef void (task_func_t)(void *);
/*
* Flags for taskq_dispatch. TQ_SLEEP/TQ_NOSLEEP should be same as
* KM_SLEEP/KM_NOSLEEP.
* KM_SLEEP/KM_NOSLEEP. TQ_NOQUEUE/TQ_NOALLOC are set particularly
* large so as not to conflict with already used GFP_* defines.
*/
#define TQ_SLEEP 0x00 /* XXX - Workqueues don't support */
#define TQ_NOSLEEP 0x00 /* these sorts of flags. They */
#define TQ_NOQUEUE 0x00 /* always run in application */
#define TQ_NOALLOC 0x00 /* context and can sleep. */
#define TQ_SLEEP KM_SLEEP
#define TQ_NOSLEEP KM_NOSLEEP
#define TQ_NOQUEUE 0x01000000
#define TQ_NOALLOC 0x02000000
#define TQ_NEW 0x04000000
#define TQ_ACTIVE 0x80000000
typedef struct task {
spinlock_t t_lock;
struct list_head t_list;
taskqid_t t_id;
task_func_t *t_func;
void *t_arg;
} task_t;
#ifdef DEBUG_TASKQ_UNIMPLEMENTED
static __inline__ void taskq_init(void) {
#error "taskq_init() not implemented"
}
static __inline__ taskq_t *
taskq_create_instance(const char *, int, int, pri_t, int, int, uint_t) {
#error "taskq_create_instance() not implemented"
}
extern void nulltask(void *);
extern void taskq_suspend(taskq_t *);
extern int taskq_suspended(taskq_t *);
extern void taskq_resume(taskq_t *);
#endif /* DEBUG_TASKQ_UNIMPLEMENTED */
typedef struct taskq {
spinlock_t tq_lock; /* protects taskq_t */
struct task_struct **tq_threads; /* thread pointers */
const char *tq_name; /* taskq name */
int tq_nactive; /* # of active threads */
int tq_nthreads; /* # of total threads */
int tq_pri; /* priority */
int tq_minalloc; /* min task_t pool size */
int tq_maxalloc; /* max task_t pool size */
int tq_nalloc; /* cur task_t pool size */
uint_t tq_flags; /* flags */
taskqid_t tq_next_id; /* next pend/work id */
taskqid_t tq_lowest_id; /* lowest pend/work id */
struct list_head tq_free_list; /* free task_t's */
struct list_head tq_work_list; /* work task_t's */
struct list_head tq_pend_list; /* pending task_t's */
wait_queue_head_t tq_work_waitq; /* new work waitq */
wait_queue_head_t tq_wait_waitq; /* wait waitq */
} taskq_t;
extern taskqid_t __taskq_dispatch(taskq_t *, task_func_t, void *, uint_t);
extern taskq_t *__taskq_create(const char *, int, pri_t, int, int, uint_t);
extern void __taskq_destroy(taskq_t *);
extern void __taskq_wait(taskq_t *);
extern int __taskq_member(taskq_t *, void *);
#define taskq_create(name, thr, pri, min, max, flags) \
__taskq_create(name, thr, pri, min, max, flags)
#define taskq_dispatch(tq, func, priv, flags) \
__taskq_dispatch(tq, (task_func_t)func, priv, flags)
#define taskq_destroy(tq) __taskq_destroy(tq)
#define taskq_wait(tq) __taskq_wait(tq)
#define taskq_member(tq, kthr) 1 /* XXX -Just be true */
#define taskq_member(tq, t) __taskq_member(tq, t)
#define taskq_wait_id(tq, id) __taskq_wait_id(tq, id)
#define taskq_wait(tq) __taskq_wait(tq)
#define taskq_dispatch(tq, f, p, fl) __taskq_dispatch(tq, f, p, fl)
#define taskq_create(n, th, p, mi, ma, fl) __taskq_create(n, th, p, mi, ma, fl)
#define taskq_destroy(tq) __taskq_destroy(tq)
#ifdef __cplusplus
}