pve-qemu-qoup/debian/patches/pve/0032-qmp_backup-run-backup-related-code-inside-coroutines.patch

512 lines
17 KiB
Diff
Raw Normal View History

From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: Dietmar Maurer <dietmar@proxmox.com>
Date: Tue, 22 Oct 2019 12:48:17 +0200
Subject: [PATCH] qmp_backup: run backup related code inside coroutines
So that we can safely use coroutines/yield everywhere.
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
blockdev.c | 250 ++++++++++++++++++++++++++++++++++++++---------------
1 file changed, 180 insertions(+), 70 deletions(-)
diff --git a/blockdev.c b/blockdev.c
index 2466a02cbd..85031de942 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -3153,6 +3153,34 @@ out:
/* PVE backup related function */
+typedef struct BlockOnCoroutineWrapper {
+ AioContext *ctx;
+ CoroutineEntry *entry;
+ void *entry_arg;
+ bool finished;
+} BlockOnCoroutineWrapper;
+
+static void coroutine_fn block_on_coroutine_wrapper(void *opaque)
+{
+ BlockOnCoroutineWrapper *wrapper = opaque;
+ wrapper->entry(wrapper->entry_arg);
+ wrapper->finished = true;
+}
+
+static void block_on_coroutine_fn(CoroutineEntry *entry, void *entry_arg)
+{
+ AioContext *ctx = qemu_get_current_aio_context();
+ BlockOnCoroutineWrapper wrapper = {
+ .finished = false,
+ .entry = entry,
+ .entry_arg = entry_arg,
+ .ctx = ctx,
+ };
+ Coroutine *wrapper_co = qemu_coroutine_create(block_on_coroutine_wrapper, &wrapper);
+ aio_co_enter(ctx, wrapper_co);
+ AIO_WAIT_WHILE(ctx, !wrapper.finished);
+}
+
static struct PVEBackupState {
Error *error;
bool cancel;
@@ -3180,12 +3208,14 @@ typedef struct PVEBackupDevInfo {
BlockDriverState *target;
} PVEBackupDevInfo;
-static void pvebackup_run_next_job(void);
+static void pvebackup_co_run_next_job(void);
-static int pvebackup_dump_cb(void *opaque, BlockBackend *target,
+static int coroutine_fn pvebackup_co_dump_cb(void *opaque, BlockBackend *target,
uint64_t start, uint64_t bytes,
const void *pbuf)
{
+ assert(qemu_in_coroutine());
+
const uint64_t size = bytes;
const unsigned char *buf = pbuf;
PVEBackupDevInfo *di = opaque;
@@ -3247,8 +3277,10 @@ static int pvebackup_dump_cb(void *opaque, BlockBackend *target,
return size;
}
-static void pvebackup_cleanup(void)
+static void coroutine_fn pvebackup_co_cleanup(void)
{
+ assert(qemu_in_coroutine());
+
qemu_mutex_lock(&backup_state.backup_mutex);
// Avoid race between block jobs and backup-cancel command:
if (!backup_state.vmaw) {
@@ -3270,18 +3302,19 @@ static void pvebackup_cleanup(void)
qemu_mutex_unlock(&backup_state.backup_mutex);
}
-static void coroutine_fn backup_close_vma_stream(void *opaque)
-{
- PVEBackupDevInfo *di = opaque;
+typedef struct PVEBackupCompeteCallbackData {
+ PVEBackupDevInfo *di;
+ int result;
+} PVEBackupCompeteCallbackData;
- vma_writer_close_stream(backup_state.vmaw, di->dev_id);
-}
-
-static void pvebackup_complete_cb(void *opaque, int ret)
+static void coroutine_fn pvebackup_co_complete_cb(void *opaque)
{
- // This always runs in the main loop
+ assert(qemu_in_coroutine());
- PVEBackupDevInfo *di = opaque;
+ PVEBackupCompeteCallbackData *cb_data = opaque;
+
+ PVEBackupDevInfo *di = cb_data->di;
+ int ret = cb_data->result;
di->completed = true;
@@ -3294,8 +3327,7 @@ static void pvebackup_complete_cb(void *opaque, int ret)
di->target = NULL;
if (backup_state.vmaw) {
- Coroutine *co = qemu_coroutine_create(backup_close_vma_stream, di);
- qemu_coroutine_enter(co);
+ vma_writer_close_stream(backup_state.vmaw, di->dev_id);
}
// remove self from job queue
@@ -3305,12 +3337,25 @@ static void pvebackup_complete_cb(void *opaque, int ret)
qemu_mutex_unlock(&backup_state.backup_mutex);
if (!backup_state.cancel) {
- pvebackup_run_next_job();
+ pvebackup_co_run_next_job();
}
}
-static void pvebackup_cancel(void *opaque)
+static void pvebackup_complete_cb(void *opaque, int ret)
{
+ // This always called from the main loop
+ PVEBackupCompeteCallbackData cb_data = {
+ .di = opaque,
+ .result = ret,
+ };
+
+ block_on_coroutine_fn(pvebackup_co_complete_cb, &cb_data);
+}
+
+static void coroutine_fn pvebackup_co_cancel(void *opaque)
+{
+ assert(qemu_in_coroutine());
+
backup_state.cancel = true;
qemu_mutex_lock(&backup_state.backup_mutex);
// Avoid race between block jobs and backup-cancel command:
@@ -3345,21 +3390,16 @@ static void pvebackup_cancel(void *opaque)
}
}
- qemu_mutex_unlock(&backup_state.backup_mutex);
- pvebackup_cleanup();
+ qemu_co_mutex_unlock(&backup_state.backup_mutex);
+ pvebackup_co_cleanup();
}
void qmp_backup_cancel(Error **errp)
{
if (!backup_state.backup_mutex_initialized)
return;
- Coroutine *co = qemu_coroutine_create(pvebackup_cancel, NULL);
- qemu_coroutine_enter(co);
- while (backup_state.vmaw) {
- /* vma writer use main aio context */
- aio_poll(qemu_get_aio_context(), true);
- }
+ block_on_coroutine_fn(pvebackup_co_cancel, NULL);
}
static int config_to_vma(const char *file, BackupFormat format,
@@ -3400,8 +3440,11 @@ static int config_to_vma(const char *file, BackupFormat format,
}
bool job_should_pause(Job *job);
-static void pvebackup_run_next_job(void)
+
+static void coroutine_fn pvebackup_co_run_next_job(void)
{
+ assert(qemu_in_coroutine());
+
qemu_mutex_lock(&backup_state.backup_mutex);
GList *l = backup_state.di_list;
@@ -3427,16 +3470,33 @@ static void pvebackup_run_next_job(void)
qemu_mutex_unlock(&backup_state.backup_mutex);
// no more jobs, run the cleanup
- pvebackup_cleanup();
+ pvebackup_co_cleanup();
}
-UuidInfo *qmp_backup(const char *backup_file, bool has_format,
- BackupFormat format,
- bool has_config_file, const char *config_file,
- bool has_firewall_file, const char *firewall_file,
- bool has_devlist, const char *devlist,
- bool has_speed, int64_t speed, Error **errp)
+typedef struct QmpBackupTask {
+ const char *backup_file;
+ bool has_format;
+ BackupFormat format;
+ bool has_config_file;
+ const char *config_file;
+ bool has_firewall_file;
+ const char *firewall_file;
+ bool has_devlist;
+ const char *devlist;
+ bool has_speed;
+ int64_t speed;
+ Error **errp;
+ UuidInfo *result;
+} QmpBackupTask;
+
+static void coroutine_fn pvebackup_co_start(void *opaque)
{
+ assert(qemu_in_coroutine());
+
+ QmpBackupTask *task = opaque;
+
+ task->result = NULL; // just to be sure
+
BlockBackend *blk;
BlockDriverState *bs = NULL;
const char *backup_dir = NULL;
@@ -3455,16 +3515,16 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format,
}
if (backup_state.di_list) {
- error_set(errp, ERROR_CLASS_GENERIC_ERROR,
+ error_set(task->errp, ERROR_CLASS_GENERIC_ERROR,
"previous backup not finished");
- return NULL;
+ return;
}
/* Todo: try to auto-detect format based on file name */
- format = has_format ? format : BACKUP_FORMAT_VMA;
+ BackupFormat format = task->has_format ? task->format : BACKUP_FORMAT_VMA;
- if (has_devlist) {
- devs = g_strsplit_set(devlist, ",;:", -1);
+ if (task->has_devlist) {
+ devs = g_strsplit_set(task->devlist, ",;:", -1);
gchar **d = devs;
while (d && *d) {
@@ -3472,18 +3532,18 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format,
if (blk) {
bs = blk_bs(blk);
if (bdrv_is_read_only(bs)) {
- error_setg(errp, "Node '%s' is read only", *d);
+ error_setg(task->errp, "Node '%s' is read only", *d);
goto err;
}
if (!bdrv_is_inserted(bs)) {
- error_setg(errp, QERR_DEVICE_HAS_NO_MEDIUM, *d);
+ error_setg(task->errp, QERR_DEVICE_HAS_NO_MEDIUM, *d);
goto err;
}
PVEBackupDevInfo *di = g_new0(PVEBackupDevInfo, 1);
di->bs = bs;
di_list = g_list_append(di_list, di);
} else {
- error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND,
+ error_set(task->errp, ERROR_CLASS_DEVICE_NOT_FOUND,
"Device '%s' not found", *d);
goto err;
}
@@ -3506,7 +3566,7 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format,
}
if (!di_list) {
- error_set(errp, ERROR_CLASS_GENERIC_ERROR, "empty device list");
+ error_set(task->errp, ERROR_CLASS_GENERIC_ERROR, "empty device list");
goto err;
}
@@ -3516,13 +3576,13 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format,
while (l) {
PVEBackupDevInfo *di = (PVEBackupDevInfo *)l->data;
l = g_list_next(l);
- if (bdrv_op_is_blocked(di->bs, BLOCK_OP_TYPE_BACKUP_SOURCE, errp)) {
+ if (bdrv_op_is_blocked(di->bs, BLOCK_OP_TYPE_BACKUP_SOURCE, task->errp)) {
goto err;
}
ssize_t size = bdrv_getlength(di->bs);
if (size < 0) {
- error_setg_errno(errp, -di->size, "bdrv_getlength failed");
+ error_setg_errno(task->errp, -di->size, "bdrv_getlength failed");
goto err;
}
di->size = size;
@@ -3532,10 +3592,10 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format,
uuid_generate(uuid);
if (format == BACKUP_FORMAT_VMA) {
- vmaw = vma_writer_create(backup_file, uuid, &local_err);
+ vmaw = vma_writer_create(task->backup_file, uuid, &local_err);
if (!vmaw) {
if (local_err) {
- error_propagate(errp, local_err);
+ error_propagate(task->errp, local_err);
}
goto err;
}
@@ -3549,18 +3609,18 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format,
const char *devname = bdrv_get_device_name(di->bs);
di->dev_id = vma_writer_register_stream(vmaw, devname, di->size);
if (di->dev_id <= 0) {
- error_set(errp, ERROR_CLASS_GENERIC_ERROR,
+ error_set(task->errp, ERROR_CLASS_GENERIC_ERROR,
"register_stream failed");
goto err;
}
}
} else if (format == BACKUP_FORMAT_DIR) {
- if (mkdir(backup_file, 0640) != 0) {
- error_setg_errno(errp, errno, "can't create directory '%s'\n",
- backup_file);
+ if (mkdir(task->backup_file, 0640) != 0) {
+ error_setg_errno(task->errp, errno, "can't create directory '%s'\n",
+ task->backup_file);
goto err;
}
- backup_dir = backup_file;
+ backup_dir = task->backup_file;
l = di_list;
while (l) {
@@ -3574,31 +3634,31 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format,
bdrv_img_create(di->targetfile, "raw", NULL, NULL, NULL,
di->size, flags, false, &local_err);
if (local_err) {
- error_propagate(errp, local_err);
+ error_propagate(task->errp, local_err);
goto err;
}
di->target = bdrv_open(di->targetfile, NULL, NULL, flags, &local_err);
if (!di->target) {
- error_propagate(errp, local_err);
+ error_propagate(task->errp, local_err);
goto err;
}
}
} else {
- error_set(errp, ERROR_CLASS_GENERIC_ERROR, "unknown backup format");
+ error_set(task->errp, ERROR_CLASS_GENERIC_ERROR, "unknown backup format");
goto err;
}
/* add configuration file to archive */
- if (has_config_file) {
- if (config_to_vma(config_file, format, backup_dir, vmaw, errp) != 0) {
+ if (task->has_config_file) {
+ if (config_to_vma(task->config_file, format, backup_dir, vmaw, task->errp) != 0) {
goto err;
}
}
/* add firewall file to archive */
- if (has_firewall_file) {
- if (config_to_vma(firewall_file, format, backup_dir, vmaw, errp) != 0) {
+ if (task->has_firewall_file) {
+ if (config_to_vma(task->firewall_file, format, backup_dir, vmaw, task->errp) != 0) {
goto err;
}
}
@@ -3611,7 +3671,7 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format,
backup_state.error = NULL;
}
- backup_state.speed = (has_speed && speed > 0) ? speed : 0;
+ backup_state.speed = (task->has_speed && task->speed > 0) ? task->speed : 0;
backup_state.start_time = time(NULL);
backup_state.end_time = 0;
@@ -3619,7 +3679,7 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format,
if (backup_state.backup_file) {
g_free(backup_state.backup_file);
}
- backup_state.backup_file = g_strdup(backup_file);
+ backup_state.backup_file = g_strdup(task->backup_file);
backup_state.vmaw = vmaw;
@@ -3638,14 +3698,13 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format,
while (l) {
PVEBackupDevInfo *di = (PVEBackupDevInfo *)l->data;
l = g_list_next(l);
- job = backup_job_create(NULL, di->bs, di->target, speed, MIRROR_SYNC_MODE_FULL, NULL,
+ job = backup_job_create(NULL, di->bs, di->target, backup_state.speed, MIRROR_SYNC_MODE_FULL, NULL,
false, BLOCKDEV_ON_ERROR_REPORT, BLOCKDEV_ON_ERROR_REPORT,
- JOB_DEFAULT,
- pvebackup_dump_cb, pvebackup_complete_cb, di,
+ JOB_DEFAULT, pvebackup_co_dump_cb, pvebackup_complete_cb, di,
1, NULL, &local_err);
if (!job || local_err != NULL) {
error_setg(&backup_state.error, "backup_job_create failed");
- pvebackup_cancel(NULL);
+ pvebackup_co_cancel(NULL);
} else {
job_start(&job->job);
}
@@ -3658,13 +3717,14 @@ UuidInfo *qmp_backup(const char *backup_file, bool has_format,
qemu_mutex_unlock(&backup_state.backup_mutex);
if (!backup_state.error) {
- pvebackup_run_next_job(); // run one job
+ pvebackup_co_run_next_job(); // run one job
}
uuid_info = g_malloc0(sizeof(*uuid_info));
uuid_info->UUID = g_strdup(backup_state.uuid_str);
- return uuid_info;
+ task->result = uuid_info;
+ return;
err:
@@ -3691,23 +3751,61 @@ err:
if (vmaw) {
Error *err = NULL;
vma_writer_close(vmaw, &err);
- unlink(backup_file);
+ unlink(task->backup_file);
}
if (backup_dir) {
rmdir(backup_dir);
}
- return NULL;
+ task->result = NULL;
+ return;
}
-BackupStatus *qmp_query_backup(Error **errp)
+UuidInfo *qmp_backup(const char *backup_file, bool has_format,
+ BackupFormat format,
+ bool has_config_file, const char *config_file,
+ bool has_firewall_file, const char *firewall_file,
+ bool has_devlist, const char *devlist,
+ bool has_speed, int64_t speed, Error **errp)
+{
+ QmpBackupTask task = {
+ .backup_file = backup_file,
+ .has_format = has_format,
+ .format = format,
+ .has_config_file = has_config_file,
+ .config_file = config_file,
+ .has_firewall_file = has_firewall_file,
+ .firewall_file = firewall_file,
+ .has_devlist = has_devlist,
+ .devlist = devlist,
+ .has_speed = has_speed,
+ .speed = speed,
+ .errp = errp,
+ };
+
+ block_on_coroutine_fn(pvebackup_co_start, &task);
+ return task.result;
+}
+
+
+typedef struct QmpQueryBackupTask {
+ Error **errp;
+ BackupStatus *result;
+} QmpQueryBackupTask;
+
+static void coroutine_fn pvebackup_co_query(void *opaque)
{
+ assert(qemu_in_coroutine());
+
+ QmpQueryBackupTask *task = opaque;
+
BackupStatus *info = g_malloc0(sizeof(*info));
if (!backup_state.start_time) {
/* not started, return {} */
- return info;
+ task->result = info;
+ return;
}
info->has_status = true;
@@ -3743,7 +3841,19 @@ BackupStatus *qmp_query_backup(Error **errp)
info->has_transferred = true;
info->transferred = backup_state.transferred;
- return info;
+ task->result = info;
+}
+
+BackupStatus *qmp_query_backup(Error **errp)
+{
+ QmpQueryBackupTask task = {
+ .errp = errp,
+ .result = NULL,
+ };
+
+ block_on_coroutine_fn(pvebackup_co_query, &task);
+
+ return task.result;
}
void qmp_block_stream(bool has_job_id, const char *job_id, const char *device,