Implement memory and CPU hotplug

ZFS currently doesn't react to hotplugging cpu or memory into the 
system in any way. This patch changes that by adding logic to the ARC 
that allows the system to take advantage of new memory that is added 
for caching purposes. It also adds logic to the taskq infrastructure 
to support dynamically expanding the number of threads allocated to a 
taskq.

Reviewed-by: Brian Behlendorf <behlendorf1@llnl.gov>
Co-authored-by: Matthew Ahrens <matthew.ahrens@delphix.com>
Co-authored-by: Brian Behlendorf <behlendorf1@llnl.gov>
Signed-off-by: Paul Dagnelie <pcd@delphix.com>
Closes #11212
This commit is contained in:
Paul Dagnelie
2020-12-10 14:09:23 -08:00
committed by GitHub
parent f483daa870
commit 60a4c7d2a2
14 changed files with 290 additions and 36 deletions
+10
View File
@@ -243,3 +243,13 @@ arc_lowmem_fini(void)
if (arc_event_lowmem != NULL)
EVENTHANDLER_DEREGISTER(vm_lowmem, arc_event_lowmem);
}
void
arc_register_hotplug(void)
{
}
void
arc_unregister_hotplug(void)
{
}
+127 -5
View File
@@ -28,6 +28,9 @@
#include <sys/kmem.h>
#include <sys/tsd.h>
#include <sys/trace_spl.h>
#ifdef HAVE_CPU_HOTPLUG
#include <linux/cpuhotplug.h>
#endif
int spl_taskq_thread_bind = 0;
module_param(spl_taskq_thread_bind, int, 0644);
@@ -35,7 +38,7 @@ MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default");
int spl_taskq_thread_dynamic = 1;
module_param(spl_taskq_thread_dynamic, int, 0644);
module_param(spl_taskq_thread_dynamic, int, 0444);
MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads");
int spl_taskq_thread_priority = 1;
@@ -59,6 +62,11 @@ EXPORT_SYMBOL(system_delay_taskq);
static taskq_t *dynamic_taskq;
static taskq_thread_t *taskq_thread_create(taskq_t *);
#ifdef HAVE_CPU_HOTPLUG
/* Multi-callback id for cpu hotplugging. */
static int spl_taskq_cpuhp_state;
#endif
/* List of all taskqs */
LIST_HEAD(tq_list);
struct rw_semaphore tq_list_sem;
@@ -1024,13 +1032,14 @@ taskq_thread_create(taskq_t *tq)
}
taskq_t *
taskq_create(const char *name, int nthreads, pri_t pri,
taskq_create(const char *name, int threads_arg, pri_t pri,
int minalloc, int maxalloc, uint_t flags)
{
taskq_t *tq;
taskq_thread_t *tqt;
int count = 0, rc = 0, i;
unsigned long irqflags;
int nthreads = threads_arg;
ASSERT(name != NULL);
ASSERT(minalloc >= 0);
@@ -1041,15 +1050,27 @@ taskq_create(const char *name, int nthreads, pri_t pri,
if (flags & TASKQ_THREADS_CPU_PCT) {
ASSERT(nthreads <= 100);
ASSERT(nthreads >= 0);
nthreads = MIN(nthreads, 100);
nthreads = MIN(threads_arg, 100);
nthreads = MAX(nthreads, 0);
nthreads = MAX((num_online_cpus() * nthreads) / 100, 1);
nthreads = MAX((num_online_cpus() * nthreads) /100, 1);
}
tq = kmem_alloc(sizeof (*tq), KM_PUSHPAGE);
if (tq == NULL)
return (NULL);
tq->tq_hp_support = B_FALSE;
#ifdef HAVE_CPU_HOTPLUG
if (flags & TASKQ_THREADS_CPU_PCT) {
tq->tq_hp_support = B_TRUE;
if (cpuhp_state_add_instance_nocalls(spl_taskq_cpuhp_state,
&tq->tq_hp_cb_node) != 0) {
kmem_free(tq, sizeof (*tq));
return (NULL);
}
}
#endif
spin_lock_init(&tq->tq_lock);
INIT_LIST_HEAD(&tq->tq_thread_list);
INIT_LIST_HEAD(&tq->tq_active_list);
@@ -1058,6 +1079,7 @@ taskq_create(const char *name, int nthreads, pri_t pri,
tq->tq_nthreads = 0;
tq->tq_nspawn = 0;
tq->tq_maxthreads = nthreads;
tq->tq_cpu_pct = threads_arg;
tq->tq_pri = pri;
tq->tq_minalloc = minalloc;
tq->tq_maxalloc = maxalloc;
@@ -1131,6 +1153,12 @@ taskq_destroy(taskq_t *tq)
tq->tq_flags &= ~TASKQ_ACTIVE;
spin_unlock_irqrestore(&tq->tq_lock, flags);
#ifdef HAVE_CPU_HOTPLUG
if (tq->tq_hp_support) {
VERIFY0(cpuhp_state_remove_instance_nocalls(
spl_taskq_cpuhp_state, &tq->tq_hp_cb_node));
}
#endif
/*
* When TASKQ_ACTIVE is clear new tasks may not be added nor may
* new worker threads be spawned for dynamic taskq.
@@ -1198,7 +1226,6 @@ taskq_destroy(taskq_t *tq)
}
EXPORT_SYMBOL(taskq_destroy);
static unsigned int spl_taskq_kick = 0;
/*
@@ -1255,12 +1282,96 @@ module_param_call(spl_taskq_kick, param_set_taskq_kick, param_get_uint,
MODULE_PARM_DESC(spl_taskq_kick,
"Write nonzero to kick stuck taskqs to spawn more threads");
#ifdef HAVE_CPU_HOTPLUG
/*
* This callback will be called exactly once for each core that comes online,
* for each dynamic taskq. We attempt to expand taskqs that have
* TASKQ_THREADS_CPU_PCT set. We need to redo the percentage calculation every
* time, to correctly determine whether or not to add a thread.
*/
static int
spl_taskq_expand(unsigned int cpu, struct hlist_node *node)
{
taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
unsigned long flags;
int err = 0;
ASSERT(tq);
spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
if (!(tq->tq_flags & TASKQ_ACTIVE))
goto out;
ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
int nthreads = MIN(tq->tq_cpu_pct, 100);
nthreads = MAX(((num_online_cpus() + 1) * nthreads) / 100, 1);
tq->tq_maxthreads = nthreads;
if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) &&
tq->tq_maxthreads > tq->tq_nthreads) {
ASSERT3U(tq->tq_maxthreads, ==, tq->tq_nthreads + 1);
taskq_thread_t *tqt = taskq_thread_create(tq);
if (tqt == NULL)
err = -1;
}
out:
spin_unlock_irqrestore(&tq->tq_lock, flags);
return (err);
}
/*
* While we don't support offlining CPUs, it is possible that CPUs will fail
* to online successfully. We do need to be able to handle this case
* gracefully.
*/
static int
spl_taskq_prepare_down(unsigned int cpu, struct hlist_node *node)
{
taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
unsigned long flags;
ASSERT(tq);
spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
if (!(tq->tq_flags & TASKQ_ACTIVE))
goto out;
ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
int nthreads = MIN(tq->tq_cpu_pct, 100);
nthreads = MAX(((num_online_cpus()) * nthreads) / 100, 1);
tq->tq_maxthreads = nthreads;
if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) &&
tq->tq_maxthreads < tq->tq_nthreads) {
ASSERT3U(tq->tq_maxthreads, ==, tq->tq_nthreads - 1);
taskq_thread_t *tqt = list_entry(tq->tq_thread_list.next,
taskq_thread_t, tqt_thread_list);
struct task_struct *thread = tqt->tqt_thread;
spin_unlock_irqrestore(&tq->tq_lock, flags);
kthread_stop(thread);
return (0);
}
out:
spin_unlock_irqrestore(&tq->tq_lock, flags);
return (0);
}
#endif
int
spl_taskq_init(void)
{
init_rwsem(&tq_list_sem);
tsd_create(&taskq_tsd, NULL);
#ifdef HAVE_CPU_HOTPLUG
spl_taskq_cpuhp_state = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN,
"fs/spl_taskq:online", spl_taskq_expand, spl_taskq_prepare_down);
#endif
system_taskq = taskq_create("spl_system_taskq", MAX(boot_ncpus, 64),
maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
if (system_taskq == NULL)
@@ -1269,6 +1380,9 @@ spl_taskq_init(void)
system_delay_taskq = taskq_create("spl_delay_taskq", MAX(boot_ncpus, 4),
maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
if (system_delay_taskq == NULL) {
#ifdef HAVE_CPU_HOTPLUG
cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
#endif
taskq_destroy(system_taskq);
return (1);
}
@@ -1276,6 +1390,9 @@ spl_taskq_init(void)
dynamic_taskq = taskq_create("spl_dynamic_taskq", 1,
maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE);
if (dynamic_taskq == NULL) {
#ifdef HAVE_CPU_HOTPLUG
cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
#endif
taskq_destroy(system_taskq);
taskq_destroy(system_delay_taskq);
return (1);
@@ -1304,4 +1421,9 @@ spl_taskq_fini(void)
system_taskq = NULL;
tsd_destroy(&taskq_tsd);
#ifdef HAVE_CPU_HOTPLUG
cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
spl_taskq_cpuhp_state = 0;
#endif
}
+77 -11
View File
@@ -48,6 +48,8 @@
#include <sys/vmsystm.h>
#include <sys/zpl.h>
#include <linux/page_compat.h>
#include <linux/notifier.h>
#include <linux/memory.h>
#endif
#include <sys/callb.h>
#include <sys/kstat.h>
@@ -73,6 +75,9 @@
*/
int zfs_arc_shrinker_limit = 10000;
#ifdef CONFIG_MEMORY_HOTPLUG
static struct notifier_block arc_hotplug_callback_mem_nb;
#endif
/*
* Return a default max arc size based on the amount of physical memory.
@@ -278,18 +283,9 @@ arc_memory_throttle(spa_t *spa, uint64_t reserve, uint64_t txg)
return (0);
}
void
arc_lowmem_init(void)
static void
arc_set_sys_free(uint64_t allmem)
{
uint64_t allmem = arc_all_memory();
/*
* Register a shrinker to support synchronous (direct) memory
* reclaim from the arc. This is done to prevent kswapd from
* swapping out pages when it is preferable to shrink the arc.
*/
spl_register_shrinker(&arc_shrinker);
/*
* The ARC tries to keep at least this much memory available for the
* system. This gives the ARC time to shrink in response to memory
@@ -342,6 +338,20 @@ arc_lowmem_init(void)
arc_sys_free = wmark * 3 + allmem / 32;
}
void
arc_lowmem_init(void)
{
uint64_t allmem = arc_all_memory();
/*
* Register a shrinker to support synchronous (direct) memory
* reclaim from the arc. This is done to prevent kswapd from
* swapping out pages when it is preferable to shrink the arc.
*/
spl_register_shrinker(&arc_shrinker);
arc_set_sys_free(allmem);
}
void
arc_lowmem_fini(void)
{
@@ -375,6 +385,52 @@ param_set_arc_int(const char *buf, zfs_kernel_param_t *kp)
return (0);
}
#ifdef CONFIG_MEMORY_HOTPLUG
/* ARGSUSED */
static int
arc_hotplug_callback(struct notifier_block *self, unsigned long action,
void *arg)
{
uint64_t allmem = arc_all_memory();
if (action != MEM_ONLINE)
return (NOTIFY_OK);
arc_set_limits(allmem);
#ifdef __LP64__
if (zfs_dirty_data_max_max == 0)
zfs_dirty_data_max_max = MIN(4ULL * 1024 * 1024 * 1024,
allmem * zfs_dirty_data_max_max_percent / 100);
#else
if (zfs_dirty_data_max_max == 0)
zfs_dirty_data_max_max = MIN(1ULL * 1024 * 1024 * 1024,
allmem * zfs_dirty_data_max_max_percent / 100);
#endif
arc_set_sys_free(allmem);
return (NOTIFY_OK);
}
#endif
void
arc_register_hotplug(void)
{
#ifdef CONFIG_MEMORY_HOTPLUG
arc_hotplug_callback_mem_nb.notifier_call = arc_hotplug_callback;
/* There is no significance to the value 100 */
arc_hotplug_callback_mem_nb.priority = 100;
register_memory_notifier(&arc_hotplug_callback_mem_nb);
#endif
}
void
arc_unregister_hotplug(void)
{
#ifdef CONFIG_MEMORY_HOTPLUG
unregister_memory_notifier(&arc_hotplug_callback_mem_nb);
#endif
}
#else /* _KERNEL */
int64_t
arc_available_memory(void)
@@ -405,6 +461,16 @@ arc_free_memory(void)
{
return (spa_get_random(arc_all_memory() * 20 / 100));
}
void
arc_register_hotplug(void)
{
}
void
arc_unregister_hotplug(void)
{
}
#endif /* _KERNEL */
/*
+5
View File
@@ -70,6 +70,11 @@
* zeroing out the borrowed value (forcing that thread to borrow on its next
* request, which will also be expensive). This is what makes aggsums well
* suited for write-many read-rarely operations.
*
* Note that the aggsums do not expand if more CPUs are hot-added. In that
* case, we will have less fanout than boot_ncpus, but we don't want to always
* reserve the RAM necessary to create the extra slots for additional CPUs up
* front, and dynamically adding them is a complex task.
*/
/*
+17 -7
View File
@@ -7592,6 +7592,15 @@ arc_target_bytes(void)
return (arc_c);
}
void
arc_set_limits(uint64_t allmem)
{
/* Set min cache to 1/32 of all memory, or 32MB, whichever is more. */
arc_c_min = MAX(allmem / 32, 2ULL << SPA_MAXBLOCKSHIFT);
/* How to set default max varies by platform. */
arc_c_max = arc_default_max(arc_c_min, allmem);
}
void
arc_init(void)
{
@@ -7607,11 +7616,7 @@ arc_init(void)
arc_lowmem_init();
#endif
/* Set min cache to 1/32 of all memory, or 32MB, whichever is more. */
arc_c_min = MAX(allmem / 32, 2ULL << SPA_MAXBLOCKSHIFT);
/* How to set default max varies by platform. */
arc_c_max = arc_default_max(arc_c_min, allmem);
arc_set_limits(allmem);
#ifndef _KERNEL
/*
@@ -7648,6 +7653,8 @@ arc_init(void)
if (arc_c < arc_c_min)
arc_c = arc_c_min;
arc_register_hotplug();
arc_state_init();
buf_init();
@@ -7656,8 +7663,9 @@ arc_init(void)
offsetof(arc_prune_t, p_node));
mutex_init(&arc_prune_mtx, NULL, MUTEX_DEFAULT, NULL);
arc_prune_taskq = taskq_create("arc_prune", boot_ncpus, defclsyspri,
boot_ncpus, INT_MAX, TASKQ_PREPOPULATE | TASKQ_DYNAMIC);
arc_prune_taskq = taskq_create("arc_prune", 100, defclsyspri,
boot_ncpus, INT_MAX, TASKQ_PREPOPULATE | TASKQ_DYNAMIC |
TASKQ_THREADS_CPU_PCT);
arc_ksp = kstat_create("zfs", 0, "arcstats", "misc", KSTAT_TYPE_NAMED,
sizeof (arc_stats) / sizeof (kstat_named_t), KSTAT_FLAG_VIRTUAL);
@@ -7754,6 +7762,8 @@ arc_fini(void)
buf_fini();
arc_state_fini();
arc_unregister_hotplug();
/*
* We destroy the zthrs after all the ARC state has been
* torn down to avoid the case of them receiving any
+5 -4
View File
@@ -220,11 +220,12 @@ dsl_pool_open_impl(spa_t *spa, uint64_t txg)
mutex_init(&dp->dp_lock, NULL, MUTEX_DEFAULT, NULL);
cv_init(&dp->dp_spaceavail_cv, NULL, CV_DEFAULT, NULL);
dp->dp_zrele_taskq = taskq_create("z_zrele", boot_ncpus, defclsyspri,
boot_ncpus * 8, INT_MAX, TASKQ_PREPOPULATE | TASKQ_DYNAMIC);
dp->dp_zrele_taskq = taskq_create("z_zrele", 100, defclsyspri,
boot_ncpus * 8, INT_MAX, TASKQ_PREPOPULATE | TASKQ_DYNAMIC |
TASKQ_THREADS_CPU_PCT);
dp->dp_unlinked_drain_taskq = taskq_create("z_unlinked_drain",
boot_ncpus, defclsyspri, boot_ncpus, INT_MAX,
TASKQ_PREPOPULATE | TASKQ_DYNAMIC);
100, defclsyspri, boot_ncpus, INT_MAX,
TASKQ_PREPOPULATE | TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT);
return (dp);
}
+6 -3
View File
@@ -96,9 +96,12 @@ multilist_create_impl(size_t size, size_t offset,
}
/*
* Allocate a new multilist, using the default number of sublists
* (the number of CPUs, or at least 4, or the tunable
* zfs_multilist_num_sublists).
* Allocate a new multilist, using the default number of sublists (the number
* of CPUs, or at least 4, or the tunable zfs_multilist_num_sublists). Note
* that the multilists do not expand if more CPUs are hot-added. In that case,
* we will have less fanout than boot_ncpus, but we don't want to always
* reserve the RAM necessary to create the extra slots for additional CPUs up
* front, and dynamically adding them is a complex task.
*/
multilist_t *
multilist_create(size_t size, size_t offset,
+4 -4
View File
@@ -1281,15 +1281,15 @@ spa_activate(spa_t *spa, spa_mode_t mode)
* pool traverse code from monopolizing the global (and limited)
* system_taskq by inappropriately scheduling long running tasks on it.
*/
spa->spa_prefetch_taskq = taskq_create("z_prefetch", boot_ncpus,
defclsyspri, 1, INT_MAX, TASKQ_DYNAMIC);
spa->spa_prefetch_taskq = taskq_create("z_prefetch", 100,
defclsyspri, 1, INT_MAX, TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT);
/*
* The taskq to upgrade datasets in this pool. Currently used by
* feature SPA_FEATURE_USEROBJ_ACCOUNTING/SPA_FEATURE_PROJECT_QUOTA.
*/
spa->spa_upgrade_taskq = taskq_create("z_upgrade", boot_ncpus,
defclsyspri, 1, INT_MAX, TASKQ_DYNAMIC);
spa->spa_upgrade_taskq = taskq_create("z_upgrade", 100,
defclsyspri, 1, INT_MAX, TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT);
}
/*
+3 -2
View File
@@ -446,8 +446,9 @@ txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
* Commit callback taskq hasn't been created yet.
*/
tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
boot_ncpus, defclsyspri, boot_ncpus, boot_ncpus * 2,
TASKQ_PREPOPULATE | TASKQ_DYNAMIC);
100, defclsyspri, boot_ncpus, boot_ncpus * 2,
TASKQ_PREPOPULATE | TASKQ_DYNAMIC |
TASKQ_THREADS_CPU_PCT);
}
cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);