diff --git a/cmd/zed/agents/zfs_mod.c b/cmd/zed/agents/zfs_mod.c index 57d6f34ba..76d2909c0 100644 --- a/cmd/zed/agents/zfs_mod.c +++ b/cmd/zed/agents/zfs_mod.c @@ -82,7 +82,7 @@ #include #include #include -#include +#include #include #include #include @@ -98,7 +98,7 @@ typedef void (*zfs_process_func_t)(zpool_handle_t *, nvlist_t *, boolean_t); libzfs_handle_t *g_zfshdl; list_t g_pool_list; /* list of unavailable pools at initialization */ list_t g_device_list; /* list of disks with asynchronous label request */ -tpool_t *g_tpool; +taskq_t *g_taskq; boolean_t g_enumeration_done; pthread_t g_zfs_tid; /* zfs_enum_pools() thread */ @@ -749,8 +749,8 @@ zfs_iter_pool(zpool_handle_t *zhp, void *data) continue; if (zfs_toplevel_state(zhp) >= VDEV_STATE_DEGRADED) { list_remove(&g_pool_list, pool); - (void) tpool_dispatch(g_tpool, zfs_enable_ds, - pool); + (void) taskq_dispatch(g_taskq, zfs_enable_ds, + pool, TQ_SLEEP); break; } } @@ -1347,9 +1347,9 @@ zfs_slm_fini(void) /* wait for zfs_enum_pools thread to complete */ (void) pthread_join(g_zfs_tid, NULL); /* destroy the thread pool */ - if (g_tpool != NULL) { - tpool_wait(g_tpool); - tpool_destroy(g_tpool); + if (g_taskq != NULL) { + taskq_wait(g_taskq); + taskq_destroy(g_taskq); } while ((pool = list_remove_head(&g_pool_list)) != NULL) { diff --git a/cmd/zpool/zpool_iter.c b/cmd/zpool/zpool_iter.c index f479eec9c..5a6e3e65a 100644 --- a/cmd/zpool/zpool_iter.c +++ b/cmd/zpool/zpool_iter.c @@ -34,7 +34,6 @@ #include #include #include -#include #include #include @@ -653,21 +652,21 @@ all_pools_for_each_vdev_gather_cb(zpool_handle_t *zhp, void *cb_vcdl) static void all_pools_for_each_vdev_run_vcdl(vdev_cmd_data_list_t *vcdl) { - tpool_t *t; - - t = tpool_create(1, 5 * sysconf(_SC_NPROCESSORS_ONLN), 0, NULL); - if (t == NULL) + taskq_t *tq = taskq_create("vdev_run_cmd", + 5 * sysconf(_SC_NPROCESSORS_ONLN), minclsyspri, 1, INT_MAX, + TASKQ_DYNAMIC); + if (tq == NULL) return; /* Spawn off the command for each vdev */ for (int i = 0; i < vcdl->count; i++) { - (void) tpool_dispatch(t, vdev_run_cmd_thread, - (void *) &vcdl->data[i]); + (void) taskq_dispatch(tq, vdev_run_cmd_thread, + (void *) &vcdl->data[i], TQ_SLEEP); } /* Wait for threads to finish */ - tpool_wait(t); - tpool_destroy(t); + taskq_wait(tq); + taskq_destroy(tq); } /* diff --git a/cmd/zpool/zpool_main.c b/cmd/zpool/zpool_main.c index 8c5556c5f..94708162c 100644 --- a/cmd/zpool/zpool_main.c +++ b/cmd/zpool/zpool_main.c @@ -52,7 +52,6 @@ #include #include #include -#include #include #include #include @@ -2389,7 +2388,7 @@ zpool_do_destroy(int argc, char **argv) } typedef struct export_cbdata { - tpool_t *tpool; + taskq_t *taskq; pthread_mutex_t mnttab_lock; boolean_t force; boolean_t hardforce; @@ -2414,12 +2413,12 @@ zpool_export_one(zpool_handle_t *zhp, void *data) * zpool_disable_datasets() is not thread-safe for mnttab access. * So we serialize access here for 'zpool export -a' parallel case. */ - if (cb->tpool != NULL) + if (cb->taskq != NULL) (void) pthread_mutex_lock(&cb->mnttab_lock); int retval = zpool_disable_datasets(zhp, cb->force); - if (cb->tpool != NULL) + if (cb->taskq != NULL) (void) pthread_mutex_unlock(&cb->mnttab_lock); if (retval) @@ -2463,7 +2462,7 @@ zpool_export_task(void *arg) static int zpool_export_one_async(zpool_handle_t *zhp, void *data) { - tpool_t *tpool = ((export_cbdata_t *)data)->tpool; + taskq_t *tq = ((export_cbdata_t *)data)->taskq; async_export_args_t *aea = safe_malloc(sizeof (async_export_args_t)); /* save pool name since zhp will go out of scope */ @@ -2471,7 +2470,8 @@ zpool_export_one_async(zpool_handle_t *zhp, void *data) aea->aea_cbdata = data; /* ship off actual export to another thread */ - if (tpool_dispatch(tpool, zpool_export_task, (void *)aea) != 0) + if (taskq_dispatch(tq, zpool_export_task, (void *)aea, + TQ_SLEEP) == TASKQID_INVALID) return (errno); /* unlikely */ else return (0); @@ -2517,7 +2517,7 @@ zpool_do_export(int argc, char **argv) cb.force = force; cb.hardforce = hardforce; - cb.tpool = NULL; + cb.taskq = NULL; cb.retval = 0; argc -= optind; argv += optind; @@ -2531,16 +2531,17 @@ zpool_do_export(int argc, char **argv) usage(B_FALSE); } - cb.tpool = tpool_create(1, 5 * sysconf(_SC_NPROCESSORS_ONLN), - 0, NULL); + cb.taskq = taskq_create("zpool_export", + 5 * sysconf(_SC_NPROCESSORS_ONLN), minclsyspri, 1, INT_MAX, + TASKQ_DYNAMIC); (void) pthread_mutex_init(&cb.mnttab_lock, NULL); /* Asynchronously call zpool_export_one using thread pool */ ret = for_each_pool(argc, argv, B_TRUE, NULL, ZFS_TYPE_POOL, B_FALSE, zpool_export_one_async, &cb); - tpool_wait(cb.tpool); - tpool_destroy(cb.tpool); + taskq_wait(cb.taskq); + taskq_destroy(cb.taskq); (void) pthread_mutex_destroy(&cb.mnttab_lock); return (ret | cb.retval); @@ -3945,10 +3946,11 @@ import_pools(nvlist_t *pools, nvlist_t *props, char *mntopts, int flags, uint_t npools = 0; - tpool_t *tp = NULL; + taskq_t *tq = NULL; if (import->do_all) { - tp = tpool_create(1, 5 * sysconf(_SC_NPROCESSORS_ONLN), - 0, NULL); + tq = taskq_create("zpool_import_all", + 5 * sysconf(_SC_NPROCESSORS_ONLN), minclsyspri, 1, INT_MAX, + TASKQ_DYNAMIC); } /* @@ -3997,8 +3999,8 @@ import_pools(nvlist_t *pools, nvlist_t *props, char *mntopts, int flags, ip->ip_mntthreads = mount_tp_nthr / npools; ip->ip_err = &err; - (void) tpool_dispatch(tp, do_import_task, - (void *)ip); + (void) taskq_dispatch(tq, do_import_task, + (void *)ip, TQ_SLEEP); } else { /* * If we're importing from cachefile, then @@ -4047,8 +4049,8 @@ import_pools(nvlist_t *pools, nvlist_t *props, char *mntopts, int flags, } } if (import->do_all) { - tpool_wait(tp); - tpool_destroy(tp); + taskq_wait(tq); + taskq_destroy(tq); } /* diff --git a/include/Makefile.am b/include/Makefile.am index 7917b3393..cc79c1a3d 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -189,8 +189,7 @@ USER_H = \ libzfs_core.h \ libzfsbootenv.h \ libzpool.h \ - libzutil.h \ - thread_pool.h + libzutil.h if CONFIG_USER diff --git a/include/thread_pool.h b/include/thread_pool.h deleted file mode 100644 index b5ef51146..000000000 --- a/include/thread_pool.h +++ /dev/null @@ -1,56 +0,0 @@ -// SPDX-License-Identifier: CDDL-1.0 -/* - * CDDL HEADER START - * - * The contents of this file are subject to the terms of the - * Common Development and Distribution License (the "License"). - * You may not use this file except in compliance with the License. - * - * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE - * or https://opensource.org/licenses/CDDL-1.0. - * See the License for the specific language governing permissions - * and limitations under the License. - * - * When distributing Covered Code, include this CDDL HEADER in each - * file and include the License file at usr/src/OPENSOLARIS.LICENSE. - * If applicable, add the following below this CDDL HEADER, with the - * fields enclosed by brackets "[]" replaced with your own identifying - * information: Portions Copyright [yyyy] [name of copyright owner] - * - * CDDL HEADER END - */ - -/* - * Copyright 2006 Sun Microsystems, Inc. All rights reserved. - * Use is subject to license terms. - */ - -#ifndef _THREAD_POOL_H_ -#define _THREAD_POOL_H_ extern __attribute__((visibility("default"))) - -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct tpool tpool_t; /* opaque thread pool descriptor */ - -_THREAD_POOL_H_ tpool_t *tpool_create(uint_t min_threads, uint_t max_threads, - uint_t linger, pthread_attr_t *attr); -_THREAD_POOL_H_ int tpool_dispatch(tpool_t *tpool, - void (*func)(void *), void *arg); -_THREAD_POOL_H_ void tpool_destroy(tpool_t *tpool); -_THREAD_POOL_H_ void tpool_abandon(tpool_t *tpool); -_THREAD_POOL_H_ void tpool_wait(tpool_t *tpool); -_THREAD_POOL_H_ void tpool_suspend(tpool_t *tpool); -_THREAD_POOL_H_ int tpool_suspended(tpool_t *tpool); -_THREAD_POOL_H_ void tpool_resume(tpool_t *tpool); -_THREAD_POOL_H_ int tpool_member(tpool_t *tpool); - -#ifdef __cplusplus -} -#endif - -#endif /* _THREAD_POOL_H_ */ diff --git a/lib/Makefile.am b/lib/Makefile.am index 7ec500467..bb7e4ada4 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -24,10 +24,10 @@ # libunicode --/ \ / \ \-------\ \ # \ / \ \ | # libzutil libzfs_core* | | -# | | | | \ | | | | -# | | | | | | | | | -# | | | | | | | | | -# libtpool -------------/ | | | \---- libnvpair* | | | +# | | | \ | | | | +# | | | | | | | | +# | | | | | | | | +# | | | \---- libnvpair* | | | # | | | | | | # libefi -----------------/ | \------ libavl* --------/ | # | | | @@ -58,7 +58,6 @@ include $(srcdir)/%D%/libicp/Makefile.am include $(srcdir)/%D%/libnvpair/Makefile.am include $(srcdir)/%D%/libshare/Makefile.am include $(srcdir)/%D%/libspl/Makefile.am -include $(srcdir)/%D%/libtpool/Makefile.am include $(srcdir)/%D%/libunicode/Makefile.am include $(srcdir)/%D%/libzdb/Makefile.am include $(srcdir)/%D%/libzfs_core/Makefile.am diff --git a/lib/libspl/include/sys/taskq.h b/lib/libspl/include/sys/taskq.h index fbe3f388c..74a18fa50 100644 --- a/lib/libspl/include/sys/taskq.h +++ b/lib/libspl/include/sys/taskq.h @@ -31,6 +31,8 @@ #include #include +#include +#include #include #include #include diff --git a/lib/libtpool/Makefile.am b/lib/libtpool/Makefile.am deleted file mode 100644 index 5a2b8a570..000000000 --- a/lib/libtpool/Makefile.am +++ /dev/null @@ -1,11 +0,0 @@ -libtpool_la_CFLAGS = $(AM_CFLAGS) $(LIBRARY_CFLAGS) -libtpool_la_CFLAGS += -fvisibility=hidden -# https://gcc.gnu.org/bugzilla/show_bug.cgi?id=61118 -libtpool_la_CFLAGS += $(NO_CLOBBERED) - -noinst_LTLIBRARIES += libtpool.la -CPPCHECKTARGETS += libtpool.la - -libtpool_la_SOURCES = \ - %D%/thread_pool.c \ - %D%/thread_pool_impl.h diff --git a/lib/libtpool/thread_pool.c b/lib/libtpool/thread_pool.c deleted file mode 100644 index 39b92ae81..000000000 --- a/lib/libtpool/thread_pool.c +++ /dev/null @@ -1,612 +0,0 @@ -// SPDX-License-Identifier: CDDL-1.0 -/* - * CDDL HEADER START - * - * The contents of this file are subject to the terms of the - * Common Development and Distribution License (the "License"). - * You may not use this file except in compliance with the License. - * - * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE - * or https://opensource.org/licenses/CDDL-1.0. - * See the License for the specific language governing permissions - * and limitations under the License. - * - * When distributing Covered Code, include this CDDL HEADER in each - * file and include the License file at usr/src/OPENSOLARIS.LICENSE. - * If applicable, add the following below this CDDL HEADER, with the - * fields enclosed by brackets "[]" replaced with your own identifying - * information: Portions Copyright [yyyy] [name of copyright owner] - * - * CDDL HEADER END - */ - -/* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved. - * Use is subject to license terms. - */ - -#include -#include -#include -#include -#include -#include "thread_pool_impl.h" - -static pthread_mutex_t thread_pool_lock = PTHREAD_MUTEX_INITIALIZER; -static tpool_t *thread_pools = NULL; - -static void -delete_pool(tpool_t *tpool) -{ - tpool_job_t *job; - - ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL); - - /* - * Unlink the pool from the global list of all pools. - */ - (void) pthread_mutex_lock(&thread_pool_lock); - if (thread_pools == tpool) - thread_pools = tpool->tp_forw; - if (thread_pools == tpool) - thread_pools = NULL; - else { - tpool->tp_back->tp_forw = tpool->tp_forw; - tpool->tp_forw->tp_back = tpool->tp_back; - } - pthread_mutex_unlock(&thread_pool_lock); - - /* - * There should be no pending jobs, but just in case... - */ - for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) { - tpool->tp_head = job->tpj_next; - free(job); - } - (void) pthread_attr_destroy(&tpool->tp_attr); - free(tpool); -} - -/* - * Worker thread is terminating. - */ -static void -worker_cleanup(void *arg) -{ - tpool_t *tpool = (tpool_t *)arg; - - if (--tpool->tp_current == 0 && - (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { - if (tpool->tp_flags & TP_ABANDON) { - pthread_mutex_unlock(&tpool->tp_mutex); - delete_pool(tpool); - return; - } - if (tpool->tp_flags & TP_DESTROY) - (void) pthread_cond_broadcast(&tpool->tp_busycv); - } - pthread_mutex_unlock(&tpool->tp_mutex); -} - -static void -notify_waiters(tpool_t *tpool) -{ - if (tpool->tp_head == NULL && tpool->tp_active == NULL) { - tpool->tp_flags &= ~TP_WAIT; - (void) pthread_cond_broadcast(&tpool->tp_waitcv); - } -} - -/* - * Called by a worker thread on return from a tpool_dispatch()d job. - */ -static void -job_cleanup(void *arg) -{ - tpool_t *tpool = (tpool_t *)arg; - - pthread_t my_tid = pthread_self(); - tpool_active_t *activep; - tpool_active_t **activepp; - - pthread_mutex_lock(&tpool->tp_mutex); - for (activepp = &tpool->tp_active; ; activepp = &activep->tpa_next) { - activep = *activepp; - if (activep->tpa_tid == my_tid) { - *activepp = activep->tpa_next; - break; - } - } - if (tpool->tp_flags & TP_WAIT) - notify_waiters(tpool); -} - -static void * -tpool_worker(void *arg) -{ - tpool_t *tpool = (tpool_t *)arg; - int elapsed; - tpool_job_t *job; - void (*func)(void *); - tpool_active_t active; - - pthread_mutex_lock(&tpool->tp_mutex); - pthread_cleanup_push(worker_cleanup, tpool); - - /* - * This is the worker's main loop. - * It will only be left if a timeout or an error has occurred. - */ - active.tpa_tid = pthread_self(); - for (;;) { - elapsed = 0; - tpool->tp_idle++; - if (tpool->tp_flags & TP_WAIT) - notify_waiters(tpool); - while ((tpool->tp_head == NULL || - (tpool->tp_flags & TP_SUSPEND)) && - !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { - if (tpool->tp_current <= tpool->tp_minimum || - tpool->tp_linger == 0) { - (void) pthread_cond_wait(&tpool->tp_workcv, - &tpool->tp_mutex); - } else { - struct timespec ts; - - clock_gettime(CLOCK_REALTIME, &ts); - ts.tv_sec += tpool->tp_linger; - - if (pthread_cond_timedwait(&tpool->tp_workcv, - &tpool->tp_mutex, &ts) != 0) { - elapsed = 1; - break; - } - } - } - tpool->tp_idle--; - if (tpool->tp_flags & TP_DESTROY) - break; - if (tpool->tp_flags & TP_ABANDON) { - /* can't abandon a suspended pool */ - if (tpool->tp_flags & TP_SUSPEND) { - tpool->tp_flags &= ~TP_SUSPEND; - (void) pthread_cond_broadcast( - &tpool->tp_workcv); - } - if (tpool->tp_head == NULL) - break; - } - if ((job = tpool->tp_head) != NULL && - !(tpool->tp_flags & TP_SUSPEND)) { - elapsed = 0; - func = job->tpj_func; - arg = job->tpj_arg; - tpool->tp_head = job->tpj_next; - if (job == tpool->tp_tail) - tpool->tp_tail = NULL; - tpool->tp_njobs--; - active.tpa_next = tpool->tp_active; - tpool->tp_active = &active; - pthread_mutex_unlock(&tpool->tp_mutex); - pthread_cleanup_push(job_cleanup, tpool); - free(job); - - sigset_t maskset; - (void) pthread_sigmask(SIG_SETMASK, NULL, &maskset); - - /* - * Call the specified function. - */ - func(arg); - /* - * We don't know what this thread has been doing, - * so we reset its signal mask and cancellation - * state back to the values prior to calling func(). - */ - (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL); - (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, - NULL); - (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, - NULL); - pthread_cleanup_pop(1); - } - if (elapsed && tpool->tp_current > tpool->tp_minimum) { - /* - * We timed out and there is no work to be done - * and the number of workers exceeds the minimum. - * Exit now to reduce the size of the pool. - */ - break; - } - } - pthread_cleanup_pop(1); - return (arg); -} - -/* - * Create a worker thread, with default signals blocked. - */ -static int -create_worker(tpool_t *tpool) -{ - pthread_t thread; - sigset_t oset; - int error; - - (void) pthread_sigmask(SIG_SETMASK, NULL, &oset); - error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool); - (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); - return (error); -} - - -/* - * pthread_attr_clone: make a copy of a pthread_attr_t. When old_attr - * is NULL initialize the cloned attr using default values. - */ -static int -pthread_attr_clone(pthread_attr_t *attr, const pthread_attr_t *old_attr) -{ - int error; - - error = pthread_attr_init(attr); - if (error || (old_attr == NULL)) - return (error); - -#ifdef __GLIBC__ - cpu_set_t cpuset; - size_t cpusetsize = sizeof (cpuset); - error = pthread_attr_getaffinity_np(old_attr, cpusetsize, &cpuset); - if (error == 0) - error = pthread_attr_setaffinity_np(attr, cpusetsize, &cpuset); - if (error) - goto error; -#endif /* __GLIBC__ */ - - int detachstate; - error = pthread_attr_getdetachstate(old_attr, &detachstate); - if (error == 0) - error = pthread_attr_setdetachstate(attr, detachstate); - if (error) - goto error; - - size_t guardsize; - error = pthread_attr_getguardsize(old_attr, &guardsize); - if (error == 0) - error = pthread_attr_setguardsize(attr, guardsize); - if (error) - goto error; - - int inheritsched; - error = pthread_attr_getinheritsched(old_attr, &inheritsched); - if (error == 0) - error = pthread_attr_setinheritsched(attr, inheritsched); - if (error) - goto error; - - struct sched_param param; - error = pthread_attr_getschedparam(old_attr, ¶m); - if (error == 0) - error = pthread_attr_setschedparam(attr, ¶m); - if (error) - goto error; - - int policy; - error = pthread_attr_getschedpolicy(old_attr, &policy); - if (error == 0) - error = pthread_attr_setschedpolicy(attr, policy); - if (error) - goto error; - - int scope; - error = pthread_attr_getscope(old_attr, &scope); - if (error == 0) - error = pthread_attr_setscope(attr, scope); - if (error) - goto error; - - void *stackaddr; - size_t stacksize; - error = pthread_attr_getstack(old_attr, &stackaddr, &stacksize); - if (error == 0) - error = pthread_attr_setstack(attr, stackaddr, stacksize); - if (error) - goto error; - - return (0); -error: - pthread_attr_destroy(attr); - return (error); -} - -tpool_t * -tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger, - pthread_attr_t *attr) -{ - tpool_t *tpool; - void *stackaddr; - size_t stacksize; - size_t minstack; - int error; - - if (min_threads > max_threads || max_threads < 1) { - errno = EINVAL; - return (NULL); - } - if (attr != NULL) { - if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) { - errno = EINVAL; - return (NULL); - } - /* - * Allow only one thread in the pool with a specified stack. - * Require threads to have at least the minimum stack size. - */ - minstack = PTHREAD_STACK_MIN; - if (stackaddr != NULL) { - if (stacksize < minstack || max_threads != 1) { - errno = EINVAL; - return (NULL); - } - } else if (stacksize != 0 && stacksize < minstack) { - errno = EINVAL; - return (NULL); - } - } - - tpool = calloc(1, sizeof (*tpool)); - if (tpool == NULL) { - errno = ENOMEM; - return (NULL); - } - (void) pthread_mutex_init(&tpool->tp_mutex, NULL); - (void) pthread_cond_init(&tpool->tp_busycv, NULL); - (void) pthread_cond_init(&tpool->tp_workcv, NULL); - (void) pthread_cond_init(&tpool->tp_waitcv, NULL); - tpool->tp_minimum = min_threads; - tpool->tp_maximum = max_threads; - tpool->tp_linger = linger; - - /* - * We cannot just copy the attribute pointer. - * We need to initialize a new pthread_attr_t structure - * with the values from the user-supplied pthread_attr_t. - * If the attribute pointer is NULL, we need to initialize - * the new pthread_attr_t structure with default values. - */ - error = pthread_attr_clone(&tpool->tp_attr, attr); - if (error) { - free(tpool); - errno = error; - return (NULL); - } - - /* make all pool threads be detached daemon threads */ - (void) pthread_attr_setdetachstate(&tpool->tp_attr, - PTHREAD_CREATE_DETACHED); - - /* insert into the global list of all thread pools */ - pthread_mutex_lock(&thread_pool_lock); - if (thread_pools == NULL) { - tpool->tp_forw = tpool; - tpool->tp_back = tpool; - thread_pools = tpool; - } else { - thread_pools->tp_back->tp_forw = tpool; - tpool->tp_forw = thread_pools; - tpool->tp_back = thread_pools->tp_back; - thread_pools->tp_back = tpool; - } - pthread_mutex_unlock(&thread_pool_lock); - - return (tpool); -} - -/* - * Dispatch a work request to the thread pool. - * If there are idle workers, awaken one. - * Else, if the maximum number of workers has - * not been reached, spawn a new worker thread. - * Else just return with the job added to the queue. - */ -int -tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg) -{ - tpool_job_t *job; - - ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); - - if ((job = calloc(1, sizeof (*job))) == NULL) - return (-1); - job->tpj_next = NULL; - job->tpj_func = func; - job->tpj_arg = arg; - - pthread_mutex_lock(&tpool->tp_mutex); - - if (!(tpool->tp_flags & TP_SUSPEND)) { - if (tpool->tp_idle > 0) - (void) pthread_cond_signal(&tpool->tp_workcv); - else if (tpool->tp_current >= tpool->tp_maximum) { - /* At worker limit. Leave task on queue */ - } else { - if (create_worker(tpool) == 0) { - /* Started a new worker thread */ - tpool->tp_current++; - } else if (tpool->tp_current > 0) { - /* Leave task on queue */ - } else { - /* Cannot start a single worker! */ - pthread_mutex_unlock(&tpool->tp_mutex); - free(job); - return (-1); - } - } - } - - if (tpool->tp_head == NULL) - tpool->tp_head = job; - else - tpool->tp_tail->tpj_next = job; - tpool->tp_tail = job; - tpool->tp_njobs++; - - pthread_mutex_unlock(&tpool->tp_mutex); - return (0); -} - -static void -tpool_cleanup(void *arg) -{ - tpool_t *tpool = (tpool_t *)arg; - - pthread_mutex_unlock(&tpool->tp_mutex); -} - -/* - * Assumes: by the time tpool_destroy() is called no one will use this - * thread pool in any way and no one will try to dispatch entries to it. - * Calling tpool_destroy() from a job in the pool will cause deadlock. - */ -void -tpool_destroy(tpool_t *tpool) -{ - tpool_active_t *activep; - - ASSERT(!tpool_member(tpool)); - ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); - - pthread_mutex_lock(&tpool->tp_mutex); - pthread_cleanup_push(tpool_cleanup, tpool); - - /* mark the pool as being destroyed; wakeup idle workers */ - tpool->tp_flags |= TP_DESTROY; - tpool->tp_flags &= ~TP_SUSPEND; - (void) pthread_cond_broadcast(&tpool->tp_workcv); - - /* cancel all active workers */ - for (activep = tpool->tp_active; activep; activep = activep->tpa_next) - (void) pthread_cancel(activep->tpa_tid); - - /* wait for all active workers to finish */ - while (tpool->tp_active != NULL) { - tpool->tp_flags |= TP_WAIT; - (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); - } - - /* the last worker to terminate will wake us up */ - while (tpool->tp_current != 0) - (void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex); - - pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ - delete_pool(tpool); -} - -/* - * Like tpool_destroy(), but don't cancel workers or wait for them to finish. - * The last worker to terminate will delete the pool. - */ -void -tpool_abandon(tpool_t *tpool) -{ - ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); - - pthread_mutex_lock(&tpool->tp_mutex); - if (tpool->tp_current == 0) { - /* no workers, just delete the pool */ - pthread_mutex_unlock(&tpool->tp_mutex); - delete_pool(tpool); - } else { - /* wake up all workers, last one will delete the pool */ - tpool->tp_flags |= TP_ABANDON; - tpool->tp_flags &= ~TP_SUSPEND; - (void) pthread_cond_broadcast(&tpool->tp_workcv); - pthread_mutex_unlock(&tpool->tp_mutex); - } -} - -/* - * Wait for all jobs to complete. - * Calling tpool_wait() from a job in the pool will cause deadlock. - */ -void -tpool_wait(tpool_t *tpool) -{ - ASSERT(!tpool_member(tpool)); - ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); - - pthread_mutex_lock(&tpool->tp_mutex); - pthread_cleanup_push(tpool_cleanup, tpool); - while (tpool->tp_head != NULL || tpool->tp_active != NULL) { - tpool->tp_flags |= TP_WAIT; - (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); - ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); - } - pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ -} - -void -tpool_suspend(tpool_t *tpool) -{ - ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); - - pthread_mutex_lock(&tpool->tp_mutex); - tpool->tp_flags |= TP_SUSPEND; - pthread_mutex_unlock(&tpool->tp_mutex); -} - -int -tpool_suspended(tpool_t *tpool) -{ - int suspended; - - ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); - - pthread_mutex_lock(&tpool->tp_mutex); - suspended = (tpool->tp_flags & TP_SUSPEND) != 0; - pthread_mutex_unlock(&tpool->tp_mutex); - - return (suspended); -} - -void -tpool_resume(tpool_t *tpool) -{ - int excess; - - ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); - - pthread_mutex_lock(&tpool->tp_mutex); - if (!(tpool->tp_flags & TP_SUSPEND)) { - pthread_mutex_unlock(&tpool->tp_mutex); - return; - } - tpool->tp_flags &= ~TP_SUSPEND; - (void) pthread_cond_broadcast(&tpool->tp_workcv); - excess = tpool->tp_njobs - tpool->tp_idle; - while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) { - if (create_worker(tpool) != 0) - break; /* pthread_create() failed */ - tpool->tp_current++; - } - pthread_mutex_unlock(&tpool->tp_mutex); -} - -int -tpool_member(tpool_t *tpool) -{ - pthread_t my_tid = pthread_self(); - tpool_active_t *activep; - - ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); - - pthread_mutex_lock(&tpool->tp_mutex); - for (activep = tpool->tp_active; activep; activep = activep->tpa_next) { - if (activep->tpa_tid == my_tid) { - pthread_mutex_unlock(&tpool->tp_mutex); - return (1); - } - } - pthread_mutex_unlock(&tpool->tp_mutex); - return (0); -} diff --git a/lib/libtpool/thread_pool_impl.h b/lib/libtpool/thread_pool_impl.h deleted file mode 100644 index e2bffd37d..000000000 --- a/lib/libtpool/thread_pool_impl.h +++ /dev/null @@ -1,94 +0,0 @@ -// SPDX-License-Identifier: CDDL-1.0 -/* - * CDDL HEADER START - * - * The contents of this file are subject to the terms of the - * Common Development and Distribution License (the "License"). - * You may not use this file except in compliance with the License. - * - * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE - * or https://opensource.org/licenses/CDDL-1.0. - * See the License for the specific language governing permissions - * and limitations under the License. - * - * When distributing Covered Code, include this CDDL HEADER in each - * file and include the License file at usr/src/OPENSOLARIS.LICENSE. - * If applicable, add the following below this CDDL HEADER, with the - * fields enclosed by brackets "[]" replaced with your own identifying - * information: Portions Copyright [yyyy] [name of copyright owner] - * - * CDDL HEADER END - */ - -/* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved. - * Use is subject to license terms. - */ - -#ifndef _THREAD_POOL_IMPL_H -#define _THREAD_POOL_IMPL_H - -#include - -#ifdef __cplusplus -extern "C" { -#endif - -/* - * Thread pool implementation definitions. - * See for interface declarations. - */ - -/* - * FIFO queued job - */ -typedef struct tpool_job tpool_job_t; -struct tpool_job { - tpool_job_t *tpj_next; /* list of jobs */ - void (*tpj_func)(void *); /* function to call */ - void *tpj_arg; /* its argument */ -}; - -/* - * List of active threads, linked through their stacks. - */ -typedef struct tpool_active tpool_active_t; -struct tpool_active { - tpool_active_t *tpa_next; /* list of active threads */ - pthread_t tpa_tid; /* active thread id */ -}; - -/* - * The thread pool. - */ -struct tpool { - tpool_t *tp_forw; /* circular list of all thread pools */ - tpool_t *tp_back; - pthread_mutex_t tp_mutex; /* protects the pool data */ - pthread_cond_t tp_busycv; /* synchronization in tpool_dispatch */ - pthread_cond_t tp_workcv; /* synchronization with workers */ - pthread_cond_t tp_waitcv; /* synchronization in tpool_wait() */ - tpool_active_t *tp_active; /* threads performing work */ - tpool_job_t *tp_head; /* FIFO job queue */ - tpool_job_t *tp_tail; - pthread_attr_t tp_attr; /* attributes of the workers */ - int tp_flags; /* see below */ - uint_t tp_linger; /* seconds before idle workers exit */ - int tp_njobs; /* number of jobs in job queue */ - int tp_minimum; /* minimum number of worker threads */ - int tp_maximum; /* maximum number of worker threads */ - int tp_current; /* current number of worker threads */ - int tp_idle; /* number of idle workers */ -}; - -/* tp_flags */ -#define TP_WAIT 0x01 /* waiting in tpool_wait() */ -#define TP_SUSPEND 0x02 /* pool is being suspended */ -#define TP_DESTROY 0x04 /* pool is being destroyed */ -#define TP_ABANDON 0x08 /* pool is abandoned (auto-destroy) */ - -#ifdef __cplusplus -} -#endif - -#endif /* _THREAD_POOL_IMPL_H */ diff --git a/lib/libzfs/libzfs.abi b/lib/libzfs/libzfs.abi index f481b6221..371b6f2f0 100644 --- a/lib/libzfs/libzfs.abi +++ b/lib/libzfs/libzfs.abi @@ -6,7 +6,6 @@ - @@ -327,15 +326,6 @@ - - - - - - - - - @@ -1387,17 +1377,16 @@ - - - - - - - - - - + + + + + + + + + @@ -1408,14 +1397,48 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -1917,34 +1940,7 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -1954,44 +1950,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -2091,88 +2049,11 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - - + @@ -2181,15 +2062,16 @@ + + + + + - - - - @@ -2205,13 +2087,6 @@ - - - - - - - @@ -2236,28 +2111,11 @@ - - - - - - - - - - - - - - - - - @@ -2293,6 +2151,24 @@ + + + + + + + + + + + + + + + + + + @@ -2303,6 +2179,10 @@ + + + + @@ -2348,339 +2228,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -2696,9 +2243,6 @@ - - - @@ -2706,30 +2250,17 @@ - - - - - - - - - - - - - - + @@ -3070,7 +2601,7 @@ - + @@ -3081,51 +2612,48 @@ - + - - - - + - + - + - + - + - + - + - + - + - + - + - + - + - + @@ -3203,7 +2731,10 @@ + + + @@ -3219,83 +2750,12 @@ - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -3400,6 +2860,53 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -3438,12 +2945,6 @@ - - - - - - @@ -3458,12 +2959,6 @@ - - - - - - @@ -3749,10 +3244,10 @@ - + - + @@ -3793,24 +3288,9 @@ - - - - - - - - - - - - - - - @@ -4142,28 +3622,28 @@ - + - + - + - + - + - + - + - + @@ -4171,7 +3651,7 @@ - + @@ -4182,7 +3662,7 @@ - + @@ -4193,7 +3673,7 @@ - + @@ -4210,7 +3690,7 @@ - + @@ -4223,13 +3703,13 @@ - + - + @@ -4237,7 +3717,7 @@ - + @@ -4245,7 +3725,7 @@ - + @@ -4692,6 +4172,12 @@ + + + + + + @@ -4756,7 +4242,6 @@ - @@ -4926,8 +4411,6 @@ - - @@ -5129,42 +4612,15 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -6192,17 +5648,6 @@ - - - - - - - - - - - @@ -6290,63 +5735,23 @@ + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + @@ -6359,10 +5764,121 @@ - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -6380,6 +5896,30 @@ + + + + + + + + + + + + + + + + + + + + + + + + @@ -6418,6 +5958,44 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -6465,6 +6043,29 @@ + + + + + + + + + + + + + + + + + + + + + + + @@ -6479,12 +6080,15 @@ - + + - - + + + + @@ -6496,27 +6100,6 @@ - - - - - - - - - - - - - - - - - - - - - @@ -6547,6 +6130,30 @@ + + + + + + + + + + + + + + + + + + + + + + + + @@ -6655,7 +6262,6 @@ - @@ -8572,6 +8178,7 @@ + @@ -8685,12 +8292,6 @@ - - - - - - @@ -9951,9 +9552,6 @@ - - - @@ -10220,10 +9818,6 @@ - - - - @@ -10364,6 +9958,7 @@ + diff --git a/lib/libzfs/libzfs_mount.c b/lib/libzfs/libzfs_mount.c index 5c9e2199e..8d840dff7 100644 --- a/lib/libzfs/libzfs_mount.c +++ b/lib/libzfs/libzfs_mount.c @@ -78,7 +78,6 @@ #include #include "libzfs_impl.h" -#include #include #include @@ -1071,7 +1070,7 @@ non_descendant_idx(zfs_handle_t **handles, size_t num_handles, int idx) typedef struct mnt_param { libzfs_handle_t *mnt_hdl; - tpool_t *mnt_tp; + taskq_t *mnt_tq; zfs_handle_t **mnt_zhps; /* filesystems to mount */ size_t mnt_num_handles; int mnt_idx; /* Index of selected entry to mount */ @@ -1085,19 +1084,20 @@ typedef struct mnt_param { */ static void zfs_dispatch_mount(libzfs_handle_t *hdl, zfs_handle_t **handles, - size_t num_handles, int idx, zfs_iter_f func, void *data, tpool_t *tp) + size_t num_handles, int idx, zfs_iter_f func, void *data, taskq_t *tq) { mnt_param_t *mnt_param = zfs_alloc(hdl, sizeof (mnt_param_t)); mnt_param->mnt_hdl = hdl; - mnt_param->mnt_tp = tp; + mnt_param->mnt_tq = tq; mnt_param->mnt_zhps = handles; mnt_param->mnt_num_handles = num_handles; mnt_param->mnt_idx = idx; mnt_param->mnt_func = func; mnt_param->mnt_data = data; - if (tpool_dispatch(tp, zfs_mount_task, (void*)mnt_param)) { + if (taskq_dispatch(tq, zfs_mount_task, (void*)mnt_param, + TQ_SLEEP) == TASKQID_INVALID) { /* Could not dispatch to thread pool; execute directly */ zfs_mount_task((void*)mnt_param); } @@ -1188,7 +1188,7 @@ zfs_mount_task(void *arg) if (!libzfs_path_contains(mountpoint, child)) break; /* not a descendant, return */ zfs_dispatch_mount(mp->mnt_hdl, handles, num_handles, i, - mp->mnt_func, mp->mnt_data, mp->mnt_tp); + mp->mnt_func, mp->mnt_data, mp->mnt_tq); } out: @@ -1246,7 +1246,8 @@ zfs_foreach_mountpoint(libzfs_handle_t *hdl, zfs_handle_t **handles, * Issue the callback function for each dataset using a parallel * algorithm that uses a thread pool to manage threads. */ - tpool_t *tp = tpool_create(1, nthr, 0, NULL); + taskq_t *tq = taskq_create("zfs_foreach_mountpoint", nthr, minclsyspri, + 1, INT_MAX, TASKQ_DYNAMIC); /* * There may be multiple "top level" mountpoints outside of the pool's @@ -1264,11 +1265,11 @@ zfs_foreach_mountpoint(libzfs_handle_t *hdl, zfs_handle_t **handles, zfs_prop_get_int(handles[i], ZFS_PROP_ZONED)) break; zfs_dispatch_mount(hdl, handles, num_handles, i, func, data, - tp); + tq); } - tpool_wait(tp); /* wait for all scheduled mounts to complete */ - tpool_destroy(tp); + taskq_wait(tq); /* wait for all scheduled mounts to complete */ + taskq_destroy(tq); } /* diff --git a/lib/libzfs/os/linux/libzfs_mount_os.c b/lib/libzfs/os/linux/libzfs_mount_os.c index 585d22d9e..cdfb432b1 100644 --- a/lib/libzfs/os/linux/libzfs_mount_os.c +++ b/lib/libzfs/os/linux/libzfs_mount_os.c @@ -49,7 +49,6 @@ #include #include "../../libzfs_impl.h" -#include #define ZS_COMMENT 0x00000000 /* comment */ #define ZS_ZFSUTIL 0x00000001 /* caller is zfs(8) */ diff --git a/lib/libzfs_core/libzfs_core.abi b/lib/libzfs_core/libzfs_core.abi index 238151d43..8b63a0abd 100644 --- a/lib/libzfs_core/libzfs_core.abi +++ b/lib/libzfs_core/libzfs_core.abi @@ -2004,14 +2004,17 @@ + + + @@ -2025,15 +2028,16 @@ + + + + + - - - - @@ -2131,6 +2135,9 @@ + + + diff --git a/lib/libzutil/Makefile.am b/lib/libzutil/Makefile.am index 519906235..e5af9d69d 100644 --- a/lib/libzutil/Makefile.am +++ b/lib/libzutil/Makefile.am @@ -30,7 +30,6 @@ endif libzutil_la_LIBADD = \ libavl.la \ - libtpool.la \ libnvpair.la \ libspl.la diff --git a/lib/libzutil/os/freebsd/zutil_import_os.c b/lib/libzutil/os/freebsd/zutil_import_os.c index 324ba1cf3..1d38d6e6b 100644 --- a/lib/libzutil/os/freebsd/zutil_import_os.c +++ b/lib/libzutil/os/freebsd/zutil_import_os.c @@ -62,7 +62,6 @@ #include #include -#include #include #include diff --git a/lib/libzutil/os/linux/zutil_import_os.c b/lib/libzutil/os/linux/zutil_import_os.c index 2b2043889..7cee63616 100644 --- a/lib/libzutil/os/linux/zutil_import_os.c +++ b/lib/libzutil/os/linux/zutil_import_os.c @@ -63,7 +63,6 @@ #include #include -#include #include #include #include diff --git a/lib/libzutil/zutil_import.c b/lib/libzutil/zutil_import.c index 08367f4c0..56ac11e20 100644 --- a/lib/libzutil/zutil_import.c +++ b/lib/libzutil/zutil_import.c @@ -65,8 +65,8 @@ #include #include #include +#include -#include #include #include @@ -1457,7 +1457,7 @@ zpool_find_import_impl(libpc_handle_t *hdl, importargs_t *iarg, name_entry_t *ne, *nenext; rdsk_node_t *slice; void *cookie; - tpool_t *t; + taskq_t *tq; verify(iarg->poolname == NULL || iarg->guid == 0); @@ -1480,13 +1480,14 @@ zpool_find_import_impl(libpc_handle_t *hdl, importargs_t *iarg, threads = MIN(threads, am / VDEV_LABELS); #endif #endif - t = tpool_create(1, threads, 0, NULL); + tq = taskq_create("zpool_find_import", threads, minclsyspri, 1, INT_MAX, + TASKQ_DYNAMIC); for (slice = avl_first(cache); slice; (slice = avl_walk(cache, slice, AVL_AFTER))) - (void) tpool_dispatch(t, zpool_open_func, slice); + (void) taskq_dispatch(tq, zpool_open_func, slice, TQ_SLEEP); - tpool_wait(t); - tpool_destroy(t); + taskq_wait(tq); + taskq_destroy(tq); /* * Process the cache, filtering out any entries which are not