qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH v14 7/8] Implement new driver for block replication


From: Wen Congyang
Subject: Re: [Qemu-devel] [PATCH v14 7/8] Implement new driver for block replication
Date: Thu, 28 Jan 2016 09:13:24 +0800
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:38.0) Gecko/20100101 Thunderbird/38.5.0

On 01/27/2016 10:46 PM, Stefan Hajnoczi wrote:
> On Wed, Jan 13, 2016 at 05:18:31PM +0800, Changlong Xie wrote:
>> From: Wen Congyang <address@hidden>
>>
>> Signed-off-by: Wen Congyang <address@hidden>
>> Signed-off-by: zhanghailiang <address@hidden>
>> Signed-off-by: Gonglei <address@hidden>
>> Signed-off-by: Changlong Xie <address@hidden>
>> ---
>>  block/Makefile.objs              |   1 +
>>  block/replication-comm.c         |  66 +++++
>>  block/replication.c              | 590 
>> +++++++++++++++++++++++++++++++++++++++
>>  include/block/replication-comm.h |  50 ++++
>>  qapi/block-core.json             |  13 +
>>  5 files changed, 720 insertions(+)
>>  create mode 100644 block/replication-comm.c
>>  create mode 100644 block/replication.c
>>  create mode 100644 include/block/replication-comm.h
>>
>> diff --git a/block/Makefile.objs b/block/Makefile.objs
>> index fa05f37..7037662 100644
>> --- a/block/Makefile.objs
>> +++ b/block/Makefile.objs
>> @@ -23,6 +23,7 @@ block-obj-$(CONFIG_LIBSSH2) += ssh.o
>>  block-obj-y += accounting.o
>>  block-obj-y += write-threshold.o
>>  block-obj-y += backup.o
>> +block-obj-y += replication-comm.o replication.o
>>  
>>  common-obj-y += stream.o
>>  common-obj-y += commit.o
>> diff --git a/block/replication-comm.c b/block/replication-comm.c
>> new file mode 100644
>> index 0000000..8af748b
>> --- /dev/null
>> +++ b/block/replication-comm.c
>> @@ -0,0 +1,66 @@
>> +/*
>> + * Replication Block filter
> 
> Is the start/stop/checkpoint callback interface only useful for block
> replication?
> 
> This seems like a generic interface for registering with COLO.  Other
> components (networking, etc) might also need start/stop/checkpoint
> callbacks.  If that's the case then this code should be outside block/
> and the brs->bs field should either be void *opaque or removed (the
> caller needs to use container_of()).

Yes, we will do it in the next version.

> 
>> + *
>> + * Copyright (c) 2015 HUAWEI TECHNOLOGIES CO., LTD.
>> + * Copyright (c) 2015 Intel Corporation
>> + * Copyright (c) 2015 FUJITSU LIMITED
>> + *
>> + * Author:
>> + *   Wen Congyang <address@hidden>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
>> + * See the COPYING file in the top-level directory.
>> + */
>> +
>> +#include "block/replication-comm.h"
>> +
>> +static QLIST_HEAD(, BlockReplicationState) block_replication_states;
>> +
>> +BlockReplicationState *block_replication_new(BlockDriverState *bs,
>> +                                             BlockReplicationOps *ops)
>> +{
>> +    BlockReplicationState *brs;
>> +
>> +    brs = g_new0(BlockReplicationState, 1);
>> +    brs->bs = bs;
>> +    brs->ops = ops;
>> +    QLIST_INSERT_HEAD(&block_replication_states, brs, node);
>> +
>> +    return brs;
>> +}
>> +
>> +void block_replication_remove(BlockReplicationState *brs)
>> +{
>> +    QLIST_REMOVE(brs, node);
>> +    g_free(brs);
>> +}
>> +
>> +void block_replication_start_all(ReplicationMode mode, Error **errp)
>> +{
>> +    BlockReplicationState *brs, *next;
>> +    QLIST_FOREACH_SAFE(brs, &block_replication_states, node, next) {
>> +        if (brs->ops && brs->ops->start) {
>> +            brs->ops->start(brs, mode, errp);
>> +        }
>> +    }
>> +}
>> +
>> +void block_replication_do_checkpoint_all(Error **errp)
>> +{
>> +    BlockReplicationState *brs, *next;
>> +    QLIST_FOREACH_SAFE(brs, &block_replication_states, node, next) {
>> +        if (brs->ops && brs->ops->checkpoint) {
>> +            brs->ops->checkpoint(brs, errp);
>> +        }
>> +    }
>> +}
>> +
>> +void block_replication_stop_all(bool failover, Error **errp)
>> +{
>> +    BlockReplicationState *brs, *next;
>> +    QLIST_FOREACH_SAFE(brs, &block_replication_states, node, next) {
>> +        if (brs->ops && brs->ops->stop) {
>> +            brs->ops->stop(brs, failover, errp);
>> +        }
>> +    }
>> +}
>> diff --git a/block/replication.c b/block/replication.c
>> new file mode 100644
>> index 0000000..29c677a
>> --- /dev/null
>> +++ b/block/replication.c
>> @@ -0,0 +1,590 @@
>> +/*
>> + * Replication Block filter
>> + *
>> + * Copyright (c) 2015 HUAWEI TECHNOLOGIES CO., LTD.
>> + * Copyright (c) 2015 Intel Corporation
>> + * Copyright (c) 2015 FUJITSU LIMITED
>> + *
>> + * Author:
>> + *   Wen Congyang <address@hidden>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
>> + * See the COPYING file in the top-level directory.
>> + */
>> +
>> +#include "qemu-common.h"
>> +#include "block/blockjob.h"
>> +#include "block/nbd.h"
>> +#include "block/replication-comm.h"
>> +
>> +typedef struct BDRVReplicationState {
>> +    ReplicationMode mode;
>> +    int replication_state;
>> +    BlockDriverState *active_disk;
>> +    BlockDriverState *hidden_disk;
>> +    BlockDriverState *secondary_disk;
>> +    BlockDriverState *top_bs;
>> +    BlockReplicationState *brs;
>> +    Error *blocker;
>> +    int orig_hidden_flags;
>> +    int orig_secondary_flags;
>> +    int error;
>> +} BDRVReplicationState;
>> +
>> +enum {
>> +    BLOCK_REPLICATION_NONE,             /* block replication is not started 
>> */
>> +    BLOCK_REPLICATION_RUNNING,          /* block replication is running */
>> +    BLOCK_REPLICATION_FAILOVER,         /* failover is running in 
>> background */
>> +    BLOCK_REPLICATION_FAILOVER_FAILED,  /* failover failed*/
>> +    BLOCK_REPLICATION_DONE,             /* block replication is done(after 
>> failover) */
>> +};
>> +
>> +static void replication_start(BlockReplicationState *brs, ReplicationMode 
>> mode,
>> +                              Error **errp);
>> +static void replication_do_checkpoint(BlockReplicationState *brs, Error 
>> **errp);
>> +static void replication_stop(BlockReplicationState *brs, bool failover,
>> +                             Error **errp);
>> +
>> +#define REPLICATION_MODE        "mode"
>> +static QemuOptsList replication_runtime_opts = {
>> +    .name = "replication",
>> +    .head = QTAILQ_HEAD_INITIALIZER(replication_runtime_opts.head),
>> +    .desc = {
>> +        {
>> +            .name = REPLICATION_MODE,
>> +            .type = QEMU_OPT_STRING,
>> +        },
>> +        { /* end of list */ }
>> +    },
>> +};
>> +
>> +static BlockReplicationOps replication_ops = {
>> +    .start = replication_start,
>> +    .checkpoint = replication_do_checkpoint,
>> +    .stop = replication_stop,
>> +};
>> +
>> +static int replication_open(BlockDriverState *bs, QDict *options,
>> +                            int flags, Error **errp)
>> +{
>> +    int ret;
>> +    BDRVReplicationState *s = bs->opaque;
>> +    Error *local_err = NULL;
>> +    QemuOpts *opts = NULL;
>> +    const char *mode;
>> +
>> +    ret = -EINVAL;
>> +    opts = qemu_opts_create(&replication_runtime_opts, NULL, 0, 
>> &error_abort);
>> +    qemu_opts_absorb_qdict(opts, options, &local_err);
>> +    if (local_err) {
>> +        goto fail;
>> +    }
>> +
>> +    mode = qemu_opt_get(opts, REPLICATION_MODE);
>> +    if (!mode) {
>> +        error_setg(&local_err, "Missing the option mode");
>> +        goto fail;
>> +    }
>> +
>> +    if (!strcmp(mode, "primary")) {
>> +        s->mode = REPLICATION_MODE_PRIMARY;
>> +    } else if (!strcmp(mode, "secondary")) {
>> +        s->mode = REPLICATION_MODE_SECONDARY;
>> +    } else {
>> +        error_setg(&local_err,
>> +                   "The option mode's value should be primary or 
>> secondary");
>> +        goto fail;
>> +    }
>> +
>> +    s->brs = block_replication_new(bs, &replication_ops);
>> +
>> +    ret = 0;
>> +
>> +fail:
>> +    qemu_opts_del(opts);
>> +    /* propagate error */
>> +    if (local_err) {
>> +        error_propagate(errp, local_err);
>> +    }
>> +    return ret;
>> +}
>> +
>> +static void replication_close(BlockDriverState *bs)
>> +{
>> +    BDRVReplicationState *s = bs->opaque;
>> +
>> +    if (s->replication_state == BLOCK_REPLICATION_RUNNING) {
>> +        replication_stop(s->brs, false, NULL);
>> +        block_replication_remove(s->brs);
>> +    }
>> +}
>> +
>> +static int64_t replication_getlength(BlockDriverState *bs)
>> +{
>> +    return bdrv_getlength(bs->file->bs);
>> +}
>> +
>> +static int replication_get_io_status(BDRVReplicationState *s)
>> +{
>> +    switch (s->replication_state) {
>> +    case BLOCK_REPLICATION_NONE:
>> +        return -EIO;
>> +    case BLOCK_REPLICATION_RUNNING:
>> +        return 0;
>> +    case BLOCK_REPLICATION_FAILOVER:
>> +        return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
>> +    case BLOCK_REPLICATION_FAILOVER_FAILED:
>> +        return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 1;
>> +    case BLOCK_REPLICATION_DONE:
>> +        /*
>> +         * active commit job completes, and active disk and secondary_disk
>> +         * is swapped, so we can operate bs->file directly
>> +         */
>> +        return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
>> +    default:
>> +        abort();
>> +    }
>> +}
>> +
>> +static int replication_return_value(BDRVReplicationState *s, int ret)
>> +{
>> +    if (s->mode == REPLICATION_MODE_SECONDARY) {
>> +        return ret;
>> +    }
>> +
>> +    if (ret < 0) {
>> +        s->error = ret;
>> +        ret = 0;
>> +    }
>> +
>> +    return ret;
>> +}
>> +
>> +static coroutine_fn int replication_co_readv(BlockDriverState *bs,
>> +                                             int64_t sector_num,
>> +                                             int remaining_sectors,
>> +                                             QEMUIOVector *qiov)
>> +{
>> +    BDRVReplicationState *s = bs->opaque;
>> +    int ret;
>> +
>> +    if (s->mode == REPLICATION_MODE_PRIMARY) {
>> +        /* We only use it to forward primary write requests */
>> +        return -EIO;
>> +    }
>> +
>> +    ret = replication_get_io_status(s);
>> +    if (ret < 0) {
>> +        return ret;
>> +    }
>> +
>> +    ret = bdrv_co_readv(bs->file->bs, sector_num, remaining_sectors, qiov);
>> +    return replication_return_value(s, ret);
>> +}
>> +
>> +static coroutine_fn int replication_co_writev(BlockDriverState *bs,
>> +                                              int64_t sector_num,
>> +                                              int remaining_sectors,
>> +                                              QEMUIOVector *qiov)
>> +{
>> +    BDRVReplicationState *s = bs->opaque;
>> +    QEMUIOVector hd_qiov;
>> +    uint64_t bytes_done = 0;
>> +    BlockDriverState *top = bs->file->bs;
>> +    BlockDriverState *base = s->secondary_disk;
>> +    BlockDriverState *target;
>> +    int ret, n;
>> +
>> +    ret = replication_get_io_status(s);
>> +    if (ret < 0) {
>> +        return ret;
>> +    }
>> +
>> +    if (ret == 0) {
>> +        ret = bdrv_co_writev(top, sector_num,
>> +                             remaining_sectors, qiov);
>> +        return replication_return_value(s, ret);
>> +    }
>> +
>> +    /*
>> +     * Failover failed, only write to active disk if the sectors
>> +     * have already been allocated in active disk/hidden disk.
>> +     */
>> +    qemu_iovec_init(&hd_qiov, qiov->niov);
>> +    while (remaining_sectors > 0) {
>> +        ret = bdrv_is_allocated_above(top, base, sector_num,
>> +                                      remaining_sectors, &n);
>> +        if (ret < 0) {
>> +            return ret;
>> +        }
>> +
>> +        qemu_iovec_reset(&hd_qiov);
>> +        qemu_iovec_concat(&hd_qiov, qiov, bytes_done, n * BDRV_SECTOR_SIZE);
>> +
>> +        target = ret ? top : base;
>> +        ret = bdrv_co_writev(target, sector_num, n, &hd_qiov);
>> +        if (ret < 0) {
>> +            return ret;
>> +        }
>> +
>> +        remaining_sectors -= n;
>> +        sector_num += n;
>> +        bytes_done += n * BDRV_SECTOR_SIZE;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +static coroutine_fn int replication_co_discard(BlockDriverState *bs,
>> +                                               int64_t sector_num,
>> +                                               int nb_sectors)
>> +{
>> +    BDRVReplicationState *s = bs->opaque;
>> +    int ret;
>> +
>> +    ret = replication_get_io_status(s);
>> +    if (ret < 0) {
>> +        return ret;
>> +    }
>> +
>> +    if (ret == 1) {
>> +        /* It is secondary qemu and failover are running*/
>> +        ret = bdrv_co_discard(s->secondary_disk, sector_num, nb_sectors);
>> +        if (ret) {
>> +            return ret;
>> +        }
>> +    }
>> +
>> +    ret = bdrv_co_discard(bs->file->bs, sector_num, nb_sectors);
>> +    return replication_return_value(s, ret);
>> +}
>> +
>> +static bool replication_recurse_is_first_non_filter(BlockDriverState *bs,
>> +                                                    BlockDriverState 
>> *candidate)
>> +{
>> +    return bdrv_recurse_is_first_non_filter(bs->file->bs, candidate);
>> +}
>> +
>> +static void secondary_do_checkpoint(BDRVReplicationState *s, Error **errp)
>> +{
>> +    Error *local_err = NULL;
>> +    int ret;
>> +
>> +    if (!s->secondary_disk->job) {
>> +        error_setg(errp, "Backup job is cancelled unexpectedly");
>> +        return;
>> +    }
>> +
>> +    block_job_do_checkpoint(s->secondary_disk->job, &local_err);
>> +    if (local_err) {
>> +        error_propagate(errp, local_err);
>> +        return;
>> +    }
>> +
>> +    ret = s->active_disk->drv->bdrv_make_empty(s->active_disk);
> 
> What happens to in-flight requests to the active and hidden disks?

We MUST call do_checkpoint() while the VM is stopped, so there should be no
in-flight requests to those disks at that point.

> 
>> +    if (ret < 0) {
>> +        error_setg(errp, "Cannot make active disk empty");
>> +        return;
>> +    }
>> +
>> +    ret = s->hidden_disk->drv->bdrv_make_empty(s->hidden_disk);
>> +    if (ret < 0) {
>> +        error_setg(errp, "Cannot make hidden disk empty");
>> +        return;
>> +    }
>> +}
>> +
>> +static void reopen_backing_file(BDRVReplicationState *s, bool writable,
>> +                                Error **errp)
>> +{
>> +    BlockReopenQueue *reopen_queue = NULL;
>> +    int orig_hidden_flags, orig_secondary_flags;
>> +    int new_hidden_flags, new_secondary_flags;
>> +    Error *local_err = NULL;
>> +
>> +    if (writable) {
>> +        orig_hidden_flags = bdrv_get_flags(s->hidden_disk);
>> +        new_hidden_flags = orig_hidden_flags | BDRV_O_RDWR;
>> +        orig_secondary_flags = bdrv_get_flags(s->secondary_disk);
>> +        new_secondary_flags = orig_secondary_flags | BDRV_O_RDWR;
>> +    } else {
>> +        orig_hidden_flags = s->orig_hidden_flags | BDRV_O_RDWR;
>> +        new_hidden_flags = s->orig_hidden_flags;
>> +        orig_secondary_flags = s->orig_secondary_flags | BDRV_O_RDWR;
>> +        new_secondary_flags = s->orig_secondary_flags;
>> +    }
>> +
>> +    if (orig_hidden_flags != new_hidden_flags) {
>> +        reopen_queue = bdrv_reopen_queue(reopen_queue, s->hidden_disk, NULL,
>> +                                         new_hidden_flags);
>> +    }
>> +
>> +    if (!(orig_secondary_flags & BDRV_O_RDWR)) {
>> +        reopen_queue = bdrv_reopen_queue(reopen_queue, s->secondary_disk, 
>> NULL,
>> +                                         new_secondary_flags);
>> +    }
>> +
>> +    if (reopen_queue) {
>> +        bdrv_reopen_multiple(reopen_queue, &local_err);
>> +        if (local_err != NULL) {
>> +            error_propagate(errp, local_err);
>> +        }
>> +    }
>> +}
>> +
>> +static void backup_job_cleanup(BDRVReplicationState *s)
>> +{
>> +    bdrv_op_unblock_all(s->top_bs, s->blocker);
>> +    error_free(s->blocker);
>> +    reopen_backing_file(s, false, NULL);
>> +}
>> +
>> +static void backup_job_completed(void *opaque, int ret)
>> +{
>> +    BDRVReplicationState *s = opaque;
>> +
>> +    if (s->replication_state != BLOCK_REPLICATION_FAILOVER) {
>> +        /* The backup job is cancelled unexpectedly */
>> +        s->error = -EIO;
>> +    }
>> +
>> +    backup_job_cleanup(s);
>> +}
>> +
>> +static BlockDriverState *get_top_bs(BlockDriverState *bs)
>> +{
>> +    BdrvChild *child;
>> +
>> +    while (!bs->blk) {
>> +        if (QLIST_EMPTY(&bs->parents)) {
>> +            return NULL;
>> +        }
>> +
>> +        child = QLIST_FIRST(&bs->parents);
>> +        if (QLIST_NEXT(child, next_parent)) {
>> +            return NULL;
>> +        }
>> +
>> +        bs = child->parent;
>> +    }
>> +
>> +    return bs;
>> +}
>> +
>> +static void replication_start(BlockReplicationState *brs, ReplicationMode 
>> mode,
>> +                              Error **errp)
>> +{
>> +    BlockDriverState *bs = brs->bs;
>> +    BDRVReplicationState *s = brs->bs->opaque;
>> +    int64_t active_length, hidden_length, disk_length;
>> +    AioContext *aio_context;
>> +    Error *local_err = NULL;
>> +
>> +    if (s->replication_state != BLOCK_REPLICATION_NONE) {
>> +        error_setg(errp, "Block replication is running or done");
>> +        return;
>> +    }
>> +
>> +    if (s->mode != mode) {
>> +        error_setg(errp, "The parameter mode's value is invalid, needs %d,"
>> +                   " but receives %d", s->mode, mode);
>> +        return;
>> +    }
>> +
>> +    switch (s->mode) {
>> +    case REPLICATION_MODE_PRIMARY:
>> +        break;
>> +    case REPLICATION_MODE_SECONDARY:
>> +        s->active_disk = bs->file->bs;
>> +        if (!bs->file->bs->backing) {
>> +            error_setg(errp, "Active disk doesn't have backing file");
>> +            return;
>> +        }
>> +
>> +        s->hidden_disk = s->active_disk->backing->bs;
>> +        if (!s->hidden_disk->backing) {
>> +            error_setg(errp, "Hidden disk doesn't have backing file");
>> +            return;
>> +        }
>> +
>> +        s->secondary_disk = s->hidden_disk->backing->bs;
>> +        if (!s->secondary_disk->blk) {
>> +            error_setg(errp, "The secondary disk doesn't have block 
>> backend");
>> +            return;
>> +        }
> 
> Kevin: Is code allowed to stash away BlockDriverState pointers for
> convenience or should it keep the BdrvChild pointers instead?  In order
> for replication to work as expected, the graph shouldn't change but for
> consistency maybe BdrvChild is best.
> 
>> +
>> +        s->top_bs = get_top_bs(bs);
>> +        if (!s->top_bs) {
>> +            error_setg(errp, "Cannot get the top block driver state to do"
>> +                       " internal backup");
>> +            return;
>> +        }
>> +
>> +        /* verify the length */
>> +        active_length = bdrv_getlength(s->active_disk);
>> +        hidden_length = bdrv_getlength(s->hidden_disk);
>> +        disk_length = bdrv_getlength(s->secondary_disk);
>> +        if (active_length < 0 || hidden_length < 0 || disk_length < 0 ||
>> +            active_length != hidden_length || hidden_length != disk_length) 
>> {
>> +            error_setg(errp, "active disk, hidden disk, secondary disk's 
>> length"
>> +                       " are not the same");
>> +            return;
>> +        }
>> +
>> +        if (!s->active_disk->drv->bdrv_make_empty ||
>> +            !s->hidden_disk->drv->bdrv_make_empty) {
>> +            error_setg(errp,
>> +                       "active disk or hidden disk doesn't support 
>> make_empty");
>> +            return;
>> +        }
>> +
>> +        /* reopen the backing file in r/w mode */
>> +        reopen_backing_file(s, true, &local_err);
>> +        if (local_err) {
>> +            error_propagate(errp, local_err);
>> +            return;
>> +        }
>> +
>> +        /* start backup job now */
>> +        error_setg(&s->blocker,
>> +                   "block device is in use by internal backup job");
>> +        bdrv_op_block_all(s->top_bs, s->blocker);
>> +        bdrv_op_unblock(s->top_bs, BLOCK_OP_TYPE_DATAPLANE, s->blocker);
>> +        bdrv_ref(s->hidden_disk);
> 
> Why is the explicit reference to hidden_disk (but not secondary_disk or
> active_disk) necessary?

IIRC, we should take a reference on the backup target before calling
backup_start(); the reference on the backup source is taken inside
backup_start().

> 
>> +
>> +        aio_context = bdrv_get_aio_context(bs);
>> +        aio_context_acquire(aio_context);
>> +        backup_start(s->secondary_disk, s->hidden_disk, 0,
>> +                     MIRROR_SYNC_MODE_NONE, NULL, BLOCKDEV_ON_ERROR_REPORT,
>> +                     BLOCKDEV_ON_ERROR_REPORT, backup_job_completed,
>> +                     s, NULL, &local_err);
>> +        aio_context_release(aio_context);
> 
> This is the only place that uses AioContext to protect against other
> threads accessing BlockDriverState at the same time.  Any operation like
> replication_start/stop/checkpoint where you are not sure if the calling
> thread is the same thread that holds the AioContext must use
> aio_context_acquire/release() to protect against race conditions.
> 
> For example, replication_do_checkpoint() might be called from a thread
> that doesn't hold AioContext so it must acquire brs->bs AioContext
> before accessing BDRVReplicationState.

OK, I will check it and fix it in the next version.

Thanks
Wen Congyang

> 
>> +        if (local_err) {
>> +            error_propagate(errp, local_err);
>> +            backup_job_cleanup(s);
>> +            bdrv_unref(s->hidden_disk);
>> +            return;
>> +        }
>> +        break;
>> +    default:
>> +        abort();
>> +    }
>> +
>> +    s->replication_state = BLOCK_REPLICATION_RUNNING;
>> +
>> +    if (s->mode == REPLICATION_MODE_SECONDARY) {
>> +        secondary_do_checkpoint(s, errp);
>> +    }
>> +
>> +    s->error = 0;
>> +}
>> +
>> +
>> +static void replication_do_checkpoint(BlockReplicationState *brs, Error 
>> **errp)
>> +{
>> +    BDRVReplicationState *s = brs->bs->opaque;
>> +
>> +    if (s->replication_state != BLOCK_REPLICATION_RUNNING) {
>> +        error_setg(errp, "Block replication is not running");
>> +        return;
>> +    }
>> +
>> +    if (s->error) {
>> +        error_setg(errp, "I/O error occurs");
>> +        return;
>> +    }
>> +
>> +    if (s->mode == REPLICATION_MODE_SECONDARY) {
>> +        secondary_do_checkpoint(s, errp);
>> +    }
>> +}
>> +
>> +static void replication_done(void *opaque, int ret)
>> +{
>> +    BlockDriverState *bs = opaque;
>> +    BDRVReplicationState *s = bs->opaque;
>> +
>> +    if (ret == 0) {
>> +        s->replication_state = BLOCK_REPLICATION_DONE;
>> +
>> +        /* refresh top bs's filename */
>> +        bdrv_refresh_filename(bs);
>> +        s->active_disk = NULL;
>> +        s->secondary_disk = NULL;
>> +        s->hidden_disk = NULL;
>> +        s->error = 0;
>> +    } else {
>> +        s->replication_state = BLOCK_REPLICATION_FAILOVER_FAILED;
>> +        s->error = -EIO;
>> +    }
>> +}
>> +
>> +static void replication_stop(BlockReplicationState *brs, bool failover, 
>> Error **errp)
>> +{
>> +    BlockDriverState *bs = brs->bs;
>> +    BDRVReplicationState *s = brs->bs->opaque;
>> +
>> +    if (s->replication_state != BLOCK_REPLICATION_RUNNING) {
>> +        error_setg(errp, "Block replication is not running");
>> +        return;
>> +    }
>> +
>> +    switch (s->mode) {
>> +    case REPLICATION_MODE_PRIMARY:
>> +        s->replication_state = BLOCK_REPLICATION_DONE;
>> +        s->error = 0;
>> +        break;
>> +    case REPLICATION_MODE_SECONDARY:
>> +        if (!failover) {
>> +            /*
>> +             * This BDS will be closed, and the job should be completed
>> +             * before the BDS is closed, because we will access hidden
>> +             * disk, secondary disk in backup_job_completed().
>> +             */
>> +            if (s->secondary_disk->job) {
>> +                block_job_cancel_sync(s->secondary_disk->job);
>> +            }
>> +            secondary_do_checkpoint(s, errp);
>> +            s->replication_state = BLOCK_REPLICATION_DONE;
>> +            return;
>> +        }
>> +
>> +        s->replication_state = BLOCK_REPLICATION_FAILOVER;
>> +        if (s->secondary_disk->job) {
>> +            block_job_cancel(s->secondary_disk->job);
>> +        }
>> +
>> +        commit_active_start(s->active_disk, s->secondary_disk, 0,
>> +                            BLOCKDEV_ON_ERROR_REPORT, replication_done,
>> +                            bs, errp, true);
>> +        break;
>> +    default:
>> +        abort();
>> +    }
>> +}
>> +
>> +BlockDriver bdrv_replication = {
>> +    .format_name                = "replication",
>> +    .protocol_name              = "replication",
>> +    .instance_size              = sizeof(BDRVReplicationState),
>> +
>> +    .bdrv_open                  = replication_open,
>> +    .bdrv_close                 = replication_close,
>> +
>> +    .bdrv_getlength             = replication_getlength,
>> +    .bdrv_co_readv              = replication_co_readv,
>> +    .bdrv_co_writev             = replication_co_writev,
>> +    .bdrv_co_discard            = replication_co_discard,
>> +
>> +    .is_filter                  = true,
>> +    .bdrv_recurse_is_first_non_filter = 
>> replication_recurse_is_first_non_filter,
>> +
>> +    .has_variable_length        = true,
>> +};
>> +
>> +static void bdrv_replication_init(void)
>> +{
>> +    bdrv_register(&bdrv_replication);
>> +}
>> +
>> +block_init(bdrv_replication_init);
>> diff --git a/include/block/replication-comm.h 
>> b/include/block/replication-comm.h
>> new file mode 100644
>> index 0000000..6483e56
>> --- /dev/null
>> +++ b/include/block/replication-comm.h
>> @@ -0,0 +1,50 @@
>> +/*
>> + * Replication Block filter
>> + *
>> + * Copyright (c) 2015 HUAWEI TECHNOLOGIES CO., LTD.
>> + * Copyright (c) 2015 Intel Corporation
>> + * Copyright (c) 2015 FUJITSU LIMITED
>> + *
>> + * Author:
>> + *   Wen Congyang <address@hidden>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
>> + * See the COPYING file in the top-level directory.
>> + */
>> +
>> +#ifndef REPLICATION_H
>> +#define REPLICATION_H
>> +
>> +#include "qemu/queue.h"
>> +#include "block/block_int.h"
>> +
>> +typedef struct BlockReplicationOps BlockReplicationOps;
>> +typedef struct BlockReplicationState BlockReplicationState;
>> +typedef void (*Start)(BlockReplicationState *brs, ReplicationMode mode, 
>> Error **errp);
>> +typedef void (*Stop)(BlockReplicationState *brs, bool failover, Error 
>> **errp);
>> +typedef void (*Checkpoint)(BlockReplicationState *brs, Error **errp);
>> +
>> +struct BlockReplicationState {
>> +    BlockDriverState *bs;
>> +    BlockReplicationOps *ops;
>> +    QLIST_ENTRY(BlockReplicationState) node;
>> +};
>> +
>> +struct BlockReplicationOps{
>> +    Start start;
>> +    Stop stop;
>> +    Checkpoint checkpoint;
>> +};
>> +
>> +BlockReplicationState *block_replication_new(BlockDriverState *bs,
>> +                                             BlockReplicationOps *ops);
>> +
>> +void block_replication_remove(BlockReplicationState *brs);
>> +
>> +void block_replication_start_all(ReplicationMode mode, Error **errp);
>> +
>> +void block_replication_do_checkpoint_all(Error **errp);
>> +
>> +void block_replication_stop_all(bool failover, Error **errp);
>> +
>> +#endif /* REPLICATION_H */
>> diff --git a/qapi/block-core.json b/qapi/block-core.json
>> index 2e86ede..d73f46b 100644
>> --- a/qapi/block-core.json
>> +++ b/qapi/block-core.json
>> @@ -1975,6 +1975,19 @@
>>              '*read-pattern': 'QuorumReadPattern' } }
>>  
>>  ##
>> +# @ReplicationMode
>> +#
>> +# An enumeration of replication modes.
>> +#
>> +# @primary: Primary mode, the vm's state will be sent to secondary QEMU.
>> +#
>> +# @secondary: Secondary mode, receive the vm's state from primary QEMU.
>> +#
>> +# Since: 2.6
>> +##
>> +{ 'enum' : 'ReplicationMode', 'data' : [ 'primary', 'secondary' ] }
>> +
>> +##
>>  # @BlockdevOptions
>>  #
>>  # Options for creating a block device.
>> -- 
>> 1.9.3
>>
>>
>>






reply via email to

[Prev in Thread] Current Thread [Next in Thread]