From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Tue, 22 Oct 2019 12:48:17 +0200 Subject: [PATCH] qmp_backup: run backup related code inside coroutines So that we can safely use coroutines/yield everywhere. Signed-off-by: Dietmar Maurer --- blockdev.c | 250 ++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 180 insertions(+), 70 deletions(-) diff --git a/blockdev.c b/blockdev.c index 2466a02cbd..85031de942 100644 --- a/blockdev.c +++ b/blockdev.c @@ -3153,6 +3153,34 @@ out: /* PVE backup related function */ +typedef struct BlockOnCoroutineWrapper { + AioContext *ctx; + CoroutineEntry *entry; + void *entry_arg; + bool finished; +} BlockOnCoroutineWrapper; + +static void coroutine_fn block_on_coroutine_wrapper(void *opaque) +{ + BlockOnCoroutineWrapper *wrapper = opaque; + wrapper->entry(wrapper->entry_arg); + wrapper->finished = true; +} + +static void block_on_coroutine_fn(CoroutineEntry *entry, void *entry_arg) +{ + AioContext *ctx = qemu_get_current_aio_context(); + BlockOnCoroutineWrapper wrapper = { + .finished = false, + .entry = entry, + .entry_arg = entry_arg, + .ctx = ctx, + }; + Coroutine *wrapper_co = qemu_coroutine_create(block_on_coroutine_wrapper, &wrapper); + aio_co_enter(ctx, wrapper_co); + AIO_WAIT_WHILE(ctx, !wrapper.finished); +} + static struct PVEBackupState { Error *error; bool cancel; @@ -3180,12 +3208,14 @@ typedef struct PVEBackupDevInfo { BlockDriverState *target; } PVEBackupDevInfo; -static void pvebackup_run_next_job(void); +static void pvebackup_co_run_next_job(void); -static int pvebackup_dump_cb(void *opaque, BlockBackend *target, +static int coroutine_fn pvebackup_co_dump_cb(void *opaque, BlockBackend *target, uint64_t start, uint64_t bytes, const void *pbuf) { + assert(qemu_in_coroutine()); + const uint64_t size = bytes; const unsigned char *buf = pbuf; PVEBackupDevInfo *di = opaque; @@ -3247,8 +3277,10 @@ static int pvebackup_dump_cb(void *opaque, BlockBackend 
*target, return size; } -static void pvebackup_cleanup(void) +static void coroutine_fn pvebackup_co_cleanup(void) { + assert(qemu_in_coroutine()); + qemu_mutex_lock(&backup_state.backup_mutex); // Avoid race between block jobs and backup-cancel command: if (!backup_state.vmaw) { @@ -3270,18 +3302,19 @@ static void pvebackup_cleanup(void) qemu_mutex_unlock(&backup_state.backup_mutex); } -static void coroutine_fn backup_close_vma_stream(void *opaque) -{ - PVEBackupDevInfo *di = opaque; +typedef struct PVEBackupCompeteCallbackData { + PVEBackupDevInfo *di; + int result; +} PVEBackupCompeteCallbackData; - vma_writer_close_stream(backup_state.vmaw, di->dev_id); -} - -static void pvebackup_complete_cb(void *opaque, int ret) +static void coroutine_fn pvebackup_co_complete_cb(void *opaque) { - // This always runs in the main loop + assert(qemu_in_coroutine()); - PVEBackupDevInfo *di = opaque; + PVEBackupCompeteCallbackData *cb_data = opaque; + + PVEBackupDevInfo *di = cb_data->di; + int ret = cb_data->result; di->completed = true; @@ -3294,8 +3327,7 @@ static void pvebackup_complete_cb(void *opaque, int ret) di->target = NULL; if (backup_state.vmaw) { - Coroutine *co = qemu_coroutine_create(backup_close_vma_stream, di); - qemu_coroutine_enter(co); + vma_writer_close_stream(backup_state.vmaw, di->dev_id); } // remove self from job queue @@ -3305,12 +3337,25 @@ static void pvebackup_complete_cb(void *opaque, int ret) qemu_mutex_unlock(&backup_state.backup_mutex); if (!backup_state.cancel) { - pvebackup_run_next_job(); + pvebackup_co_run_next_job(); } } -static void pvebackup_cancel(void *opaque) +static void pvebackup_complete_cb(void *opaque, int ret) { + // This is always called from the main loop + PVEBackupCompeteCallbackData cb_data = { + .di = opaque, + .result = ret, + }; + + block_on_coroutine_fn(pvebackup_co_complete_cb, &cb_data); +} + +static void coroutine_fn pvebackup_co_cancel(void *opaque) +{ + assert(qemu_in_coroutine()); + backup_state.cancel = true; 
qemu_mutex_lock(&backup_state.backup_mutex); // Avoid race between block jobs and backup-cancel command: @@ -3345,21 +3390,16 @@ static void pvebackup_cancel(void *opaque) } } - qemu_mutex_unlock(&backup_state.backup_mutex); - pvebackup_cleanup(); + qemu_co_mutex_unlock(&backup_state.backup_mutex); + pvebackup_co_cleanup(); } void qmp_backup_cancel(Error **errp) { if (!backup_state.backup_mutex_initialized) return; - Coroutine *co = qemu_coroutine_create(pvebackup_cancel, NULL); - qemu_coroutine_enter(co); - while (backup_state.vmaw) { - /* vma writer use main aio context */ - aio_poll(qemu_get_aio_context(), true); - } + block_on_coroutine_fn(pvebackup_co_cancel, NULL); } static int config_to_vma(const char *file, BackupFormat format, @@ -3400,8 +3440,11 @@ static int config_to_vma(const char *file, BackupFormat format, } bool job_should_pause(Job *job); -static void pvebackup_run_next_job(void) + +static void coroutine_fn pvebackup_co_run_next_job(void) { + assert(qemu_in_coroutine()); + qemu_mutex_lock(&backup_state.backup_mutex); GList *l = backup_state.di_list; @@ -3427,16 +3470,33 @@ static void pvebackup_run_next_job(void) qemu_mutex_unlock(&backup_state.backup_mutex); // no more jobs, run the cleanup - pvebackup_cleanup(); + pvebackup_co_cleanup(); } -UuidInfo *qmp_backup(const char *backup_file, bool has_format, - BackupFormat format, - bool has_config_file, const char *config_file, - bool has_firewall_file, const char *firewall_file, - bool has_devlist, const char *devlist, - bool has_speed, int64_t speed, Error **errp) +typedef struct QmpBackupTask { + const char *backup_file; + bool has_format; + BackupFormat format; + bool has_config_file; + const char *config_file; + bool has_firewall_file; + const char *firewall_file; + bool has_devlist; + const char *devlist; + bool has_speed; + int64_t speed; + Error **errp; + UuidInfo *result; +} QmpBackupTask; + +static void coroutine_fn pvebackup_co_start(void *opaque) { + assert(qemu_in_coroutine()); + + 
QmpBackupTask *task = opaque; + + task->result = NULL; // just to be sure + BlockBackend *blk; BlockDriverState *bs = NULL; const char *backup_dir = NULL; @@ -3455,16 +3515,16 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format, } if (backup_state.di_list) { - error_set(errp, ERROR_CLASS_GENERIC_ERROR, + error_set(task->errp, ERROR_CLASS_GENERIC_ERROR, "previous backup not finished"); - return NULL; + return; } /* Todo: try to auto-detect format based on file name */ - format = has_format ? format : BACKUP_FORMAT_VMA; + BackupFormat format = task->has_format ? task->format : BACKUP_FORMAT_VMA; - if (has_devlist) { - devs = g_strsplit_set(devlist, ",;:", -1); + if (task->has_devlist) { + devs = g_strsplit_set(task->devlist, ",;:", -1); gchar **d = devs; while (d && *d) { @@ -3472,18 +3532,18 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format, if (blk) { bs = blk_bs(blk); if (bdrv_is_read_only(bs)) { - error_setg(errp, "Node '%s' is read only", *d); + error_setg(task->errp, "Node '%s' is read only", *d); goto err; } if (!bdrv_is_inserted(bs)) { - error_setg(errp, QERR_DEVICE_HAS_NO_MEDIUM, *d); + error_setg(task->errp, QERR_DEVICE_HAS_NO_MEDIUM, *d); goto err; } PVEBackupDevInfo *di = g_new0(PVEBackupDevInfo, 1); di->bs = bs; di_list = g_list_append(di_list, di); } else { - error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND, + error_set(task->errp, ERROR_CLASS_DEVICE_NOT_FOUND, "Device '%s' not found", *d); goto err; } @@ -3506,7 +3566,7 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format, } if (!di_list) { - error_set(errp, ERROR_CLASS_GENERIC_ERROR, "empty device list"); + error_set(task->errp, ERROR_CLASS_GENERIC_ERROR, "empty device list"); goto err; } @@ -3516,13 +3576,13 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format, while (l) { PVEBackupDevInfo *di = (PVEBackupDevInfo *)l->data; l = g_list_next(l); - if (bdrv_op_is_blocked(di->bs, BLOCK_OP_TYPE_BACKUP_SOURCE, errp)) { + if (bdrv_op_is_blocked(di->bs, 
BLOCK_OP_TYPE_BACKUP_SOURCE, task->errp)) { goto err; } ssize_t size = bdrv_getlength(di->bs); if (size < 0) { - error_setg_errno(errp, -di->size, "bdrv_getlength failed"); + error_setg_errno(task->errp, -di->size, "bdrv_getlength failed"); goto err; } di->size = size; @@ -3532,10 +3592,10 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format, uuid_generate(uuid); if (format == BACKUP_FORMAT_VMA) { - vmaw = vma_writer_create(backup_file, uuid, &local_err); + vmaw = vma_writer_create(task->backup_file, uuid, &local_err); if (!vmaw) { if (local_err) { - error_propagate(errp, local_err); + error_propagate(task->errp, local_err); } goto err; } @@ -3549,18 +3609,18 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format, const char *devname = bdrv_get_device_name(di->bs); di->dev_id = vma_writer_register_stream(vmaw, devname, di->size); if (di->dev_id <= 0) { - error_set(errp, ERROR_CLASS_GENERIC_ERROR, + error_set(task->errp, ERROR_CLASS_GENERIC_ERROR, "register_stream failed"); goto err; } } } else if (format == BACKUP_FORMAT_DIR) { - if (mkdir(backup_file, 0640) != 0) { - error_setg_errno(errp, errno, "can't create directory '%s'\n", - backup_file); + if (mkdir(task->backup_file, 0640) != 0) { + error_setg_errno(task->errp, errno, "can't create directory '%s'\n", + task->backup_file); goto err; } - backup_dir = backup_file; + backup_dir = task->backup_file; l = di_list; while (l) { @@ -3574,31 +3634,31 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format, bdrv_img_create(di->targetfile, "raw", NULL, NULL, NULL, di->size, flags, false, &local_err); if (local_err) { - error_propagate(errp, local_err); + error_propagate(task->errp, local_err); goto err; } di->target = bdrv_open(di->targetfile, NULL, NULL, flags, &local_err); if (!di->target) { - error_propagate(errp, local_err); + error_propagate(task->errp, local_err); goto err; } } } else { - error_set(errp, ERROR_CLASS_GENERIC_ERROR, "unknown backup format"); + error_set(task->errp, 
ERROR_CLASS_GENERIC_ERROR, "unknown backup format"); goto err; } /* add configuration file to archive */ - if (has_config_file) { - if (config_to_vma(config_file, format, backup_dir, vmaw, errp) != 0) { + if (task->has_config_file) { + if (config_to_vma(task->config_file, format, backup_dir, vmaw, task->errp) != 0) { goto err; } } /* add firewall file to archive */ - if (has_firewall_file) { - if (config_to_vma(firewall_file, format, backup_dir, vmaw, errp) != 0) { + if (task->has_firewall_file) { + if (config_to_vma(task->firewall_file, format, backup_dir, vmaw, task->errp) != 0) { goto err; } } @@ -3611,7 +3671,7 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format, backup_state.error = NULL; } - backup_state.speed = (has_speed && speed > 0) ? speed : 0; + backup_state.speed = (task->has_speed && task->speed > 0) ? task->speed : 0; backup_state.start_time = time(NULL); backup_state.end_time = 0; @@ -3619,7 +3679,7 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format, if (backup_state.backup_file) { g_free(backup_state.backup_file); } - backup_state.backup_file = g_strdup(backup_file); + backup_state.backup_file = g_strdup(task->backup_file); backup_state.vmaw = vmaw; @@ -3638,14 +3698,13 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format, while (l) { PVEBackupDevInfo *di = (PVEBackupDevInfo *)l->data; l = g_list_next(l); - job = backup_job_create(NULL, di->bs, di->target, speed, MIRROR_SYNC_MODE_FULL, NULL, + job = backup_job_create(NULL, di->bs, di->target, backup_state.speed, MIRROR_SYNC_MODE_FULL, NULL, false, BLOCKDEV_ON_ERROR_REPORT, BLOCKDEV_ON_ERROR_REPORT, - JOB_DEFAULT, - pvebackup_dump_cb, pvebackup_complete_cb, di, + JOB_DEFAULT, pvebackup_co_dump_cb, pvebackup_complete_cb, di, 1, NULL, &local_err); if (!job || local_err != NULL) { error_setg(&backup_state.error, "backup_job_create failed"); - pvebackup_cancel(NULL); + pvebackup_co_cancel(NULL); } else { job_start(&job->job); } @@ -3658,13 +3717,14 @@ UuidInfo 
*qmp_backup(const char *backup_file, bool has_format, qemu_mutex_unlock(&backup_state.backup_mutex); if (!backup_state.error) { - pvebackup_run_next_job(); // run one job + pvebackup_co_run_next_job(); // run one job } uuid_info = g_malloc0(sizeof(*uuid_info)); uuid_info->UUID = g_strdup(backup_state.uuid_str); - return uuid_info; + task->result = uuid_info; + return; err: @@ -3691,23 +3751,61 @@ err: if (vmaw) { Error *err = NULL; vma_writer_close(vmaw, &err); - unlink(backup_file); + unlink(task->backup_file); } if (backup_dir) { rmdir(backup_dir); } - return NULL; + task->result = NULL; + return; } -BackupStatus *qmp_query_backup(Error **errp) +UuidInfo *qmp_backup(const char *backup_file, bool has_format, + BackupFormat format, + bool has_config_file, const char *config_file, + bool has_firewall_file, const char *firewall_file, + bool has_devlist, const char *devlist, + bool has_speed, int64_t speed, Error **errp) +{ + QmpBackupTask task = { + .backup_file = backup_file, + .has_format = has_format, + .format = format, + .has_config_file = has_config_file, + .config_file = config_file, + .has_firewall_file = has_firewall_file, + .firewall_file = firewall_file, + .has_devlist = has_devlist, + .devlist = devlist, + .has_speed = has_speed, + .speed = speed, + .errp = errp, + }; + + block_on_coroutine_fn(pvebackup_co_start, &task); + return task.result; +} + + +typedef struct QmpQueryBackupTask { + Error **errp; + BackupStatus *result; +} QmpQueryBackupTask; + +static void coroutine_fn pvebackup_co_query(void *opaque) { + assert(qemu_in_coroutine()); + + QmpQueryBackupTask *task = opaque; + BackupStatus *info = g_malloc0(sizeof(*info)); if (!backup_state.start_time) { /* not started, return {} */ - return info; + task->result = info; + return; } info->has_status = true; @@ -3743,7 +3841,19 @@ BackupStatus *qmp_query_backup(Error **errp) info->has_transferred = true; info->transferred = backup_state.transferred; - return info; + task->result = info; +} + 
+BackupStatus *qmp_query_backup(Error **errp) +{ + QmpQueryBackupTask task = { + .errp = errp, + .result = NULL, + }; + + block_on_coroutine_fn(pvebackup_co_query, &task); + + return task.result; } void qmp_block_stream(bool has_job_id, const char *job_id, const char *device,