Re: [Qemu-devel] [PATCH v4] mirror: Rewrite mirror_iteration


From: Max Reitz
Subject: Re: [Qemu-devel] [PATCH v4] mirror: Rewrite mirror_iteration
Date: Mon, 16 Nov 2015 20:32:42 +0100
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:38.0) Gecko/20100101 Thunderbird/38.3.0

On 12.11.2015 04:44, Fam Zheng wrote:
> The "pnum < nb_sectors" condition in deciding whether to actually copy
> data is unnecessarily strict, and the qiov initialization is
> unnecessary for bdrv_aio_write_zeroes and bdrv_aio_discard.
> 
> Rewrite mirror_iteration to fix both flaws.
> 
> Signed-off-by: Fam Zheng <address@hidden>
> ---
>  block/mirror.c | 198 ++++++++++++++++++++++++++++++++++++++-------------------
>  1 file changed, 134 insertions(+), 64 deletions(-)

Some rather long comments below, but I still like this patch very much.
mirror_iteration() makes much more sense after this rewrite. Thanks!

> diff --git a/block/mirror.c b/block/mirror.c
> index b1252a1..5f22c65 100644
> --- a/block/mirror.c
> +++ b/block/mirror.c
> @@ -45,7 +45,6 @@ typedef struct MirrorBlockJob {
>      BlockdevOnError on_source_error, on_target_error;
>      bool synced;
>      bool should_complete;
> -    int64_t sector_num;
>      int64_t granularity;
>      size_t buf_size;
>      int64_t bdev_length;
> @@ -157,28 +156,16 @@ static void mirror_read_complete(void *opaque, int ret)
>                      mirror_write_complete, op);
>  }
>  
> -static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
> +static int mirror_do_read(MirrorBlockJob *s, int64_t sector_num,
> +                          int max_sectors)

I don't know if I like this parameter's name because we are actually
copying this much, or maybe even more (if sector_num is unaligned, but
more about that below). We never copy less, as far as I can see.
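Maybe just calling it nb_sectors and documenting the rounding would be
clearer; a suggestion only (names/wording up to you):

    /* Submit a read of at least @nb_sectors starting at @sector_num; the
     * range may be extended for cluster alignment.  Returns the number of
     * sectors actually submitted. */
    static int mirror_do_read(MirrorBlockJob *s, int64_t sector_num,
                              int nb_sectors)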

>  {
>      BlockDriverState *source = s->common.bs;
> -    int nb_sectors, sectors_per_chunk, nb_chunks;
> -    int64_t end, sector_num, next_chunk, next_sector, hbitmap_next_sector;
> -    uint64_t delay_ns = 0;
> +    int sectors_per_chunk, nb_chunks;
> +    int64_t next_chunk, next_sector;
> +    int nb_sectors;
>      MirrorOp *op;
> -    int pnum;
> -    int64_t ret;
>  
> -    s->sector_num = hbitmap_iter_next(&s->hbi);
> -    if (s->sector_num < 0) {
> -        bdrv_dirty_iter_init(s->dirty_bitmap, &s->hbi);
> -        s->sector_num = hbitmap_iter_next(&s->hbi);
> -        trace_mirror_restart_iter(s, bdrv_get_dirty_count(s->dirty_bitmap));
> -        assert(s->sector_num >= 0);
> -    }
> -
> -    hbitmap_next_sector = s->sector_num;
> -    sector_num = s->sector_num;
>      sectors_per_chunk = s->granularity >> BDRV_SECTOR_BITS;
> -    end = s->bdev_length / BDRV_SECTOR_SIZE;
>  
>      /* Extend the QEMUIOVector to include all adjacent blocks that will
>       * be copied in this operation.
> @@ -198,23 +185,9 @@ static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
>      next_sector = sector_num;
>      next_chunk = sector_num / sectors_per_chunk;
>  
> -    /* Wait for I/O to this cluster (from a previous iteration) to be done.  */
> -    while (test_bit(next_chunk, s->in_flight_bitmap)) {
> -        trace_mirror_yield_in_flight(s, sector_num, s->in_flight);
> -        s->waiting_for_io = true;
> -        qemu_coroutine_yield();
> -        s->waiting_for_io = false;
> -    }
> -
>      do {
>          int added_sectors, added_chunks;
>  
> -        if (!bdrv_get_dirty(source, s->dirty_bitmap, next_sector) ||
> -            test_bit(next_chunk, s->in_flight_bitmap)) {
> -            assert(nb_sectors > 0);
> -            break;
> -        }
> -
>          added_sectors = sectors_per_chunk;

(5)

>          if (s->cow_bitmap && !test_bit(next_chunk, s->cow_bitmap)) {
>              bdrv_round_to_clusters(s->target,
> @@ -226,12 +199,15 @@ static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
>               */
>              if (next_sector < sector_num) {

This can happen only if there are fewer sectors per chunk than sectors
per cluster, right? Because this function will only be called with
sector_num rounded to chunks, as far as I can see.

But if that is the case, then (5) sets added_sectors to a value not
aligned to clusters, and (6) may retain that value. Therefore, the block
(7) adjusts all offsets/indices by a value not aligned to clusters.

So I think we should only allow chunk sizes that are aligned to the
cluster size; that would make this conditional block here unnecessary.
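If we enforce that, a check at job setup would suffice. Untested sketch,
assuming it sits somewhere in mirror_start_job() where target and
granularity are at hand:

    BlockDriverInfo bdi;

    if (bdrv_get_info(target, &bdi) >= 0 && bdi.cluster_size > 0) {
        /* Chunks aligned to the target cluster size guarantee that
         * bdrv_round_to_clusters() never moves next_sector below
         * sector_num in mirror_do_read(). */
        assert(granularity % bdi.cluster_size == 0);
    }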

>                  assert(nb_sectors == 0);
> +                /* We have to make sure [sector_num, sector_num + max_sectors)
> +                 * covers the original range. */
> +                max_sectors += sector_num - next_sector;
>                  sector_num = next_sector;
>                  next_chunk = next_sector / sectors_per_chunk;
>              }
>          }
>  
> -        added_sectors = MIN(added_sectors, end - (sector_num + nb_sectors));
> +        added_sectors = MIN(added_sectors, max_sectors);

(6)

>          added_chunks = (added_sectors + sectors_per_chunk - 1) / sectors_per_chunk;
>  
>          /* When doing COW, it may happen that there is not enough space for
> @@ -252,18 +228,13 @@ static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
>              break;
>          }
>  
> -        /* We have enough free space to copy these sectors.  */
> -        bitmap_set(s->in_flight_bitmap, next_chunk, added_chunks);
> -
>          nb_sectors += added_sectors;
>          nb_chunks += added_chunks;
>          next_sector += added_sectors;
>          next_chunk += added_chunks;

(7)

> -        if (!s->synced && s->common.speed) {
> -            delay_ns = ratelimit_calculate_delay(&s->limit, added_sectors);
> -        }
> -    } while (delay_ns == 0 && next_sector < end);
> +    } while (next_sector < sector_num + max_sectors);

All in all, what is this loop used for now anyway? Can't we just pull
the COW waiting loops out and drop it?

(i.e. just wait until we have enough free space to copy all max_sectors.)
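Then mirror_do_read() could simply block until the buffer can hold the
whole request before building the qiov; roughly (untested):

    nb_chunks = DIV_ROUND_UP(max_sectors, sectors_per_chunk);
    while (s->buf_free_count < nb_chunks) {
        trace_mirror_yield_buf_busy(s, nb_chunks, s->in_flight);
        s->waiting_for_io = true;
        qemu_coroutine_yield();
        s->waiting_for_io = false;
    }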

>  
> +    assert(nb_sectors);
>      /* Allocate a MirrorOp that is used as an AIO callback.  */
>      op = g_new(MirrorOp, 1);
>      op->s = s;
> @@ -274,7 +245,6 @@ static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
>       * from s->buf_free.
>       */
>      qemu_iovec_init(&op->qiov, nb_chunks);
> -    next_sector = sector_num;
>      while (nb_chunks-- > 0) {
>          MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free);
>          size_t remaining = (nb_sectors * BDRV_SECTOR_SIZE) - op->qiov.size;
> @@ -282,39 +252,139 @@ static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
>          QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next);
>          s->buf_free_count--;
>          qemu_iovec_add(&op->qiov, buf, MIN(s->granularity, remaining));
> -
> -        /* Advance the HBitmapIter in parallel, so that we do not examine
> -         * the same sector twice.
> -         */
> -        if (next_sector > hbitmap_next_sector
> -            && bdrv_get_dirty(source, s->dirty_bitmap, next_sector)) {
> -            hbitmap_next_sector = hbitmap_iter_next(&s->hbi);
> -        }
> -
> -        next_sector += sectors_per_chunk;
>      }
>  
> -    bdrv_reset_dirty_bitmap(s->dirty_bitmap, sector_num, nb_sectors);
> -
>      /* Copy the dirty cluster.  */
>      s->in_flight++;
>      s->sectors_in_flight += nb_sectors;
>      trace_mirror_one_iteration(s, sector_num, nb_sectors);
>  
> -    ret = bdrv_get_block_status_above(source, NULL, sector_num,
> -                                      nb_sectors, &pnum);
> -    if (ret < 0 || pnum < nb_sectors ||
> -            (ret & BDRV_BLOCK_DATA && !(ret & BDRV_BLOCK_ZERO))) {
> -        bdrv_aio_readv(source, sector_num, &op->qiov, nb_sectors,
> -                       mirror_read_complete, op);
> -    } else if (ret & BDRV_BLOCK_ZERO) {
> +    bdrv_aio_readv(source, sector_num, &op->qiov, nb_sectors,
> +                   mirror_read_complete, op);
> +    return nb_sectors;
> +}
> +
> +static void mirror_do_zero_or_discard(MirrorBlockJob *s,
> +                                      int64_t sector_num,
> +                                      int nb_sectors,
> +                                      bool is_discard)
> +{
> +    MirrorOp *op;
> +
> +    /* Allocate a MirrorOp that is used as an AIO callback. The qiov is zeroed
> +     * so the freeing in mirror_iteration_done is nop. */
> +    op = g_new0(MirrorOp, 1);
> +    op->s = s;
> +    op->sector_num = sector_num;
> +    op->nb_sectors = nb_sectors;
> +
> +    s->in_flight++;
> +    s->sectors_in_flight += nb_sectors;
> +    if (is_discard) {
> +        bdrv_aio_discard(s->target, sector_num, op->nb_sectors,
> +                         mirror_write_complete, op);
> +    } else {
>          bdrv_aio_write_zeroes(s->target, sector_num, op->nb_sectors,
>                                s->unmap ? BDRV_REQ_MAY_UNMAP : 0,
>                                mirror_write_complete, op);
> -    } else {
> -        assert(!(ret & BDRV_BLOCK_DATA));
> -        bdrv_aio_discard(s->target, sector_num, op->nb_sectors,
> -                         mirror_write_complete, op);
> +    }
> +}
> +
> +static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
> +{
> +    BlockDriverState *source = s->common.bs;
> +    int64_t sector_num;
> +    uint64_t delay_ns = 0;
> +    int nb_chunks = 0;
> +    int64_t end = s->bdev_length / BDRV_SECTOR_SIZE;
> +    int sectors_per_chunk = s->granularity >> BDRV_SECTOR_BITS;
> +
> +    sector_num = hbitmap_iter_next(&s->hbi);

(1)

> +    if (sector_num < 0) {
> +        bdrv_dirty_iter_init(s->dirty_bitmap, &s->hbi);
> +        sector_num = hbitmap_iter_next(&s->hbi);
> +        trace_mirror_restart_iter(s, bdrv_get_dirty_count(s->dirty_bitmap));
> +        assert(sector_num >= 0);
> +    }
> +
> +
> +    while (nb_chunks * sectors_per_chunk < (s->buf_size >> BDRV_SECTOR_BITS)) {
> +        int64_t next_sector = sector_num + nb_chunks * sectors_per_chunk;

I'm trying to wrap my head around what this loop does. I think I'd like
a comment with an explanation above it.

nb_chunks counts dirty chunks (4). However, the next_sector calculated
here is not necessarily related to the value returned by
hbitmap_iter_next(), so I don't know what this value is supposed to be.

Imagine the following dirty bitmap:

#---#--#   (# is dirty, - is clean)

Now, we call hbitmap_iter_next(&s->hbi) above (1). We are now here:

#---#--#
^

Therefore, sector_num is 0 and nb_chunks is 0, too (let's assume one
chunk is one sector in size).

Then, we call the hbitmap_iter_next(&s->hbi) below (3):

#---#--#
    ^

Now, nb_chunks is 1, and next_sector is consequently 1, too. That
sector/cluster is not dirty, so (2) will be false and we will quit this
loop.

So I guess this loop is trying to find consecutive dirty chunks?

But because of (3), won't we skip the first non-consecutive chunk in the
next iteration? So in the example above, after we've done (1), won't our
state be:

----#--#
       ^

(Assuming we have cleaned the first dirty chunk)

So we would have skipped the middle chunk. We will reset the iterator
later (if there are still dirty chunks), but it still isn't so nice.
Especially if we have something like ###-####.

Maybe initializing nb_chunks to 1 would help. We do know that the first
chunk is dirty, after all, so we don't have to check it.

> +        int64_t next_chunk = next_sector / sectors_per_chunk;
> +        if (!bdrv_get_dirty(source, s->dirty_bitmap, next_sector)) {

(2)

Also: bdrv_get_dirty()/hbitmap_get() don't seem to do a bounds check. I
think we should make sure that next_sector is not beyond the end of the
BDS.
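E.g. (end being s->bdev_length / BDRV_SECTOR_SIZE, as computed above):

    if (next_sector >= end ||
        !bdrv_get_dirty(source, s->dirty_bitmap, next_sector)) {
        break;
    }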

> +            break;
> +        }
> +        /* Wait for I/O to this cluster (from a previous iteration) to be
> +         * done. */
> +        while (test_bit(next_chunk, s->in_flight_bitmap)) {
> +            trace_mirror_yield_in_flight(s, next_sector, s->in_flight);
> +            s->waiting_for_io = true;
> +            qemu_coroutine_yield();
> +            s->waiting_for_io = false;
> +        }
> +
> +        /* Advance the HBitmapIter in parallel, so that we do not examine
> +         * the same sector twice.
> +         */
> +        hbitmap_iter_next(&s->hbi);

(3): Here, s->hbi is brought to the next dirty cluster.

> +        nb_chunks++;

(4): nb_chunks now "counts" that dirty cluster.

And, by the way, hbitmap_iter_next() may return -1, in which case there
is no further dirty chunk, so nb_chunks should not be incremented.

But that would probably be fixed by initializing nb_chunks to 1 and
adding the range check at (2).
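
So, with nb_chunks initialized to 1 (the chunk at sector_num is known to
be dirty), the loop could look roughly like this (untested; your
in-flight wait loop omitted for brevity):

    while (nb_chunks * sectors_per_chunk < (s->buf_size >> BDRV_SECTOR_BITS)) {
        int64_t next_sector = sector_num + nb_chunks * sectors_per_chunk;

        if (next_sector >= end ||
            !bdrv_get_dirty(source, s->dirty_bitmap, next_sector)) {
            break;
        }

        /* Only consume a chunk from the iterator once we know it will be
         * processed in this call, so no dirty chunk is ever skipped. */
        hbitmap_iter_next(&s->hbi);
        nb_chunks++;
    }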

> +    }
> +    assert(nb_chunks);
> +
> +    /* Clear dirty bits before querying the block status, because
> +     * calling bdrv_reset_dirty_bitmap could yield - if some blocks are marked
> +     * dirty in this window, we need to know.
> +     */
> +    bdrv_reset_dirty_bitmap(s->dirty_bitmap, sector_num,
> +                            nb_chunks * sectors_per_chunk);
> +    bitmap_set(s->in_flight_bitmap, sector_num / sectors_per_chunk, nb_chunks);
> +    while (nb_chunks > 0 && sector_num < end) {
> +        int io_sectors;
> +        enum MirrorMethod {
> +            MIRROR_METHOD_COPY,
> +            MIRROR_METHOD_ZERO,
> +            MIRROR_METHOD_DISCARD
> +        } mirror_method = MIRROR_METHOD_COPY;
> +        int ret = bdrv_get_block_status_above(source, NULL, sector_num,
> +                                              nb_chunks * sectors_per_chunk,
> +                                              &io_sectors);
> +        if (ret < 0) {
> +            io_sectors = nb_chunks * sectors_per_chunk;
> +        }
> +
> +        io_sectors -= io_sectors % sectors_per_chunk;
> +        if (io_sectors < sectors_per_chunk) {
> +            io_sectors = sectors_per_chunk;
> +        } else if (!(ret & BDRV_BLOCK_DATA)) {

If ret < 0, this may still evaluate to true. But we may only take this
path if ret >= 0.
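I.e., something like:

    } else if (ret >= 0 && !(ret & BDRV_BLOCK_DATA)) {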

Max

> +            int64_t target_sector_num;
> +            int target_nb_sectors;
> +            bdrv_round_to_clusters(s->target, sector_num, io_sectors,
> +                                   &target_sector_num, &target_nb_sectors);
> +            if (target_sector_num == sector_num &&
> +                target_nb_sectors == io_sectors) {
> +                mirror_method = ret & BDRV_BLOCK_ZERO ?
> +                                    MIRROR_METHOD_ZERO :
> +                                    MIRROR_METHOD_DISCARD;
> +            }
> +        }
> +
> +        switch (mirror_method) {
> +        case MIRROR_METHOD_COPY:
> +            io_sectors = mirror_do_read(s, sector_num, io_sectors);
> +            break;
> +        case MIRROR_METHOD_ZERO:
> +            mirror_do_zero_or_discard(s, sector_num, io_sectors, false);
> +            break;
> +        case MIRROR_METHOD_DISCARD:
> +            mirror_do_zero_or_discard(s, sector_num, io_sectors, true);
> +            break;
> +        default:
> +            abort();
> +        }
> +        assert(io_sectors);
> +        sector_num += io_sectors;
> +        nb_chunks -= io_sectors / sectors_per_chunk;
> +        delay_ns += ratelimit_calculate_delay(&s->limit, io_sectors);
>      }
>      return delay_ns;
>  }
> 

