zfs recv hangs if max recordsize is less than received recordsize

- Some optimizations for bqueue enqueue/dequeue.
- Added a fix to prevent deadlock when both bqueue_enqueue_impl()
and bqueue_dequeue() waits for signal to be triggered.

Reviewed-by: Alexander Motin <mav@FreeBSD.org>
Reviewed-by: Ryan Moeller <ryan@iXsystems.com>
Signed-off-by: Ameer Hamza <ahamza@ixsystems.com>
Closes #13855
This commit is contained in:
Ameer Hamza 2022-09-17 01:52:25 +05:00 committed by GitHub
parent 8da218a7a2
commit 577d41d3b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 20 deletions

View File

@ -30,22 +30,22 @@ typedef struct bqueue {
kmutex_t bq_lock; kmutex_t bq_lock;
kcondvar_t bq_add_cv; kcondvar_t bq_add_cv;
kcondvar_t bq_pop_cv; kcondvar_t bq_pop_cv;
uint64_t bq_size; size_t bq_size;
uint64_t bq_maxsize; size_t bq_maxsize;
uint64_t bq_fill_fraction; uint_t bq_fill_fraction;
size_t bq_node_offset; size_t bq_node_offset;
} bqueue_t; } bqueue_t;
typedef struct bqueue_node { typedef struct bqueue_node {
list_node_t bqn_node; list_node_t bqn_node;
uint64_t bqn_size; size_t bqn_size;
} bqueue_node_t; } bqueue_node_t;
int bqueue_init(bqueue_t *, uint64_t, uint64_t, size_t); int bqueue_init(bqueue_t *, uint_t, size_t, size_t);
void bqueue_destroy(bqueue_t *); void bqueue_destroy(bqueue_t *);
void bqueue_enqueue(bqueue_t *, void *, uint64_t); void bqueue_enqueue(bqueue_t *, void *, size_t);
void bqueue_enqueue_flush(bqueue_t *, void *, uint64_t); void bqueue_enqueue_flush(bqueue_t *, void *, size_t);
void *bqueue_dequeue(bqueue_t *); void *bqueue_dequeue(bqueue_t *);
boolean_t bqueue_empty(bqueue_t *); boolean_t bqueue_empty(bqueue_t *);

View File

@ -1758,9 +1758,9 @@ typedef enum {
* against the cost of COWing a giant block to modify one byte, and the * against the cost of COWing a giant block to modify one byte, and the
* large latency of reading or writing a large block. * large latency of reading or writing a large block.
* *
* Note that although blocks up to 16MB are supported, the recordsize * The recordsize property can not be set larger than zfs_max_recordsize
* property can not be set larger than zfs_max_recordsize (default 1MB). * (default 16MB on 64-bit and 1MB on 32-bit). See the comment near
* See the comment near zfs_max_recordsize in dsl_dataset.c for details. * zfs_max_recordsize in dsl_dataset.c for details.
* *
* Note that although the LSIZE field of the blkptr_t can store sizes up * Note that although the LSIZE field of the blkptr_t can store sizes up
* to 32MB, the dnode's dn_datablkszsec can only store sizes up to * to 32MB, the dnode's dn_datablkszsec can only store sizes up to

View File

@ -42,8 +42,7 @@ obj2node(bqueue_t *q, void *data)
* Return 0 on success, or -1 on failure. * Return 0 on success, or -1 on failure.
*/ */
int int
bqueue_init(bqueue_t *q, uint64_t fill_fraction, uint64_t size, bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset)
size_t node_offset)
{ {
if (fill_fraction == 0) { if (fill_fraction == 0) {
return (-1); return (-1);
@ -78,22 +77,26 @@ bqueue_destroy(bqueue_t *q)
} }
static void static void
bqueue_enqueue_impl(bqueue_t *q, void *data, uint64_t item_size, bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush)
boolean_t flush)
{ {
ASSERT3U(item_size, >, 0); ASSERT3U(item_size, >, 0);
ASSERT3U(item_size, <=, q->bq_maxsize); ASSERT3U(item_size, <=, q->bq_maxsize);
mutex_enter(&q->bq_lock); mutex_enter(&q->bq_lock);
obj2node(q, data)->bqn_size = item_size; obj2node(q, data)->bqn_size = item_size;
while (q->bq_size + item_size > q->bq_maxsize) { while (q->bq_size && q->bq_size + item_size > q->bq_maxsize) {
/*
* Wake up bqueue_dequeue() thread if already sleeping in order
* to prevent the deadlock condition
*/
cv_signal(&q->bq_pop_cv);
cv_wait_sig(&q->bq_add_cv, &q->bq_lock); cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
} }
q->bq_size += item_size; q->bq_size += item_size;
list_insert_tail(&q->bq_list, data); list_insert_tail(&q->bq_list, data);
if (q->bq_size >= q->bq_maxsize / q->bq_fill_fraction)
cv_signal(&q->bq_pop_cv);
if (flush) if (flush)
cv_broadcast(&q->bq_pop_cv); cv_broadcast(&q->bq_pop_cv);
else if (q->bq_size >= q->bq_maxsize / q->bq_fill_fraction)
cv_signal(&q->bq_pop_cv);
mutex_exit(&q->bq_lock); mutex_exit(&q->bq_lock);
} }
@ -103,7 +106,7 @@ bqueue_enqueue_impl(bqueue_t *q, void *data, uint64_t item_size,
* > 0. * > 0.
*/ */
void void
bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size) bqueue_enqueue(bqueue_t *q, void *data, size_t item_size)
{ {
bqueue_enqueue_impl(q, data, item_size, B_FALSE); bqueue_enqueue_impl(q, data, item_size, B_FALSE);
} }
@ -117,7 +120,7 @@ bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size)
* destroy the condvar before the enqueuing thread is done. * destroy the condvar before the enqueuing thread is done.
*/ */
void void
bqueue_enqueue_flush(bqueue_t *q, void *data, uint64_t item_size) bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size)
{ {
bqueue_enqueue_impl(q, data, item_size, B_TRUE); bqueue_enqueue_impl(q, data, item_size, B_TRUE);
} }
@ -130,7 +133,7 @@ void *
bqueue_dequeue(bqueue_t *q) bqueue_dequeue(bqueue_t *q)
{ {
void *ret = NULL; void *ret = NULL;
uint64_t item_size; size_t item_size;
mutex_enter(&q->bq_lock); mutex_enter(&q->bq_lock);
while (q->bq_size == 0) { while (q->bq_size == 0) {
cv_wait_sig(&q->bq_pop_cv, &q->bq_lock); cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);