From 89c76cee2e39bd1e1a7b560b0a3eb5e83fa2bae8 Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Wed, 9 Dec 2015 15:40:00 +0100 Subject: [PATCH 19/49] backup: vma: remove async queue --- blockdev.c | 6 ++ vma-writer.c | 179 +++++++++++------------------------------------------------ 2 files changed, 38 insertions(+), 147 deletions(-) diff --git a/blockdev.c b/blockdev.c index 6253ef1c5e..ef159b0753 100644 --- a/blockdev.c +++ b/blockdev.c @@ -3122,6 +3122,11 @@ static void pvebackup_cancel(void *opaque) error_setg(&backup_state.error, "backup cancelled"); } + if (backup_state.vmaw) { + /* make sure vma writer does not block anymore */ + vma_writer_set_error(backup_state.vmaw, "backup cancelled"); + } + /* drain all i/o (awake jobs waiting for aio) */ bdrv_drain_all(); @@ -3134,6 +3139,7 @@ static void pvebackup_cancel(void *opaque) if (job) { if (!di->completed) { block_job_cancel_sync(job); + bdrv_drain_all(); /* drain all i/o (awake jobs waiting for aio) */ } } } diff --git a/vma-writer.c b/vma-writer.c index 689e988423..ec8da5378d 100644 --- a/vma-writer.c +++ b/vma-writer.c @@ -28,14 +28,8 @@ do { if (DEBUG_VMA) { printf("vma: " fmt, ## __VA_ARGS__); } } while (0) #define WRITE_BUFFERS 5 - -typedef struct VmaAIOCB VmaAIOCB; -struct VmaAIOCB { - unsigned char buffer[VMA_MAX_EXTENT_SIZE]; - VmaWriter *vmaw; - size_t bytes; - Coroutine *co; -}; +#define HEADER_CLUSTERS 8 +#define HEADERBUF_SIZE (VMA_CLUSTER_SIZE*HEADER_CLUSTERS) struct VmaWriter { int fd; @@ -47,16 +41,14 @@ struct VmaWriter { bool closed; /* we always write extents */ - unsigned char outbuf[VMA_MAX_EXTENT_SIZE]; + unsigned char *outbuf; int outbuf_pos; /* in bytes */ int outbuf_count; /* in VMA_BLOCKS */ uint64_t outbuf_block_info[VMA_BLOCKS_PER_EXTENT]; - VmaAIOCB *aiocbs[WRITE_BUFFERS]; - CoQueue wqueue; + unsigned char *headerbuf; GChecksum *md5csum; - CoMutex writer_lock; CoMutex flush_lock; Coroutine *co_writer; @@ -217,38 +209,39 @@ static void vma_co_continue_write(void *opaque) } static ssize_t coroutine_fn -vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes) +vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes) { - size_t done = 0; - ssize_t ret; + DPRINTF("vma_queue_write enter %zd\n", bytes); - /* atomic writes (we cannot interleave writes) */ - qemu_co_mutex_lock(&vmaw->writer_lock); + assert(vmaw); + assert(buf); + assert(bytes <= VMA_MAX_EXTENT_SIZE); - DPRINTF("vma_co_write enter %zd\n", bytes); + size_t done = 0; + ssize_t ret; assert(vmaw->co_writer == NULL); vmaw->co_writer = qemu_coroutine_self(); - aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, false, NULL, vma_co_continue_write, vmaw); - - DPRINTF("vma_co_write wait until writable\n"); - qemu_coroutine_yield(); - DPRINTF("vma_co_write starting %zd\n", bytes); - while (done < bytes) { + aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, false, NULL, vma_co_continue_write, NULL, vmaw); + qemu_coroutine_yield(); + aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, false, NULL, NULL, NULL, NULL); + if (vmaw->status < 0) { + DPRINTF("vma_queue_write detected canceled backup\n"); + done = -1; + break; + } ret = write(vmaw->fd, buf + done, bytes - done); if (ret > 0) { done += ret; - DPRINTF("vma_co_write written %zd %zd\n", done, ret); + DPRINTF("vma_queue_write written %zd %zd\n", done, ret); } else if (ret < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { - DPRINTF("vma_co_write yield %zd\n", done); - qemu_coroutine_yield(); - DPRINTF("vma_co_write restart %zd\n", done); - } else { - vma_writer_set_error(vmaw, "vma_co_write write error - %s", + /* try again */ + } else { + vma_writer_set_error(vmaw, "vma_queue_write: write error - %s", g_strerror(errno)); done = -1; /* always return failure for partial writes */ break; @@ -258,102 +251,9 @@ vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes) } } - aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, false, NULL, NULL, NULL); - vmaw->co_writer = NULL; - - qemu_co_mutex_unlock(&vmaw->writer_lock); - - DPRINTF("vma_co_write leave %zd\n", done); - return done; -} - -static void coroutine_fn vma_co_writer_task(void *opaque) -{ - VmaAIOCB *cb = opaque; - - DPRINTF("vma_co_writer_task start\n"); - - int64_t done = vma_co_write(cb->vmaw, cb->buffer, cb->bytes); - DPRINTF("vma_co_writer_task write done %zd\n", done); - - if (done != cb->bytes) { - DPRINTF("vma_co_writer_task failed write %zd %zd", cb->bytes, done); - vma_writer_set_error(cb->vmaw, "vma_co_writer_task failed write %zd", - done); - } - - cb->bytes = 0; - - qemu_co_queue_next(&cb->vmaw->wqueue); - - DPRINTF("vma_co_writer_task end\n"); -} - -static void coroutine_fn vma_queue_flush(VmaWriter *vmaw) -{ - DPRINTF("vma_queue_flush enter\n"); - - assert(vmaw); - - while (1) { - int i; - VmaAIOCB *cb = NULL; - for (i = 0; i < WRITE_BUFFERS; i++) { - if (vmaw->aiocbs[i]->bytes) { - cb = vmaw->aiocbs[i]; - DPRINTF("FOUND USED AIO BUFFER %d %zd\n", i, - vmaw->aiocbs[i]->bytes); - break; - } - } - if (!cb) { - break; - } - qemu_co_queue_wait(&vmaw->wqueue); - } - - DPRINTF("vma_queue_flush leave\n"); -} - -/** - * NOTE: pipe buffer size in only 4096 bytes on linux (see 'ulimit -a') - * So we need to create a coroutione to allow 'parallel' execution. - */ -static ssize_t coroutine_fn -vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes) -{ - DPRINTF("vma_queue_write enter %zd\n", bytes); - - assert(vmaw); - assert(buf); - assert(bytes <= VMA_MAX_EXTENT_SIZE); - - VmaAIOCB *cb = NULL; - while (!cb) { - int i; - for (i = 0; i < WRITE_BUFFERS; i++) { - if (!vmaw->aiocbs[i]->bytes) { - cb = vmaw->aiocbs[i]; - break; - } - } - if (!cb) { - qemu_co_queue_wait(&vmaw->wqueue); - } - } - - memcpy(cb->buffer, buf, bytes); - cb->bytes = bytes; - cb->vmaw = vmaw; - - DPRINTF("vma_queue_write start %zd\n", bytes); - cb->co = qemu_coroutine_create(vma_co_writer_task); - qemu_coroutine_enter(cb->co, cb); - - DPRINTF("vma_queue_write leave\n"); - - return bytes; + + return (done == bytes) ? bytes : -1; } VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, Error **errp) @@ -420,20 +320,16 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, Error **errp) } /* we use O_DIRECT, so we need to align IO buffers */ - int i; - for (i = 0; i < WRITE_BUFFERS; i++) { - vmaw->aiocbs[i] = qemu_memalign(512, sizeof(VmaAIOCB)); - memset(vmaw->aiocbs[i], 0, sizeof(VmaAIOCB)); - } + + vmaw->outbuf = qemu_memalign(512, VMA_MAX_EXTENT_SIZE); + vmaw->headerbuf = qemu_memalign(512, HEADERBUF_SIZE); vmaw->outbuf_count = 0; vmaw->outbuf_pos = VMA_EXTENT_HEADER_SIZE; vmaw->header_blob_table_pos = 1; /* start at pos 1 */ - qemu_co_mutex_init(&vmaw->writer_lock); qemu_co_mutex_init(&vmaw->flush_lock); - qemu_co_queue_init(&vmaw->wqueue); uuid_copy(vmaw->uuid, uuid); @@ -460,8 +356,7 @@ err: static int coroutine_fn vma_write_header(VmaWriter *vmaw) { assert(vmaw); - int header_clusters = 8; - char buf[65536*header_clusters]; + unsigned char *buf = vmaw->headerbuf; VmaHeader *head = (VmaHeader *)buf; int i; @@ -472,7 +367,7 @@ static int coroutine_fn vma_write_header(VmaWriter *vmaw) return vmaw->status; } - memset(buf, 0, sizeof(buf)); + memset(buf, 0, HEADERBUF_SIZE); head->magic = VMA_MAGIC; head->version = GUINT32_TO_BE(1); /* v1 */ @@ -507,7 +402,7 @@ static int coroutine_fn vma_write_header(VmaWriter *vmaw) uint32_t header_size = sizeof(VmaHeader) + vmaw->header_blob_table_size; head->header_size = GUINT32_TO_BE(header_size); - if (header_size > sizeof(buf)) { + if (header_size > HEADERBUF_SIZE) { return -1; /* just to be sure */ } @@ -805,13 +700,7 @@ int vma_writer_close(VmaWriter *vmaw, Error **errp) int i; - vma_queue_flush(vmaw); - - /* this should not happen - just to be sure */ - while (!qemu_co_queue_empty(&vmaw->wqueue)) { - DPRINTF("vma_writer_close wait\n"); - co_aio_sleep_ns(qemu_get_aio_context(), QEMU_CLOCK_REALTIME, 1000000); - } + assert(vmaw->co_writer == NULL); if (vmaw->cmd) { if (pclose(vmaw->cmd) < 0) { @@ -869,9 +758,5 @@ void vma_writer_destroy(VmaWriter *vmaw) g_checksum_free(vmaw->md5csum); } - for (i = 0; i < WRITE_BUFFERS; i++) { - free(vmaw->aiocbs[i]); - } - g_free(vmaw); } -- 2.11.0