318 lines
9.3 KiB
Diff
318 lines
9.3 KiB
Diff
From f36eabd014acd6aaa9e90bba23fa6c1a329d7752 Mon Sep 17 00:00:00 2001
|
|
From: Wolfgang Bumiller <w.bumiller@proxmox.com>
|
|
Date: Wed, 9 Dec 2015 15:40:00 +0100
|
|
Subject: [PATCH 19/49] backup: vma: remove async queue
|
|
|
|
---
|
|
blockdev.c | 6 ++
|
|
vma-writer.c | 179 +++++++++++------------------------------------------------
|
|
2 files changed, 38 insertions(+), 147 deletions(-)
|
|
|
|
diff --git a/blockdev.c b/blockdev.c
|
|
index d3aef2cc83..bad5b2a8b8 100644
|
|
--- a/blockdev.c
|
|
+++ b/blockdev.c
|
|
@@ -3122,6 +3122,11 @@ static void pvebackup_cancel(void *opaque)
|
|
error_setg(&backup_state.error, "backup cancelled");
|
|
}
|
|
|
|
+ if (backup_state.vmaw) {
|
|
+ /* make sure vma writer does not block anymore */
|
|
+ vma_writer_set_error(backup_state.vmaw, "backup cancelled");
|
|
+ }
|
|
+
|
|
/* drain all i/o (awake jobs waiting for aio) */
|
|
bdrv_drain_all();
|
|
|
|
@@ -3134,6 +3139,7 @@ static void pvebackup_cancel(void *opaque)
|
|
if (job) {
|
|
if (!di->completed) {
|
|
block_job_cancel_sync(job);
|
|
+ bdrv_drain_all(); /* drain all i/o (awake jobs waiting for aio) */
|
|
}
|
|
}
|
|
}
|
|
diff --git a/vma-writer.c b/vma-writer.c
|
|
index 689e988423..ec8da5378d 100644
|
|
--- a/vma-writer.c
|
|
+++ b/vma-writer.c
|
|
@@ -28,14 +28,8 @@
|
|
do { if (DEBUG_VMA) { printf("vma: " fmt, ## __VA_ARGS__); } } while (0)
|
|
|
|
#define WRITE_BUFFERS 5
|
|
-
|
|
-typedef struct VmaAIOCB VmaAIOCB;
|
|
-struct VmaAIOCB {
|
|
- unsigned char buffer[VMA_MAX_EXTENT_SIZE];
|
|
- VmaWriter *vmaw;
|
|
- size_t bytes;
|
|
- Coroutine *co;
|
|
-};
|
|
+#define HEADER_CLUSTERS 8
|
|
+#define HEADERBUF_SIZE (VMA_CLUSTER_SIZE*HEADER_CLUSTERS)
|
|
|
|
struct VmaWriter {
|
|
int fd;
|
|
@@ -47,16 +41,14 @@ struct VmaWriter {
|
|
bool closed;
|
|
|
|
/* we always write extents */
|
|
- unsigned char outbuf[VMA_MAX_EXTENT_SIZE];
|
|
+ unsigned char *outbuf;
|
|
int outbuf_pos; /* in bytes */
|
|
int outbuf_count; /* in VMA_BLOCKS */
|
|
uint64_t outbuf_block_info[VMA_BLOCKS_PER_EXTENT];
|
|
|
|
- VmaAIOCB *aiocbs[WRITE_BUFFERS];
|
|
- CoQueue wqueue;
|
|
+ unsigned char *headerbuf;
|
|
|
|
GChecksum *md5csum;
|
|
- CoMutex writer_lock;
|
|
CoMutex flush_lock;
|
|
Coroutine *co_writer;
|
|
|
|
@@ -217,38 +209,39 @@ static void vma_co_continue_write(void *opaque)
|
|
}
|
|
|
|
static ssize_t coroutine_fn
|
|
-vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes)
|
|
+vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
|
|
{
|
|
- size_t done = 0;
|
|
- ssize_t ret;
|
|
+ DPRINTF("vma_queue_write enter %zd\n", bytes);
|
|
|
|
- /* atomic writes (we cannot interleave writes) */
|
|
- qemu_co_mutex_lock(&vmaw->writer_lock);
|
|
+ assert(vmaw);
|
|
+ assert(buf);
|
|
+ assert(bytes <= VMA_MAX_EXTENT_SIZE);
|
|
|
|
- DPRINTF("vma_co_write enter %zd\n", bytes);
|
|
+ size_t done = 0;
|
|
+ ssize_t ret;
|
|
|
|
assert(vmaw->co_writer == NULL);
|
|
|
|
vmaw->co_writer = qemu_coroutine_self();
|
|
|
|
- aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, false, NULL, vma_co_continue_write, vmaw);
|
|
-
|
|
- DPRINTF("vma_co_write wait until writable\n");
|
|
- qemu_coroutine_yield();
|
|
- DPRINTF("vma_co_write starting %zd\n", bytes);
|
|
-
|
|
while (done < bytes) {
|
|
+ aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, false, NULL, vma_co_continue_write, NULL, vmaw);
|
|
+ qemu_coroutine_yield();
|
|
+ aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, false, NULL, NULL, NULL, NULL);
|
|
+ if (vmaw->status < 0) {
|
|
+ DPRINTF("vma_queue_write detected canceled backup\n");
|
|
+ done = -1;
|
|
+ break;
|
|
+ }
|
|
ret = write(vmaw->fd, buf + done, bytes - done);
|
|
if (ret > 0) {
|
|
done += ret;
|
|
- DPRINTF("vma_co_write written %zd %zd\n", done, ret);
|
|
+ DPRINTF("vma_queue_write written %zd %zd\n", done, ret);
|
|
} else if (ret < 0) {
|
|
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
|
- DPRINTF("vma_co_write yield %zd\n", done);
|
|
- qemu_coroutine_yield();
|
|
- DPRINTF("vma_co_write restart %zd\n", done);
|
|
- } else {
|
|
- vma_writer_set_error(vmaw, "vma_co_write write error - %s",
|
|
+ /* try again */
|
|
+ } else {
|
|
+ vma_writer_set_error(vmaw, "vma_queue_write: write error - %s",
|
|
g_strerror(errno));
|
|
done = -1; /* always return failure for partial writes */
|
|
break;
|
|
@@ -258,102 +251,9 @@ vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes)
|
|
}
|
|
}
|
|
|
|
- aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, false, NULL, NULL, NULL);
|
|
-
|
|
vmaw->co_writer = NULL;
|
|
-
|
|
- qemu_co_mutex_unlock(&vmaw->writer_lock);
|
|
-
|
|
- DPRINTF("vma_co_write leave %zd\n", done);
|
|
- return done;
|
|
-}
|
|
-
|
|
-static void coroutine_fn vma_co_writer_task(void *opaque)
|
|
-{
|
|
- VmaAIOCB *cb = opaque;
|
|
-
|
|
- DPRINTF("vma_co_writer_task start\n");
|
|
-
|
|
- int64_t done = vma_co_write(cb->vmaw, cb->buffer, cb->bytes);
|
|
- DPRINTF("vma_co_writer_task write done %zd\n", done);
|
|
-
|
|
- if (done != cb->bytes) {
|
|
- DPRINTF("vma_co_writer_task failed write %zd %zd", cb->bytes, done);
|
|
- vma_writer_set_error(cb->vmaw, "vma_co_writer_task failed write %zd",
|
|
- done);
|
|
- }
|
|
-
|
|
- cb->bytes = 0;
|
|
-
|
|
- qemu_co_queue_next(&cb->vmaw->wqueue);
|
|
-
|
|
- DPRINTF("vma_co_writer_task end\n");
|
|
-}
|
|
-
|
|
-static void coroutine_fn vma_queue_flush(VmaWriter *vmaw)
|
|
-{
|
|
- DPRINTF("vma_queue_flush enter\n");
|
|
-
|
|
- assert(vmaw);
|
|
-
|
|
- while (1) {
|
|
- int i;
|
|
- VmaAIOCB *cb = NULL;
|
|
- for (i = 0; i < WRITE_BUFFERS; i++) {
|
|
- if (vmaw->aiocbs[i]->bytes) {
|
|
- cb = vmaw->aiocbs[i];
|
|
- DPRINTF("FOUND USED AIO BUFFER %d %zd\n", i,
|
|
- vmaw->aiocbs[i]->bytes);
|
|
- break;
|
|
- }
|
|
- }
|
|
- if (!cb) {
|
|
- break;
|
|
- }
|
|
- qemu_co_queue_wait(&vmaw->wqueue);
|
|
- }
|
|
-
|
|
- DPRINTF("vma_queue_flush leave\n");
|
|
-}
|
|
-
|
|
-/**
|
|
- * NOTE: pipe buffer size in only 4096 bytes on linux (see 'ulimit -a')
|
|
- * So we need to create a coroutione to allow 'parallel' execution.
|
|
- */
|
|
-static ssize_t coroutine_fn
|
|
-vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
|
|
-{
|
|
- DPRINTF("vma_queue_write enter %zd\n", bytes);
|
|
-
|
|
- assert(vmaw);
|
|
- assert(buf);
|
|
- assert(bytes <= VMA_MAX_EXTENT_SIZE);
|
|
-
|
|
- VmaAIOCB *cb = NULL;
|
|
- while (!cb) {
|
|
- int i;
|
|
- for (i = 0; i < WRITE_BUFFERS; i++) {
|
|
- if (!vmaw->aiocbs[i]->bytes) {
|
|
- cb = vmaw->aiocbs[i];
|
|
- break;
|
|
- }
|
|
- }
|
|
- if (!cb) {
|
|
- qemu_co_queue_wait(&vmaw->wqueue);
|
|
- }
|
|
- }
|
|
-
|
|
- memcpy(cb->buffer, buf, bytes);
|
|
- cb->bytes = bytes;
|
|
- cb->vmaw = vmaw;
|
|
-
|
|
- DPRINTF("vma_queue_write start %zd\n", bytes);
|
|
- cb->co = qemu_coroutine_create(vma_co_writer_task);
|
|
- qemu_coroutine_enter(cb->co, cb);
|
|
-
|
|
- DPRINTF("vma_queue_write leave\n");
|
|
-
|
|
- return bytes;
|
|
+
|
|
+ return (done == bytes) ? bytes : -1;
|
|
}
|
|
|
|
VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, Error **errp)
|
|
@@ -420,20 +320,16 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, Error **errp)
|
|
}
|
|
|
|
/* we use O_DIRECT, so we need to align IO buffers */
|
|
- int i;
|
|
- for (i = 0; i < WRITE_BUFFERS; i++) {
|
|
- vmaw->aiocbs[i] = qemu_memalign(512, sizeof(VmaAIOCB));
|
|
- memset(vmaw->aiocbs[i], 0, sizeof(VmaAIOCB));
|
|
- }
|
|
+
|
|
+ vmaw->outbuf = qemu_memalign(512, VMA_MAX_EXTENT_SIZE);
|
|
+ vmaw->headerbuf = qemu_memalign(512, HEADERBUF_SIZE);
|
|
|
|
vmaw->outbuf_count = 0;
|
|
vmaw->outbuf_pos = VMA_EXTENT_HEADER_SIZE;
|
|
|
|
vmaw->header_blob_table_pos = 1; /* start at pos 1 */
|
|
|
|
- qemu_co_mutex_init(&vmaw->writer_lock);
|
|
qemu_co_mutex_init(&vmaw->flush_lock);
|
|
- qemu_co_queue_init(&vmaw->wqueue);
|
|
|
|
uuid_copy(vmaw->uuid, uuid);
|
|
|
|
@@ -460,8 +356,7 @@ err:
|
|
static int coroutine_fn vma_write_header(VmaWriter *vmaw)
|
|
{
|
|
assert(vmaw);
|
|
- int header_clusters = 8;
|
|
- char buf[65536*header_clusters];
|
|
+ unsigned char *buf = vmaw->headerbuf;
|
|
VmaHeader *head = (VmaHeader *)buf;
|
|
|
|
int i;
|
|
@@ -472,7 +367,7 @@ static int coroutine_fn vma_write_header(VmaWriter *vmaw)
|
|
return vmaw->status;
|
|
}
|
|
|
|
- memset(buf, 0, sizeof(buf));
|
|
+ memset(buf, 0, HEADERBUF_SIZE);
|
|
|
|
head->magic = VMA_MAGIC;
|
|
head->version = GUINT32_TO_BE(1); /* v1 */
|
|
@@ -507,7 +402,7 @@ static int coroutine_fn vma_write_header(VmaWriter *vmaw)
|
|
uint32_t header_size = sizeof(VmaHeader) + vmaw->header_blob_table_size;
|
|
head->header_size = GUINT32_TO_BE(header_size);
|
|
|
|
- if (header_size > sizeof(buf)) {
|
|
+ if (header_size > HEADERBUF_SIZE) {
|
|
return -1; /* just to be sure */
|
|
}
|
|
|
|
@@ -805,13 +700,7 @@ int vma_writer_close(VmaWriter *vmaw, Error **errp)
|
|
|
|
int i;
|
|
|
|
- vma_queue_flush(vmaw);
|
|
-
|
|
- /* this should not happen - just to be sure */
|
|
- while (!qemu_co_queue_empty(&vmaw->wqueue)) {
|
|
- DPRINTF("vma_writer_close wait\n");
|
|
- co_aio_sleep_ns(qemu_get_aio_context(), QEMU_CLOCK_REALTIME, 1000000);
|
|
- }
|
|
+ assert(vmaw->co_writer == NULL);
|
|
|
|
if (vmaw->cmd) {
|
|
if (pclose(vmaw->cmd) < 0) {
|
|
@@ -869,9 +758,5 @@ void vma_writer_destroy(VmaWriter *vmaw)
|
|
g_checksum_free(vmaw->md5csum);
|
|
}
|
|
|
|
- for (i = 0; i < WRITE_BUFFERS; i++) {
|
|
- free(vmaw->aiocbs[i]);
|
|
- }
|
|
-
|
|
g_free(vmaw);
|
|
}
|
|
--
|
|
2.11.0
|
|
|