[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [PATCH v1 13/22] migration: convert RDMA to use QIOChan
From: |
Dr. David Alan Gilbert |
Subject: |
Re: [Qemu-devel] [PATCH v1 13/22] migration: convert RDMA to use QIOChannel interface |
Date: |
Tue, 2 Feb 2016 20:01:36 +0000 |
User-agent: |
Mutt/1.5.24 (2015-08-30) |
* Daniel P. Berrange (address@hidden) wrote:
> This converts the RDMA code to provide a subclass of
> QIOChannel that uses RDMA for the data transport.
>
> The RDMA code would be much better off if it could
> be split up in a generic RDMA layer, a QIOChannel
> impl based on RDMA, and then the RDMA migration
> glue. This is left as a future exercise for the brave.
>
> Signed-off-by: Daniel P. Berrange <address@hidden>
> ---
> migration/rdma.c | 260
> ++++++++++++++++++++++++++++++++++---------------------
> 1 file changed, 161 insertions(+), 99 deletions(-)
>
> diff --git a/migration/rdma.c b/migration/rdma.c
> index bffbfaf..3e961cb 100644
> --- a/migration/rdma.c
> +++ b/migration/rdma.c
> @@ -374,14 +374,19 @@ typedef struct RDMAContext {
> GHashTable *blockmap;
> } RDMAContext;
>
> -/*
> - * Interface to the rest of the migration call stack.
> - */
> -typedef struct QEMUFileRDMA {
> +#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
> +#define QIO_CHANNEL_RDMA(obj) \
> + OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)
> +
> +typedef struct QIOChannelRDMA QIOChannelRDMA;
> +
> +
> +struct QIOChannelRDMA {
> + QIOChannel parent;
> RDMAContext *rdma;
> + QEMUFile *file;
> size_t len;
> - void *file;
> -} QEMUFileRDMA;
> +};
>
> /*
> * Main structure for IB Send/Recv control messages.
> @@ -2518,15 +2523,19 @@ static void *qemu_rdma_data_init(const char
> *host_port, Error **errp)
> * SEND messages for control only.
> * VM's ram is handled with regular RDMA messages.
> */
> -static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
> - int64_t pos, size_t size)
> -{
> - QEMUFileRDMA *r = opaque;
> - QEMUFile *f = r->file;
> - RDMAContext *rdma = r->rdma;
> - size_t remaining = size;
> - uint8_t * data = (void *) buf;
> +static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
> + const struct iovec *iov,
> + size_t niov,
> + int *fds,
> + size_t nfds,
> + Error **errp)
> +{
> + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> + QEMUFile *f = rioc->file;
> + RDMAContext *rdma = rioc->rdma;
> int ret;
> + ssize_t done = 0;
> + size_t i;
>
> CHECK_ERROR_STATE();
>
> @@ -2540,27 +2549,31 @@ static ssize_t qemu_rdma_put_buffer(void *opaque,
> const uint8_t *buf,
> return ret;
> }
>
> - while (remaining) {
> - RDMAControlHeader head;
> + for (i = 0; i < niov; i++) {
> + size_t remaining = iov[i].iov_len;
> + uint8_t * data = (void *)iov[i].iov_base;
> + while (remaining) {
> + RDMAControlHeader head;
>
> - r->len = MIN(remaining, RDMA_SEND_INCREMENT);
> - remaining -= r->len;
> + rioc->len = MIN(remaining, RDMA_SEND_INCREMENT);
> + remaining -= rioc->len;
>
> - /* Guaranteed to fit due to RDMA_SEND_INCREMENT MIN above */
> - head.len = (uint32_t)r->len;
> - head.type = RDMA_CONTROL_QEMU_FILE;
> + head.len = rioc->len;
> + head.type = RDMA_CONTROL_QEMU_FILE;
>
> - ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
> + ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL,
> NULL);
>
> - if (ret < 0) {
> - rdma->error_state = ret;
> - return ret;
> - }
> + if (ret < 0) {
> + rdma->error_state = ret;
> + return ret;
> + }
>
> - data += r->len;
> + data += rioc->len;
> + done += rioc->len;
> + }
> }
>
> - return size;
> + return done;
> }
>
> static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
> @@ -2585,41 +2598,65 @@ static size_t qemu_rdma_fill(RDMAContext *rdma,
> uint8_t *buf,
> * RDMA links don't use bytestreams, so we have to
> * return bytes to QEMUFile opportunistically.
> */
> -static ssize_t qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
> - int64_t pos, size_t size)
> -{
> - QEMUFileRDMA *r = opaque;
> - RDMAContext *rdma = r->rdma;
> +static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
> + const struct iovec *iov,
> + size_t niov,
> + int **fds,
> + size_t *nfds,
> + Error **errp)
> +{
> + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> + RDMAContext *rdma = rioc->rdma;
> RDMAControlHeader head;
> int ret = 0;
> + ssize_t i;
> + size_t done = 0;
>
> CHECK_ERROR_STATE();
>
> - /*
> - * First, we hold on to the last SEND message we
> - * were given and dish out the bytes until we run
> - * out of bytes.
> - */
> - r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
> - if (r->len) {
> - return r->len;
> - }
> + for (i = 0; i < niov; i++) {
> + size_t want = iov[i].iov_len;
> + uint8_t *data = (void *)iov[i].iov_base;
>
> - /*
> - * Once we run out, we block and wait for another
> - * SEND message to arrive.
> - */
> - ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
> + /*
> + * First, we hold on to the last SEND message we
> + * were given and dish out the bytes until we run
> + * out of bytes.
> + */
> + ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
> + if (ret > 0) {
> + done += ret;
> + if (ret < want) {
> + break;
> + } else {
> + continue;
> + }
> + }
>
> - if (ret < 0) {
> - rdma->error_state = ret;
> - return ret;
> - }
> + /*
> + * Once we run out, we block and wait for another
> + * SEND message to arrive.
> + */
> + ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
>
> - /*
> - * SEND was received with new bytes, now try again.
> - */
> - return qemu_rdma_fill(r->rdma, buf, size, 0);
> + if (ret < 0) {
> + rdma->error_state = ret;
> + return ret;
> + }
> +
> + /*
> + * SEND was received with new bytes, now try again.
> + */
> + ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
> + if (ret > 0) {
> + done += ret;
> + if (ret < want) {
> + break;
> + }
> + }
I don't quite understand the behaviour of this loop.
If rdma_fill returns less than you wanted for the first iov we break.
If it returns 0 then we try and get some more.
The weird thing to me is if we have two iov entries; if the
amount returned by the qemu_rdma_fill happens to match the size of
the 1st iov then I think we end up doing the exchange_recv and
waiting for more. Is that what we want? Why?
Dave
> + }
> + rioc->len = done;
> + return rioc->len;
> }
>
> /*
> @@ -2646,15 +2683,16 @@ static int qemu_rdma_drain_cq(QEMUFile *f,
> RDMAContext *rdma)
> return 0;
> }
>
> -static int qemu_rdma_close(void *opaque)
> +static int qio_channel_rdma_close(QIOChannel *ioc,
> + Error **errp)
> {
> + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> trace_qemu_rdma_close();
> - QEMUFileRDMA *r = opaque;
> - if (r->rdma) {
> - qemu_rdma_cleanup(r->rdma);
> - g_free(r->rdma);
> + if (rioc->rdma) {
> + qemu_rdma_cleanup(rioc->rdma);
> + g_free(rioc->rdma);
> + rioc->rdma = NULL;
> }
> - g_free(r);
> return 0;
> }
>
> @@ -2696,8 +2734,8 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void
> *opaque,
> ram_addr_t block_offset, ram_addr_t offset,
> size_t size, uint64_t *bytes_sent)
> {
> - QEMUFileRDMA *rfile = opaque;
> - RDMAContext *rdma = rfile->rdma;
> + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> + RDMAContext *rdma = rioc->rdma;
> int ret;
>
> CHECK_ERROR_STATE();
> @@ -2951,8 +2989,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f,
> void *opaque)
> };
> RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
> .repeat = 1 };
> - QEMUFileRDMA *rfile = opaque;
> - RDMAContext *rdma = rfile->rdma;
> + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> + RDMAContext *rdma = rioc->rdma;
> RDMALocalBlocks *local = &rdma->local_ram_blocks;
> RDMAControlHeader head;
> RDMARegister *reg, *registers;
> @@ -3207,9 +3245,10 @@ out:
> * We've already built our local RAMBlock list, but not yet sent the list to
> * the source.
> */
> -static int rdma_block_notification_handle(QEMUFileRDMA *rfile, const char
> *name)
> +static int
> +rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
> {
> - RDMAContext *rdma = rfile->rdma;
> + RDMAContext *rdma = rioc->rdma;
> int curr;
> int found = -1;
>
> @@ -3251,8 +3290,8 @@ static int rdma_load_hook(QEMUFile *f, void *opaque,
> uint64_t flags, void *data)
> static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
> uint64_t flags, void *data)
> {
> - QEMUFileRDMA *rfile = opaque;
> - RDMAContext *rdma = rfile->rdma;
> + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> + RDMAContext *rdma = rioc->rdma;
>
> CHECK_ERROR_STATE();
>
> @@ -3271,8 +3310,8 @@ static int qemu_rdma_registration_stop(QEMUFile *f,
> void *opaque,
> uint64_t flags, void *data)
> {
> Error *local_err = NULL, **errp = &local_err;
> - QEMUFileRDMA *rfile = opaque;
> - RDMAContext *rdma = rfile->rdma;
> + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> + RDMAContext *rdma = rioc->rdma;
> RDMAControlHeader head = { .len = 0, .repeat = 1 };
> int ret = 0;
>
> @@ -3368,55 +3407,78 @@ err:
> return ret;
> }
>
> -static int qemu_rdma_get_fd(void *opaque)
> -{
> - QEMUFileRDMA *rfile = opaque;
> - RDMAContext *rdma = rfile->rdma;
> -
> - return rdma->comp_channel->fd;
> -}
> -
> -static const QEMUFileOps rdma_read_ops = {
> - .get_buffer = qemu_rdma_get_buffer,
> - .get_fd = qemu_rdma_get_fd,
> - .close = qemu_rdma_close,
> -};
> -
> static const QEMUFileHooks rdma_read_hooks = {
> .hook_ram_load = rdma_load_hook,
> };
>
> -static const QEMUFileOps rdma_write_ops = {
> - .put_buffer = qemu_rdma_put_buffer,
> - .close = qemu_rdma_close,
> -};
> -
> static const QEMUFileHooks rdma_write_hooks = {
> .before_ram_iterate = qemu_rdma_registration_start,
> .after_ram_iterate = qemu_rdma_registration_stop,
> .save_page = qemu_rdma_save_page,
> };
>
> -static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
> +
> +static void qio_channel_rdma_finalize(Object *obj)
> +{
> + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
> + if (rioc->rdma) {
> + qemu_rdma_cleanup(rioc->rdma);
> + g_free(rioc->rdma);
> + rioc->rdma = NULL;
> + }
> +}
> +
> +static void qio_channel_rdma_class_init(ObjectClass *klass,
> + void *class_data G_GNUC_UNUSED)
> +{
> + QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
> +
> + ioc_klass->io_writev = qio_channel_rdma_writev;
> + ioc_klass->io_readv = qio_channel_rdma_readv;
> + /* XXX
> + ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
> + */
> + ioc_klass->io_close = qio_channel_rdma_close;
> + /* XXX
> + ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
> + */
> +}
> +
> +static const TypeInfo qio_channel_rdma_info = {
> + .parent = TYPE_QIO_CHANNEL,
> + .name = TYPE_QIO_CHANNEL_RDMA,
> + .instance_size = sizeof(QIOChannelRDMA),
> + .instance_finalize = qio_channel_rdma_finalize,
> + .class_init = qio_channel_rdma_class_init,
> +};
> +
> +static void qio_channel_rdma_register_types(void)
> +{
> + type_register_static(&qio_channel_rdma_info);
> +}
> +
> +type_init(qio_channel_rdma_register_types);
> +
> +static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
> {
> - QEMUFileRDMA *r;
> + QIOChannelRDMA *rioc;
>
> if (qemu_file_mode_is_not_valid(mode)) {
> return NULL;
> }
>
> - r = g_new0(QEMUFileRDMA, 1);
> - r->rdma = rdma;
> + rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
> + rioc->rdma = rdma;
>
> if (mode[0] == 'w') {
> - r->file = qemu_fopen_ops(r, &rdma_write_ops);
> - qemu_file_set_hooks(r->file, &rdma_write_hooks);
> + rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
> + qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
> } else {
> - r->file = qemu_fopen_ops(r, &rdma_read_ops);
> - qemu_file_set_hooks(r->file, &rdma_read_hooks);
> + rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
> + qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
> }
>
> - return r->file;
> + return rioc->file;
> }
>
> static void rdma_accept_incoming_migration(void *opaque)
> --
> 2.5.0
>
--
Dr. David Alan Gilbert / address@hidden / Manchester, UK
- Re: [Qemu-devel] [PATCH v1 13/22] migration: convert RDMA to use QIOChannel interface,
Dr. David Alan Gilbert <=