qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH 44/47] mirror: switch mirror_iteration to AIO


From: Paolo Bonzini
Subject: [Qemu-devel] [PATCH 44/47] mirror: switch mirror_iteration to AIO
Date: Tue, 24 Jul 2012 13:04:22 +0200

There is really no change in the behavior of the job here, since there
is still a maximum of one in-flight I/O operation between the source and
the target.  However, this patch already moves the copy logic
from mirror_iteration to AIO callbacks; it also adds the logic to count
in-flight operations, and only complete the job after they have finished.

Some care is required in the error and cancellation cases, in order
to avoid access to dangling pointers (and consequent corruption).

Signed-off-by: Paolo Bonzini <address@hidden>
---
 block/mirror.c |  161 ++++++++++++++++++++++++++++++++++++++++++--------------
 trace-events   |    2 +
 2 files changed, 123 insertions(+), 40 deletions(-)

diff --git a/block/mirror.c b/block/mirror.c
index 81a600b..971c923 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -17,7 +17,7 @@
 #include "qemu/ratelimit.h"
 #include "bitmap.h"
 
-#define SLICE_TIME 100000000ULL /* ns */
+#define SLICE_TIME    100000000ULL /* ns */
 
 typedef struct MirrorBlockJob {
     BlockJob common;
@@ -33,17 +33,78 @@ typedef struct MirrorBlockJob {
     unsigned long *cow_bitmap;
     HBitmapIter hbi;
     uint8_t *buf;
+
+    int in_flight;
+    int ret;
 } MirrorBlockJob;
 
-static int coroutine_fn mirror_iteration(MirrorBlockJob *s,
-                                         BlockErrorAction *p_action)
+typedef struct MirrorOp {
+    MirrorBlockJob *s;
+    QEMUIOVector qiov;
+    struct iovec iov;
+    int64_t sector_num;
+    int nb_sectors;
+} MirrorOp;
+
+static void mirror_iteration_done(MirrorOp *op)
 {
+    MirrorBlockJob *s = op->s;
+
+    s->in_flight--;
+    trace_mirror_iteration_done(s, op->sector_num, op->nb_sectors);
+    g_slice_free(MirrorOp, op);
+    qemu_coroutine_enter(s->common.co, NULL);
+}
+
+static void mirror_write_complete(void *opaque, int ret)
+{
+    MirrorOp *op = opaque;
+    MirrorBlockJob *s = op->s;
     BlockDriverState *source = s->common.bs;
     BlockDriverState *target = s->target;
-    QEMUIOVector qiov;
-    int ret, nb_sectors, nb_sectors_chunk;
+    BlockErrorAction action;
+
+    if (ret < 0) {
+        bdrv_set_dirty(source, op->sector_num, op->nb_sectors);
+        action = block_job_error_action(&s->common, target, s->on_target_error,
+                                        false, -ret);
+        s->synced = false;
+        if (action == BDRV_ACTION_REPORT && s->ret >= 0) {
+            s->ret = ret;
+        }
+    }
+    mirror_iteration_done(op);
+}
+
+static void mirror_read_complete(void *opaque, int ret)
+{
+    MirrorOp *op = opaque;
+    MirrorBlockJob *s = op->s;
+    BlockDriverState *source = s->common.bs;
+    BlockDriverState *target = s->target;
+    BlockErrorAction action;
+    if (ret < 0) {
+        bdrv_set_dirty(source, op->sector_num, op->nb_sectors);
+        action = block_job_error_action(&s->common, source, s->on_source_error,
+                                        true, -ret);
+        s->synced = false;
+        if (action == BDRV_ACTION_REPORT && s->ret >= 0) {
+            s->ret = ret;
+        }
+
+        mirror_iteration_done(op);
+        return;
+    }
+    bdrv_aio_writev(target, op->sector_num, &op->qiov, op->nb_sectors,
+                    mirror_write_complete, op);
+}
+
+static void coroutine_fn mirror_iteration(MirrorBlockJob *s)
+{
+    BlockDriverState *source = s->common.bs;
+    int nb_sectors, nb_sectors_chunk;
     int64_t end, sector_num, cluster_num;
-    struct iovec iov;
+    MirrorOp *op;
 
     s->sector_num = hbitmap_iter_next(&s->hbi);
     if (s->sector_num < 0) {
@@ -74,34 +135,30 @@ static int coroutine_fn mirror_iteration(MirrorBlockJob *s,
 
     end = s->common.len >> BDRV_SECTOR_BITS;
     nb_sectors = MIN(nb_sectors, end - sector_num);
+
+    /* Allocate a MirrorOp that is used as an AIO callback.  */
+    op = g_slice_new(MirrorOp);
+    op->s = s;
+    op->iov.iov_base = s->buf;
+    op->iov.iov_len  = nb_sectors * 512;
+    op->sector_num = sector_num;
+    op->nb_sectors = nb_sectors;
+    qemu_iovec_init_external(&op->qiov, &op->iov, 1);
+
     bdrv_reset_dirty(source, sector_num, nb_sectors);
 
     /* Copy the dirty cluster.  */
-    iov.iov_base = s->buf;
-    iov.iov_len  = nb_sectors * 512;
-    qemu_iovec_init_external(&qiov, &iov, 1);
-
+    s->in_flight++;
     trace_mirror_one_iteration(s, sector_num, nb_sectors);
-    ret = bdrv_co_readv(source, sector_num, nb_sectors, &qiov);
-    if (ret < 0) {
-        *p_action = block_job_error_action(&s->common, source,
-                                           s->on_source_error, true, -ret);
-        s->synced = false;
-        goto fail;
-    }
-    ret = bdrv_co_writev(target, sector_num, nb_sectors, &qiov);
-    if (ret < 0) {
-        *p_action = block_job_error_action(&s->common, target,
-                                           s->on_target_error, false, -ret);
-        s->synced = false;
-        goto fail;
-    }
-    return 0;
+    bdrv_aio_readv(source, sector_num, &op->qiov, nb_sectors,
+                   mirror_read_complete, op);
+}
 
-fail:
-    /* Try again later.  */
-    bdrv_set_dirty(source, sector_num, nb_sectors);
-    return ret;
+static void mirror_drain(MirrorBlockJob *s)
+{
+    while (s->in_flight > 0) {
+        qemu_coroutine_yield();
+    }
 }
 
 static void coroutine_fn mirror_run(void *opaque)
@@ -109,6 +166,7 @@ static void coroutine_fn mirror_run(void *opaque)
     MirrorBlockJob *s = opaque;
     BlockDriverState *bs = s->common.bs;
     int64_t sector_num, end, nb_sectors_chunk, length;
+    uint64_t last_pause_ns;
     BlockDriverInfo bdi;
     char backing_filename[1024];
     int ret = 0;
@@ -168,22 +226,37 @@ static void coroutine_fn mirror_run(void *opaque)
     }
 
     bdrv_dirty_iter_init(bs, &s->hbi);
+    last_pause_ns = qemu_get_clock_ns(rt_clock);
     for (;;) {
         uint64_t delay_ns;
         int64_t cnt;
         bool should_complete;
 
+        if (s->ret < 0) {
+            ret = s->ret;
+            break;
+        }
+
         cnt = bdrv_get_dirty_count(bs);
-        if (cnt != 0) {
-            BlockErrorAction action = BDRV_ACTION_REPORT;
-            ret = mirror_iteration(s, &action);
-            if (ret < 0 && action == BDRV_ACTION_REPORT) {
-                break;
+
+        /* Note that even when no rate limit is applied we need to yield
+         * periodically with no pending I/O so that qemu_aio_flush() returns.
+         * We do so every SLICE_TIME nanoseconds, or when there is an error,
+         * or when the source is clean, whichever comes first.
+         */
+        if (qemu_get_clock_ns(rt_clock) - last_pause_ns < SLICE_TIME &&
+            s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
+            if (s->in_flight > 0) {
+                trace_mirror_yield(s, s->in_flight, cnt);
+                qemu_coroutine_yield();
+                continue;
+            } else if (cnt != 0) {
+                mirror_iteration(s);
+                continue;
             }
-            cnt = bdrv_get_dirty_count(bs);
         }
 
-        if (cnt != 0) {
+        if (s->in_flight > 0 || cnt != 0) {
             should_complete = false;
         } else {
             trace_mirror_before_flush(s);
@@ -228,15 +301,12 @@ static void coroutine_fn mirror_run(void *opaque)
                 delay_ns = 0;
             }
 
-            /* Note that even when no rate limit is applied we need to yield
-             * with no pending I/O here so that qemu_aio_flush() returns.
-             */
             block_job_sleep_ns(&s->common, rt_clock, delay_ns);
             if (block_job_is_cancelled(&s->common)) {
                 break;
             }
         } else if (!should_complete) {
-            delay_ns = (cnt == 0 ? SLICE_TIME : 0);
+            delay_ns = (s->in_flight == 0 && cnt == 0 ? SLICE_TIME : 0);
             block_job_sleep_ns(&s->common, rt_clock, delay_ns);
         } else if (cnt == 0) {
             /* The two disks are in sync.  Exit and report successful
@@ -246,9 +316,20 @@ static void coroutine_fn mirror_run(void *opaque)
             s->common.cancelled = false;
             break;
         }
+        last_pause_ns = qemu_get_clock_ns(rt_clock);
+    }
+
+    if (s->in_flight > 0) {
+        /* We get here only if something went wrong.  Either the job failed,
+         * or it was cancelled prematurely so that we do not guarantee that
+         * the target is a copy of the source.
+         */
+        assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common)));
+        mirror_drain(s);
     }
 
 immediate_exit:
+    assert(s->in_flight == 0);
     g_free(s->buf);
     g_free(s->cow_bitmap);
     bdrv_set_dirty_tracking(bs, 0);
diff --git a/trace-events b/trace-events
index 6b504d8..fe20bd7 100644
--- a/trace-events
+++ b/trace-events
@@ -83,6 +83,8 @@ mirror_before_drain(void *s, int64_t cnt) "s %p dirty count %"PRId64
 mirror_before_sleep(void *s, int64_t cnt, int synced) "s %p dirty count %"PRId64" synced %d"
 mirror_one_iteration(void *s, int64_t sector_num, int nb_sectors) "s %p sector_num %"PRId64" nb_sectors %d"
 mirror_cow(void *s, int64_t sector_num) "s %p sector_num %"PRId64
+mirror_iteration_done(void *s, int64_t sector_num, int nb_sectors) "s %p sector_num %"PRId64" nb_sectors %d"
+mirror_yield(void *s, int64_t cnt, int in_flight) "s %p dirty count %"PRId64" in_flight %d"
 
 # blockdev.c
 qmp_block_job_cancel(void *job) "job %p"
-- 
1.7.10.4





reply via email to

[Prev in Thread] Current Thread [Next in Thread]