From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: Jens Axboe <axboe@kernel.dk>
Date: Tue, 31 Aug 2021 13:57:32 -0600
Subject: [PATCH] io-wq: split bounded and unbounded work into separate lists

We've got a few issues that all boil down to the fact that we have one
list of pending work items, yet two different types of workers to
serve them. This causes some oddities around workers switching type and
even hashed work vs regular work on the same bounded list.

Just separate them out cleanly, similarly to how we already do
accounting of what is running. That provides a clean separation and
removes some corner cases that can cause stalls when handling IO
that is punted to io-wq.

Fixes: ecc53c48c13d ("io-wq: check max_worker limits if a worker transitions bound state")
Signed-off-by: Jens Axboe <axboe@kernel.dk>
[backport]
Signed-off-by: Fabian Ebner <f.ebner@proxmox.com>
Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
---
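Note (added for review, placed below the "---" so it is not part of the
commit message): the hunks rely on acct lookup helpers that already live in
fs/io-wq.c and are not touched by this diff. As an orientation sketch only,
assuming the usual definitions -- the exact bodies in this tree may differ --
they look roughly like:

	/* illustrative sketch, not taken from this patch */
	static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
	{
		/* bounded work/workers map to acct[IO_WQ_ACCT_BOUND], the rest to acct[IO_WQ_ACCT_UNBOUND] */
		return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
	}

	static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
							   struct io_wq_work *work)
	{
		/* a work item is bounded unless IO_WQ_WORK_UNBOUND is set on it */
		return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
	}

	static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
	{
		/* a worker's type follows its IO_WORKER_F_BOUND flag */
		return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
	}

With a separate io_wq_work_list plus per-acct stalled bit in struct
io_wqe_acct, enqueue, dequeue and cancellation resolve the right acct first
and then only touch that acct's list, which is what allows the bound/unbound
switching logic in __io_worker_busy() and io_worker_can_run_work() to be
removed below.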
 fs/io-wq.c | 160 +++++++++++++++++++++++------------------------------
 1 file changed, 70 insertions(+), 90 deletions(-)

diff --git a/fs/io-wq.c b/fs/io-wq.c
index ba7aaf2b95d0..6710da3d4445 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -34,7 +34,7 @@ enum {
 };

 enum {
-	IO_WQE_FLAG_STALLED = 1, /* stalled on hash */
+	IO_ACCT_STALLED_BIT = 0, /* stalled on hash */
 };

 /*
@@ -73,26 +73,25 @@ struct io_wqe_acct {
 	unsigned max_workers;
 	int index;
 	atomic_t nr_running;
+	struct io_wq_work_list work_list;
+	unsigned long flags;
 };

 enum {
 	IO_WQ_ACCT_BOUND,
 	IO_WQ_ACCT_UNBOUND,
+	IO_WQ_ACCT_NR,
 };

 /*
  * Per-node worker thread pool
  */
 struct io_wqe {
-	struct {
-		raw_spinlock_t lock;
-		struct io_wq_work_list work_list;
-		unsigned flags;
-	} ____cacheline_aligned_in_smp;
-
-	int node;
+	raw_spinlock_t lock;
 	struct io_wqe_acct acct[2];

+	int node;
+
 	struct hlist_nulls_head free_list;
 	struct list_head all_list;

@@ -196,11 +195,10 @@ static void io_worker_exit(struct io_worker *worker)
 	do_exit(0);
 }

-static inline bool io_wqe_run_queue(struct io_wqe *wqe)
-	__must_hold(wqe->lock)
+static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
 {
-	if (!wq_list_empty(&wqe->work_list) &&
-	    !(wqe->flags & IO_WQE_FLAG_STALLED))
+	if (!wq_list_empty(&acct->work_list) &&
+	    !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
 		return true;
 	return false;
 }
@@ -209,7 +207,8 @@ static inline bool io_wqe_run_queue(struct io_wqe *wqe)
  * Check head of free list for an available worker. If one isn't available,
  * caller must create one.
  */
-static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
+static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
+					struct io_wqe_acct *acct)
 	__must_hold(RCU)
 {
 	struct hlist_nulls_node *n;
@@ -223,6 +222,10 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
 	hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
 		if (!io_worker_get(worker))
 			continue;
+		if (io_wqe_get_acct(worker) != acct) {
+			io_worker_release(worker);
+			continue;
+		}
 		if (wake_up_process(worker->task)) {
 			io_worker_release(worker);
 			return true;
@@ -341,7 +344,7 @@ static void io_wqe_dec_running(struct io_worker *worker)
 	if (!(worker->flags & IO_WORKER_F_UP))
 		return;

-	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
+	if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
 		atomic_inc(&acct->nr_running);
 		atomic_inc(&wqe->wq->worker_refs);
 		io_queue_worker_create(wqe, worker, acct);
@@ -356,29 +359,10 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
 			     struct io_wq_work *work)
 	__must_hold(wqe->lock)
 {
-	bool worker_bound, work_bound;
-
-	BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1);
-
 	if (worker->flags & IO_WORKER_F_FREE) {
 		worker->flags &= ~IO_WORKER_F_FREE;
 		hlist_nulls_del_init_rcu(&worker->nulls_node);
 	}
-
-	/*
-	 * If worker is moving from bound to unbound (or vice versa), then
-	 * ensure we update the running accounting.
-	 */
-	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
-	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
-	if (worker_bound != work_bound) {
-		int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND;
-		io_wqe_dec_running(worker);
-		worker->flags ^= IO_WORKER_F_BOUND;
-		wqe->acct[index].nr_workers--;
-		wqe->acct[index ^ 1].nr_workers++;
-		io_wqe_inc_running(worker);
-	}
 }

 /*
@@ -420,44 +404,23 @@ static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
 	return ret;
 }

-/*
- * We can always run the work if the worker is currently the same type as
- * the work (eg both are bound, or both are unbound). If they are not the
- * same, only allow it if incrementing the worker count would be allowed.
- */
-static bool io_worker_can_run_work(struct io_worker *worker,
-				   struct io_wq_work *work)
-{
-	struct io_wqe_acct *acct;
-
-	if (!(worker->flags & IO_WORKER_F_BOUND) !=
-	    !(work->flags & IO_WQ_WORK_UNBOUND))
-		return true;
-
-	/* not the same type, check if we'd go over the limit */
-	acct = io_work_get_acct(worker->wqe, work);
-	return acct->nr_workers < acct->max_workers;
-}
-
-static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
+static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
 					   struct io_worker *worker)
 	__must_hold(wqe->lock)
 {
 	struct io_wq_work_node *node, *prev;
 	struct io_wq_work *work, *tail;
 	unsigned int stall_hash = -1U;
+	struct io_wqe *wqe = worker->wqe;

-	wq_list_for_each(node, prev, &wqe->work_list) {
+	wq_list_for_each(node, prev, &acct->work_list) {
 		unsigned int hash;

 		work = container_of(node, struct io_wq_work, list);

-		if (!io_worker_can_run_work(worker, work))
-			break;
-
 		/* not hashed, can run anytime */
 		if (!io_wq_is_hashed(work)) {
-			wq_list_del(&wqe->work_list, node, prev);
+			wq_list_del(&acct->work_list, node, prev);
 			return work;
 		}

@@ -468,7 +431,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
 		/* hashed, can run if not already running */
 		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
 			wqe->hash_tail[hash] = NULL;
-			wq_list_cut(&wqe->work_list, &tail->list, prev);
+			wq_list_cut(&acct->work_list, &tail->list, prev);
 			return work;
 		}
 		if (stall_hash == -1U)
@@ -484,12 +447,12 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
 		 * Set this before dropping the lock to avoid racing with new
 		 * work being added and clearing the stalled bit.
 		 */
-		wqe->flags |= IO_WQE_FLAG_STALLED;
+		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
 		raw_spin_unlock(&wqe->lock);
 		unstalled = io_wait_on_hash(wqe, stall_hash);
 		raw_spin_lock(&wqe->lock);
 		if (unstalled) {
-			wqe->flags &= ~IO_WQE_FLAG_STALLED;
+			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
 			if (wq_has_sleeper(&wqe->wq->hash->wait))
 				wake_up(&wqe->wq->hash->wait);
 		}
@@ -526,6 +489,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
 static void io_worker_handle_work(struct io_worker *worker)
 	__releases(wqe->lock)
 {
+	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
 	struct io_wqe *wqe = worker->wqe;
 	struct io_wq *wq = wqe->wq;
 	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
@@ -540,7 +504,7 @@ static void io_worker_handle_work(struct io_worker *worker)
 		 * can't make progress, any work completion or insertion will
 		 * clear the stalled flag.
 		 */
-		work = io_get_next_work(wqe, worker);
+		work = io_get_next_work(acct, worker);
 		if (work)
 			__io_worker_busy(wqe, worker, work);

@@ -576,7 +540,7 @@ static void io_worker_handle_work(struct io_worker *worker)
 				/* serialize hash clear with wake_up() */
 				spin_lock_irq(&wq->hash->wait.lock);
 				clear_bit(hash, &wq->hash->map);
-				wqe->flags &= ~IO_WQE_FLAG_STALLED;
+				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
 				spin_unlock_irq(&wq->hash->wait.lock);
 				if (wq_has_sleeper(&wq->hash->wait))
 					wake_up(&wq->hash->wait);
@@ -595,6 +559,7 @@ static void io_worker_handle_work(struct io_worker *worker)
 static int io_wqe_worker(void *data)
 {
 	struct io_worker *worker = data;
+	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
 	struct io_wqe *wqe = worker->wqe;
 	struct io_wq *wq = wqe->wq;
 	char buf[TASK_COMM_LEN];
@@ -610,7 +575,7 @@ static int io_wqe_worker(void *data)
 		set_current_state(TASK_INTERRUPTIBLE);
 loop:
 		raw_spin_lock_irq(&wqe->lock);
-		if (io_wqe_run_queue(wqe)) {
+		if (io_acct_run_queue(acct)) {
 			io_worker_handle_work(worker);
 			goto loop;
 		}
@@ -636,7 +601,7 @@ static int io_wqe_worker(void *data)

 	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
 		raw_spin_lock_irq(&wqe->lock);
-		if (!wq_list_empty(&wqe->work_list))
+		if (!wq_list_empty(&acct->work_list))
 			io_worker_handle_work(worker);
 		else
 			raw_spin_unlock_irq(&wqe->lock);
@@ -782,12 +747,13 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)

 static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
 {
+	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
 	unsigned int hash;
 	struct io_wq_work *tail;

 	if (!io_wq_is_hashed(work)) {
 append:
-		wq_list_add_tail(&work->list, &wqe->work_list);
+		wq_list_add_tail(&work->list, &acct->work_list);
 		return;
 	}

@@ -797,7 +763,7 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
 	if (!tail)
 		goto append;

-	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
+	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
 }

 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
@@ -819,10 +785,10 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)

 	raw_spin_lock_irqsave(&wqe->lock, flags);
 	io_wqe_insert_work(wqe, work);
-	wqe->flags &= ~IO_WQE_FLAG_STALLED;
+	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);

 	rcu_read_lock();
-	do_create = !io_wqe_activate_free_worker(wqe);
+	do_create = !io_wqe_activate_free_worker(wqe, acct);
 	rcu_read_unlock();

 	raw_spin_unlock_irqrestore(&wqe->lock, flags);
@@ -875,6 +841,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
 					 struct io_wq_work *work,
 					 struct io_wq_work_node *prev)
 {
+	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
 	unsigned int hash = io_get_work_hash(work);
 	struct io_wq_work *prev_work = NULL;

@@ -886,7 +853,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
 		else
 			wqe->hash_tail[hash] = NULL;
 	}
-	wq_list_del(&wqe->work_list, &work->list, prev);
+	wq_list_del(&acct->work_list, &work->list, prev);
 }

 static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
@@ -895,22 +862,27 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
 	struct io_wq_work_node *node, *prev;
 	struct io_wq_work *work;
 	unsigned long flags;
+	int i;

 retry:
 	raw_spin_lock_irqsave(&wqe->lock, flags);
-	wq_list_for_each(node, prev, &wqe->work_list) {
-		work = container_of(node, struct io_wq_work, list);
-		if (!match->fn(work, match->data))
-			continue;
-		io_wqe_remove_pending(wqe, work, prev);
-		raw_spin_unlock_irqrestore(&wqe->lock, flags);
-		io_run_cancel(work, wqe);
-		match->nr_pending++;
-		if (!match->cancel_all)
-			return;
+	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);

-		/* not safe to continue after unlock */
-		goto retry;
+		wq_list_for_each(node, prev, &acct->work_list) {
+			work = container_of(node, struct io_wq_work, list);
+			if (!match->fn(work, match->data))
+				continue;
+			io_wqe_remove_pending(wqe, work, prev);
+			raw_spin_unlock_irqrestore(&wqe->lock, flags);
+			io_run_cancel(work, wqe);
+			match->nr_pending++;
+			if (!match->cancel_all)
+				return;
+
+			/* not safe to continue after unlock */
+			goto retry;
+		}
 	}
 	raw_spin_unlock_irqrestore(&wqe->lock, flags);
 }
@@ -971,18 +943,24 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
 			    int sync, void *key)
 {
 	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
+	int i;

 	list_del_init(&wait->entry);

 	rcu_read_lock();
-	io_wqe_activate_free_worker(wqe);
+	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+		struct io_wqe_acct *acct = &wqe->acct[i];
+
+		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
+			io_wqe_activate_free_worker(wqe, acct);
+	}
 	rcu_read_unlock();
 	return 1;
 }

 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 {
-	int ret = -ENOMEM, node;
+	int ret, node, i;
 	struct io_wq *wq;

 	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
@@ -1019,18 +997,20 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 			goto err;
 		wq->wqes[node] = wqe;
 		wqe->node = alloc_node;
-		wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
-		wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
 		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
-		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
 		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
 					task_rlimit(current, RLIMIT_NPROC);
-		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
-		wqe->wait.func = io_wqe_hash_wake;
 		INIT_LIST_HEAD(&wqe->wait.entry);
+		wqe->wait.func = io_wqe_hash_wake;
+		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+			struct io_wqe_acct *acct = &wqe->acct[i];
+
+			acct->index = i;
+			atomic_set(&acct->nr_running, 0);
+			INIT_WQ_LIST(&acct->work_list);
+		}
 		wqe->wq = wq;
 		raw_spin_lock_init(&wqe->lock);
-		INIT_WQ_LIST(&wqe->work_list);
 		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
 		INIT_LIST_HEAD(&wqe->all_list);
 	}