diff --git a/man/man5/zfs-module-parameters.5 b/man/man5/zfs-module-parameters.5 index 311d898ff..3c61ac1a9 100644 --- a/man/man5/zfs-module-parameters.5 +++ b/man/man5/zfs-module-parameters.5 @@ -2995,6 +2995,20 @@ must be at least twice the maximum block size in use. Default value: \fB16,777,216\fR. .RE +.sp +.ne 2 +.na +\fBzfs_recv_write_batch_size\fR (int) +.ad +.RS 12n +The maximum amount of data (in bytes) that \fBzfs receive\fR will write in +one DMU transaction. This is the uncompressed size, even when receiving a +compressed send stream. This setting will not reduce the write size below +a single block. Capped at a maximum of 32MB +.sp +Default value: \fB1MB\fR. +.RE + .sp .ne 2 .na diff --git a/module/zfs/dmu_recv.c b/module/zfs/dmu_recv.c index 0fa0e015b..62881753f 100644 --- a/module/zfs/dmu_recv.c +++ b/module/zfs/dmu_recv.c @@ -65,6 +65,7 @@ int zfs_recv_queue_length = SPA_MAXBLOCKSIZE; int zfs_recv_queue_ff = 20; +int zfs_recv_write_batch_size = 1024 * 1024; static char *dmu_recv_tag = "dmu_recv_tag"; const char *recv_clone_name = "%recv"; @@ -110,6 +111,8 @@ struct receive_writer_arg { uint64_t max_object; /* highest object ID referenced in stream */ uint64_t bytes_read; /* bytes read when current record created */ + list_t write_batch; + /* Encryption parameters for the last received DRR_OBJECT_RANGE */ boolean_t or_crypt_params_present; uint64_t or_firstobj; @@ -1698,13 +1701,108 @@ receive_freeobjects(struct receive_writer_arg *rwa, return (0); } -noinline static int -receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw, - arc_buf_t *abuf) +/* + * Note: if this fails, the caller will clean up any records left on the + * rwa->write_batch list. + */ +static int +flush_write_batch_impl(struct receive_writer_arg *rwa) { - int err; - dmu_tx_t *tx; dnode_t *dn; + int err; + + if (dnode_hold(rwa->os, rwa->last_object, FTAG, &dn) != 0) + return (SET_ERROR(EINVAL)); + + struct receive_record_arg *last_rrd = list_tail(&rwa->write_batch); + struct drr_write *last_drrw = &last_rrd->header.drr_u.drr_write; + + struct receive_record_arg *first_rrd = list_head(&rwa->write_batch); + struct drr_write *first_drrw = &first_rrd->header.drr_u.drr_write; + + ASSERT3U(rwa->last_object, ==, last_drrw->drr_object); + ASSERT3U(rwa->last_offset, ==, last_drrw->drr_offset); + + dmu_tx_t *tx = dmu_tx_create(rwa->os); + dmu_tx_hold_write_by_dnode(tx, dn, first_drrw->drr_offset, + last_drrw->drr_offset - first_drrw->drr_offset + + last_drrw->drr_logical_size); + err = dmu_tx_assign(tx, TXG_WAIT); + if (err != 0) { + dmu_tx_abort(tx); + dnode_rele(dn, FTAG); + return (err); + } + + struct receive_record_arg *rrd; + while ((rrd = list_head(&rwa->write_batch)) != NULL) { + struct drr_write *drrw = &rrd->header.drr_u.drr_write; + arc_buf_t *abuf = rrd->arc_buf; + + ASSERT3U(drrw->drr_object, ==, rwa->last_object); + + if (rwa->byteswap && !arc_is_encrypted(abuf) && + arc_get_compression(abuf) == ZIO_COMPRESS_OFF) { + dmu_object_byteswap_t byteswap = + DMU_OT_BYTESWAP(drrw->drr_type); + dmu_ot_byteswap[byteswap].ob_func(abuf->b_data, + DRR_WRITE_PAYLOAD_SIZE(drrw)); + } + + err = dmu_assign_arcbuf_by_dnode(dn, + drrw->drr_offset, abuf, tx); + if (err != 0) { + /* + * This rrd is left on the list, so the caller will + * free it (and the arc_buf). + */ + break; + } + + /* + * Note: If the receive fails, we want the resume stream to + * start with the same record that we last successfully + * received (as opposed to the next record), so that we can + * verify that we are resuming from the correct location. + */ + save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx); + + list_remove(&rwa->write_batch, rrd); + kmem_free(rrd, sizeof (*rrd)); + } + + dmu_tx_commit(tx); + dnode_rele(dn, FTAG); + return (err); +} + +noinline static int +flush_write_batch(struct receive_writer_arg *rwa) +{ + if (list_is_empty(&rwa->write_batch)) + return (0); + int err = rwa->err; + if (err == 0) + err = flush_write_batch_impl(rwa); + if (err != 0) { + struct receive_record_arg *rrd; + while ((rrd = list_remove_head(&rwa->write_batch)) != NULL) { + dmu_return_arcbuf(rrd->arc_buf); + kmem_free(rrd, sizeof (*rrd)); + } + } + ASSERT(list_is_empty(&rwa->write_batch)); + return (err); +} + +noinline static int +receive_process_write_record(struct receive_writer_arg *rwa, + struct receive_record_arg *rrd) +{ + int err = 0; + + ASSERT3U(rrd->header.drr_type, ==, DRR_WRITE); + struct drr_write *drrw = &rrd->header.drr_u.drr_write; if (drrw->drr_offset + drrw->drr_logical_size < drrw->drr_offset || !DMU_OT_IS_VALID(drrw->drr_type)) @@ -1719,52 +1817,31 @@ receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw, drrw->drr_offset < rwa->last_offset)) { return (SET_ERROR(EINVAL)); } + + struct receive_record_arg *first_rrd = list_head(&rwa->write_batch); + struct drr_write *first_drrw = &first_rrd->header.drr_u.drr_write; + uint64_t batch_size = + MIN(zfs_recv_write_batch_size, DMU_MAX_ACCESS / 2); + if (first_rrd != NULL && + (drrw->drr_object != first_drrw->drr_object || + drrw->drr_offset >= first_drrw->drr_offset + batch_size)) { + err = flush_write_batch(rwa); + if (err != 0) + return (err); + } + rwa->last_object = drrw->drr_object; rwa->last_offset = drrw->drr_offset; if (rwa->last_object > rwa->max_object) rwa->max_object = rwa->last_object; - if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0) - return (SET_ERROR(EINVAL)); - - tx = dmu_tx_create(rwa->os); - dmu_tx_hold_write(tx, drrw->drr_object, - drrw->drr_offset, drrw->drr_logical_size); - err = dmu_tx_assign(tx, TXG_WAIT); - if (err != 0) { - dmu_tx_abort(tx); - return (err); - } - - if (rwa->byteswap && !arc_is_encrypted(abuf) && - arc_get_compression(abuf) == ZIO_COMPRESS_OFF) { - dmu_object_byteswap_t byteswap = - DMU_OT_BYTESWAP(drrw->drr_type); - dmu_ot_byteswap[byteswap].ob_func(abuf->b_data, - DRR_WRITE_PAYLOAD_SIZE(drrw)); - } - - /* use the bonus buf to look up the dnode in dmu_assign_arcbuf */ - VERIFY0(dnode_hold(rwa->os, drrw->drr_object, FTAG, &dn)); - err = dmu_assign_arcbuf_by_dnode(dn, drrw->drr_offset, abuf, tx); - if (err != 0) { - dnode_rele(dn, FTAG); - dmu_tx_commit(tx); - return (err); - } - dnode_rele(dn, FTAG); - + list_insert_tail(&rwa->write_batch, rrd); /* - * Note: If the receive fails, we want the resume stream to start - * with the same record that we last successfully received (as opposed - * to the next record), so that we can verify that we are - * resuming from the correct location. + * Return EAGAIN to indicate that we will use this rrd again, + * so the caller should not free it */ - save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx); - dmu_tx_commit(tx); - - return (0); + return (EAGAIN); } /* @@ -2482,6 +2559,22 @@ receive_process_record(struct receive_writer_arg *rwa, ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read); rwa->bytes_read = rrd->bytes_read; + if (rrd->header.drr_type != DRR_WRITE) { + err = flush_write_batch(rwa); + if (err != 0) { + if (rrd->arc_buf != NULL) { + dmu_return_arcbuf(rrd->arc_buf); + rrd->arc_buf = NULL; + rrd->payload = NULL; + } else if (rrd->payload != NULL) { + kmem_free(rrd->payload, rrd->payload_size); + rrd->payload = NULL; + } + + return (err); + } + } + switch (rrd->header.drr_type) { case DRR_OBJECT: { @@ -2500,13 +2593,17 @@ receive_process_record(struct receive_writer_arg *rwa, } case DRR_WRITE: { - struct drr_write *drrw = &rrd->header.drr_u.drr_write; - err = receive_write(rwa, drrw, rrd->arc_buf); - /* if receive_write() is successful, it consumes the arc_buf */ - if (err != 0) + err = receive_process_write_record(rwa, rrd); + if (err != EAGAIN) { + /* + * On success, receive_process_write_record() returns + * EAGAIN to indicate that we do not want to free + * the rrd or arc_buf. + */ + ASSERT(err != 0); dmu_return_arcbuf(rrd->arc_buf); - rrd->arc_buf = NULL; - rrd->payload = NULL; + rrd->arc_buf = NULL; + } break; } case DRR_WRITE_BYREF: @@ -2582,8 +2679,9 @@ receive_writer_thread(void *arg) * on the queue, but we need to clear everything in it before we * can exit. */ + int err = 0; if (rwa->err == 0) { - rwa->err = receive_process_record(rwa, rrd); + err = receive_process_record(rwa, rrd); } else if (rrd->arc_buf != NULL) { dmu_return_arcbuf(rrd->arc_buf); rrd->arc_buf = NULL; @@ -2592,9 +2690,22 @@ receive_writer_thread(void *arg) kmem_free(rrd->payload, rrd->payload_size); rrd->payload = NULL; } - kmem_free(rrd, sizeof (*rrd)); + /* + * EAGAIN indicates that this record has been saved (on + * raw->write_batch), and will be used again, so we don't + * free it. + */ + if (err != EAGAIN) { + rwa->err = err; + kmem_free(rrd, sizeof (*rrd)); + } } kmem_free(rrd, sizeof (*rrd)); + + int err = flush_write_batch(rwa); + if (rwa->err == 0) + rwa->err = err; + mutex_enter(&rwa->mutex); rwa->done = B_TRUE; cv_signal(&rwa->cv); @@ -2759,6 +2870,8 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, int cleanup_fd, rwa->raw = drc->drc_raw; rwa->spill = drc->drc_spill; rwa->os->os_raw_receive = drc->drc_raw; + list_create(&rwa->write_batch, sizeof (struct receive_record_arg), + offsetof(struct receive_record_arg, node.bqn_node)); (void) thread_create(NULL, 0, receive_writer_thread, rwa, 0, curproc, TS_RUN, minclsyspri); @@ -2845,6 +2958,7 @@ dmu_recv_stream(dmu_recv_cookie_t *drc, int cleanup_fd, cv_destroy(&rwa->cv); mutex_destroy(&rwa->mutex); bqueue_destroy(&rwa->q); + list_destroy(&rwa->write_batch); if (err == 0) err = rwa->err; @@ -3236,4 +3350,7 @@ ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_length, INT, ZMOD_RW, ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_ff, INT, ZMOD_RW, "Receive queue fill fraction"); + +ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, write_batch_size, INT, ZMOD_RW, + "Maximum amount of writes to batch into one transaction"); /* END CSTYLED */