[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [PULL v3 4/7] rdma: core logic
From: |
Paolo Bonzini |
Subject: |
Re: [Qemu-devel] [PULL v3 4/7] rdma: core logic |
Date: |
Wed, 17 Apr 2013 11:16:23 +0200 |
User-agent: |
Mozilla/5.0 (X11; Linux x86_64; rv:17.0) Gecko/20130311 Thunderbird/17.0.4 |
Some more comments and misunderstandings :) but another step forward.
Paolo
Il 17/04/2013 06:20, address@hidden ha scritto:
> From: "Michael R. Hines" <address@hidden>
>
> As requested, code that does need to be visible is kept
> well contained inside this file and this is the only
> new additional file to the entire patch - good
> progress.
>
> This file includes the entire protocol and interfaces
> required to perform RDMA migration.
>
> Also, the configure and Makefile modifications to link
> this file are included.
>
> Full documentation is in docs/rdma.txt
>
> Signed-off-by: Michael R. Hines <address@hidden>
> ---
> Makefile.objs | 1 +
> configure | 29 +
> include/migration/migration.h | 4 +
> migration-rdma.c | 2778
> +++++++++++++++++++++++++++++++++++++++++
> migration.c | 9 +-
> 5 files changed, 2820 insertions(+), 1 deletion(-)
> create mode 100644 migration-rdma.c
>
> diff --git a/Makefile.objs b/Makefile.objs
> index a473348..d744827 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -49,6 +49,7 @@ common-obj-$(CONFIG_POSIX) += os-posix.o
> common-obj-$(CONFIG_LINUX) += fsdev/
>
> common-obj-y += migration.o migration-tcp.o
> +common-obj-$(CONFIG_RDMA) += migration-rdma.o
> common-obj-y += qemu-char.o #aio.o
> common-obj-y += block-migration.o
> common-obj-y += page_cache.o xbzrle.o
> diff --git a/configure b/configure
> index 4c4f6f6..9decae2 100755
> --- a/configure
> +++ b/configure
> @@ -180,6 +180,7 @@ xfs=""
>
> vhost_net="no"
> kvm="no"
> +rdma="yes"
> gprof="no"
> debug_tcg="no"
> debug="no"
> @@ -925,6 +926,10 @@ for opt do
> ;;
> --enable-gtk) gtk="yes"
> ;;
> + --enable-rdma) rdma="yes"
> + ;;
> + --disable-rdma) rdma="no"
> + ;;
> --with-gtkabi=*) gtkabi="$optarg"
> ;;
> --enable-tpm) tpm="yes"
> @@ -1133,6 +1138,8 @@ echo " --enable-bluez enable bluez stack
> connectivity"
> echo " --disable-slirp disable SLIRP userspace network
> connectivity"
> echo " --disable-kvm disable KVM acceleration support"
> echo " --enable-kvm enable KVM acceleration support"
> +echo " --disable-rdma disable RDMA-based migration support"
> +echo " --enable-rdma enable RDMA-based migration support"
> echo " --enable-tcg-interpreter enable TCG with bytecode interpreter (TCI)"
> echo " --disable-nptl disable usermode NPTL support"
> echo " --enable-nptl enable usermode NPTL support"
> @@ -1782,6 +1789,23 @@ EOF
> libs_softmmu="$sdl_libs $libs_softmmu"
> fi
>
> +if test "$rdma" != "no" ; then
> + cat > $TMPC <<EOF
> +#include <rdma/rdma_cma.h>
> +int main(void) { return 0; }
> +EOF
> + rdma_libs="-lrdmacm -libverbs"
> + if compile_prog "-Werror" "$rdma_libs" ; then
> + rdma="yes"
> + libs_softmmu="$libs_softmmu $rdma_libs"
> + else
> + if test "$rdma" = "yes" ; then
> + feature_not_found "rdma"
> + fi
> + rdma="no"
> + fi
> +fi
> +
> ##########################################
> # VNC TLS/WS detection
> if test "$vnc" = "yes" -a \( "$vnc_tls" != "no" -o "$vnc_ws" != "no" \) ;
> then
> @@ -3524,6 +3548,7 @@ echo "Linux AIO support $linux_aio"
> echo "ATTR/XATTR support $attr"
> echo "Install blobs $blobs"
> echo "KVM support $kvm"
> +echo "RDMA support $rdma"
> echo "TCG interpreter $tcg_interpreter"
> echo "fdt support $fdt"
> echo "preadv support $preadv"
> @@ -4510,6 +4535,10 @@ if [ "$pixman" = "internal" ]; then
> echo "config-host.h: subdir-pixman" >> $config_host_mak
> fi
>
> +if test "$rdma" = "yes" ; then
> +echo "CONFIG_RDMA=y" >> $config_host_mak
> +fi
> +
> # build tree in object directory in case the source is not in the current
> directory
> DIRS="tests tests/tcg tests/tcg/cris tests/tcg/lm32"
> DIRS="$DIRS pc-bios/optionrom pc-bios/spapr-rtas"
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 8e02391..720e0a5 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -76,6 +76,10 @@ void fd_start_incoming_migration(const char *path, Error
> **errp);
>
> void fd_start_outgoing_migration(MigrationState *s, const char *fdname,
> Error **errp);
>
> +void rdma_start_outgoing_migration(void *opaque, const char *host_port,
> Error **errp);
> +
> +void rdma_start_incoming_migration(const char *host_port, Error **errp);
> +
> void migrate_fd_error(MigrationState *s);
>
> void migrate_fd_connect(MigrationState *s);
> diff --git a/migration-rdma.c b/migration-rdma.c
> new file mode 100644
> index 0000000..b1173cd
> --- /dev/null
> +++ b/migration-rdma.c
> @@ -0,0 +1,2778 @@
> +/*
> + * Copyright (C) 2013 Michael R. Hines <address@hidden>
> + * Copyright (C) 2010 Jiuxing Liu <address@hidden>
> + *
> + * RDMA protocol and interfaces
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; under version 2 of the License.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program; if not, see <http://www.gnu.org/licenses/>.
> + */
> +#include "qemu-common.h"
> +#include "migration/migration.h"
> +#include "migration/qemu-file.h"
> +#include "exec/cpu-common.h"
> +#include "qemu/main-loop.h"
> +#include "qemu/sockets.h"
> +#include <stdio.h>
> +#include <sys/types.h>
> +#include <sys/socket.h>
> +#include <netdb.h>
> +#include <arpa/inet.h>
> +#include <string.h>
> +#include <poll.h>
> +#include <rdma/rdma_cma.h>
> +
> +//#define DEBUG_RDMA
> +//#define DEBUG_RDMA_VERBOSE
> +
> +#ifdef DEBUG_RDMA
> +#define DPRINTF(fmt, ...) \
> + do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
> +#else
> +#define DPRINTF(fmt, ...) \
> + do { } while (0)
> +#endif
> +
> +#ifdef DEBUG_RDMA_VERBOSE
> +#define DDPRINTF(fmt, ...) \
> + do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
> +#else
> +#define DDPRINTF(fmt, ...) \
> + do { } while (0)
> +#endif
> +
> +#define RDMA_RESOLVE_TIMEOUT_MS 10000
> +
> +/*
> + * Debugging only. Not optional by default.
> + * Chunk != lazy source != lazy dest
> + * These are all different optimizations,
> + * the only one of which "chunk register destination" is optional.
> + */
> +#define RDMA_CHUNK_REGISTRATION
> +
> +#define RDMA_LAZY_CLIENT_REGISTRATION
Please drop this...
> +/* Do not merge data if larger than this. */
> +#define RDMA_MERGE_MAX (4 * 1024 * 1024)
> +#define RDMA_UNSIGNALED_SEND_MAX 64
> +
> +#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
> +
> +/*
> + * Debugging only. Hard-coded only
> + */
> +//#define RDMA_REG_CHUNK_SHIFT 21 /* 2 MB */
> +//#define RDMA_REG_CHUNK_SHIFT 22 /* 4 MB */
> +//#define RDMA_REG_CHUNK_SHIFT 23 /* 8 MB */
> +//#define RDMA_REG_CHUNK_SHIFT 24 /* 16 MB */
> +//#define RDMA_REG_CHUNK_SHIFT 25 /* 32 MB */
> +//#define RDMA_REG_CHUNK_SHIFT 26 /* 64 MB */
> +//#define RDMA_REG_CHUNK_SHIFT 27 /* 128 MB */
> +//#define RDMA_REG_CHUNK_SHIFT 28 /* 256 MB */
Please drop this...
> +#define RDMA_REG_CHUNK_SIZE (1UL << (RDMA_REG_CHUNK_SHIFT))
> +
> +/*
> + * This is only for non-live state being migrated.
> + * Instead of RDMA_WRITE messages, we use RDMA_SEND
> + * messages for that state, which requires a different
> + * delivery design than main memory.
> + */
> +#define RDMA_SEND_INCREMENT 32768
> +
> +/*
> + * Completion queue can be filled by both read and write work requests,
> + * so must reflect the sum of both possible queue sizes.
> + */
> +#define RDMA_QP_SIZE 1000
> +#define RDMA_CQ_SIZE (RDMA_QP_SIZE * 3)
> +
> +/*
> + * Maximum size infiniband SEND message
> + */
> +#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
> +#define RDMA_CONTROL_MAX_WR 2
> +#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
> +
> +/*
> + * Capabilities for negotiation.
> + */
> +#define RDMA_CAPABILITY_CHUNK_REGISTER 0x01
> +#define RDMA_CAPABILITY_NEXT_FEATURE 0x02 /* not used, just code reminder
> */
Please drop this...
> +#define CHECK_ERROR_STATE() \
> + do { \
> + if (rdma->error_state) { \
> + fprintf(stderr, "RDMA is in an error state waiting migration" \
> + " to abort!\n"); \
> + return rdma->error_state; \
> + } \
> + } while(0);
> +/*
> + * RDMA migration protocol:
> + * 1. RDMA Writes (data messages, i.e. RAM)
> + * 2. IB Send/Recv (control channel messages)
> + */
> +enum {
> + RDMA_WRID_NONE = 0,
> + RDMA_WRID_RDMA_WRITE,
> + RDMA_WRID_SEND_CONTROL = 1000,
> + RDMA_WRID_RECV_CONTROL = 2000,
> +};
> +
> +const char *wrid_desc[] = {
> + [RDMA_WRID_NONE] = "NONE",
> + [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
> + [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
> + [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
> +};
> +
> +/*
> + * SEND/RECV IB Control Messages.
> + */
> +enum {
> + RDMA_CONTROL_NONE = 0,
> + RDMA_CONTROL_ERROR,
> + RDMA_CONTROL_READY, /* ready to receive */
> + RDMA_CONTROL_QEMU_FILE, /* QEMUFile-transmitted bytes */
> + RDMA_CONTROL_RAM_BLOCKS, /* RAMBlock synchronization */
> + RDMA_CONTROL_COMPRESS, /* page contains repeat values */
> + RDMA_CONTROL_REGISTER_REQUEST, /* dynamic page registration */
> + RDMA_CONTROL_REGISTER_RESULT, /* key to use after registration */
> + RDMA_CONTROL_REGISTER_FINISHED, /* current iteration finished */
> +};
> +
> +const char *control_desc[] = {
> + [RDMA_CONTROL_NONE] = "NONE",
> + [RDMA_CONTROL_ERROR] = "ERROR",
> + [RDMA_CONTROL_READY] = "READY",
> + [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
> + [RDMA_CONTROL_RAM_BLOCKS] = "REMOTE INFO",
> + [RDMA_CONTROL_COMPRESS] = "COMPRESS",
> + [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
> + [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
> + [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
> +};
> +
> +/*
> + * Memory and MR structures used to represent an IB Send/Recv work request.
> + * This is *not* used for RDMA, only IB Send/Recv.
> + */
> +typedef struct {
> + uint8_t control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register
> */
> + struct ibv_mr *control_mr; /* registration metadata */
> + size_t control_len; /* length of the message */
> + uint8_t *control_curr; /* start of unconsumed bytes
> */
> +} RDMAWorkRequestData;
> +
> +/*
> + * Negotiate RDMA capabilities during connection-setup time.
> + */
> +typedef struct {
> + uint32_t version;
> + uint32_t flags;
> +} RDMACapabilities;
> +
> +static void caps_to_network(RDMACapabilities *cap)
> +{
> + cap->version = htonl(cap->version);
> + cap->flags = htonl(cap->flags);
> +}
> +
> +static void network_to_caps(RDMACapabilities *cap)
> +{
> + cap->version = ntohl(cap->version);
> + cap->flags = ntohl(cap->flags);
> +}
> +
> +/*
> + * Representation of a RAMBlock from an RDMA perspective.
> + * This is not transmitted, only local.
> + * This and subsequent structures cannot be linked lists
> + * because we're using a single IB message to transmit
> + * the information. It's small anyway, so a list is overkill.
> + */
> +typedef struct RDMALocalBlock {
> + uint8_t *local_host_addr; /* local virtual address */
> + uint64_t remote_host_addr; /* remote virtual address */
> + uint64_t offset;
> + uint64_t length;
> + struct ibv_mr **pmr; /* MRs for chunk-level registration */
> + struct ibv_mr *mr; /* MR for non-chunk-level registration */
> + uint32_t *remote_keys; /* rkeys for chunk-level registration */
> + uint32_t remote_rkey; /* rkeys for non-chunk-level registration */
> +} RDMALocalBlock;
> +
> +/*
> + * Also represents a RAMblock, but only on the dest.
> + * This gets transmitted by the dest during connection-time
> + * to the source / primary VM and then is used to populate the
> + * corresponding RDMALocalBlock with
> + * the information needed to perform the actual RDMA.
> + */
> +typedef struct QEMU_PACKED RDMARemoteBlock {
> + uint64_t remote_host_addr;
> + uint64_t offset;
> + uint64_t length;
> + uint32_t remote_rkey;
> + uint32_t padding;
> +} QEMU_PACKED RDMARemoteBlock;
> +
> +/*
> + * Virtual address of the above structures used for transmitting
> + * the RAMBlock descriptions at connection-time.
> + */
> +typedef struct RDMALocalBlocks {
> + int num_blocks;
> + RDMALocalBlock *block;
> +} RDMALocalBlocks;
> +
> +/*
> + * Same as above
> + */
> +typedef struct RDMARemoteBlocks {
> + int *num_blocks;
> + RDMARemoteBlock *block;
> + void *remote_area;
> + int remote_size;
> +} RDMARemoteBlocks;
This is still wrong, you cannot compute block like that. Please remove
num_blocks from the wire protocol and use head->len instead to compute
it. Then num_blocks can be an int instead of an int *.
> +/*
> + * Main data structure for RDMA state.
> + * While there is only one copy of this structure being allocated right now,
> + * this is the place where one would start if you wanted to consider
> + * having more than one RDMA connection open at the same time.
> + */
> +typedef struct RDMAContext {
> + char *host;
> + int port;
> +
> + /* This is used by the migration protocol to transmit
> + * control messages (such as device state and registration commands)
> + *
> + * WR #0 is for control channel ready messages from the destination.
> + * WR #1 is for control channel data messages from the destination.
> + * WR #2 is for control channel send messages.
> + *
> + * We could use more WRs, but we have enough for now.
> + */
> + RDMAWorkRequestData wr_data[RDMA_CONTROL_MAX_WR + 1];
> +
> + /*
> + * This is used by *_exchange_send() to figure out whether or not
> + * the initial "READY" message has already been received or not.
> + * This is because other functions may potentially poll() and detect
> + * the READY message before send() does, in which case we need to
> + * know if it completed.
> + */
> + int control_ready_expected;
> +
> + /* number of outstanding unsignaled send */
> + int num_unsignaled_send;
> +
> + /* number of outstanding signaled send */
> + int num_signaled_send;
> +
> + /* store info about current buffer so that we can
> + merge it with future sends */
> + uint64_t current_offset;
> + uint64_t current_length;
> + /* index of ram block the current buffer belongs to */
> + int current_index;
> + /* index of the chunk in the current ram block */
> + int current_chunk;
> +
> + bool chunk_register_destination;
> +
> + /*
> + * infiniband-specific variables for opening the device
> + * and maintaining connection state and so forth.
> + *
> + * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
> + * cm_id->verbs, cm_id->channel, and cm_id->qp.
> + */
> + struct rdma_cm_id *cm_id; /* connection manager ID */
> + struct rdma_cm_id *listen_id;
> +
> + struct ibv_context *verbs;
> + struct rdma_event_channel *channel;
> + struct ibv_qp *qp; /* queue pair */
> + struct ibv_comp_channel *comp_channel; /* completion channel */
> + struct ibv_pd *pd; /* protection domain */
> + struct ibv_cq *cq; /* completion queue */
> +
> + /*
> + * If a previous write failed (perhaps because of a failed
> + * memory registration, then do not attempt any future work
> + * and remember the error state.
> + */
> + int error_state;
> +
> + /*
> + * Description of ram blocks used throughout the code.
> + */
> + RDMALocalBlocks local_ram_blocks;
> + RDMARemoteBlocks remote_ram_blocks;
> +} RDMAContext;
> +
> +/*
> + * Interface to the rest of the migration call stack.
> + */
> +typedef struct QEMUFileRDMA {
> + RDMAContext *rdma;
> + size_t len;
> + void *file;
> +} QEMUFileRDMA;
> +
> +#define RDMA_CONTROL_VERSION_CURRENT 1
> +
> +/*
> + * Main structure for IB Send/Recv control messages.
> + * This gets prepended at the beginning of every Send/Recv.
> + */
> +typedef struct QEMU_PACKED {
> + uint32_t len; /* Total length of data portion */
> + uint32_t type; /* which control command to perform */
> + uint32_t repeat; /* number of commands in data portion of same type */
> + uint32_t padding;
> +} QEMU_PACKED RDMAControlHeader;
> +
> +static void control_to_network(RDMAControlHeader *control)
> +{
> + control->type = htonl(control->type);
> + control->len = htonl(control->len);
> + control->repeat = htonl(control->repeat);
> +}
> +
> +static void network_to_control(RDMAControlHeader *control)
> +{
> + control->type = ntohl(control->type);
> + control->len = ntohl(control->len);
> + control->repeat = ntohl(control->repeat);
> +}
> +
> +/*
> + * Register a single Chunk.
> + * Information sent by the primary VM to inform the dest
> + * to register an single chunk of memory before we can perform
> + * the actual RDMA operation.
> + */
> +typedef struct QEMU_PACKED {
> + uint32_t len; /* length of the chunk to be registered */
> + uint32_t current_index; /* which ramblock the chunk belongs to */
> + uint64_t offset; /* offset into the ramblock of the chunk */
> +} QEMU_PACKED RDMARegister;
> +
> +typedef struct QEMU_PACKED {
> + uint32_t value; /* if zero, we will madvise() */
> + uint32_t block_idx; /* which ram block index */
> + uint64_t offset; /* where in the remote ramblock this chunk */
> + uint64_t length; /* length of the chunk */
> +} QEMU_PACKED RDMACompress;
> +
> +/*
> + * The result of the dest's memory registration produces an "rkey"
> + * which the primary VM must reference in order to perform
> + * the RDMA operation.
> + */
> +typedef struct QEMU_PACKED {
> + uint32_t rkey;
> + uint32_t padding;
> +} QEMU_PACKED RDMARegisterResult;
> +
> +
> +inline static int ram_chunk_index(uint8_t *start, uint8_t *host)
> +{
> + return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
> +}
> +
> +inline static int ram_chunk_count(RDMALocalBlock *rdma_ram_block)
> +{
> + return ram_chunk_index(rdma_ram_block->local_host_addr,
> + rdma_ram_block->local_host_addr + rdma_ram_block->length) + 1;
> +}
> +
> +static inline uint8_t *ram_chunk_start(RDMALocalBlock *rdma_ram_block, int i)
> +{
> + return (uint8_t *) (((uintptr_t) rdma_ram_block->local_host_addr)
> + + (i << RDMA_REG_CHUNK_SHIFT));
> +}
> +
> +inline static uint8_t *ram_chunk_end(RDMALocalBlock *rdma_ram_block, int i)
> +{
> + return ram_chunk_start(rdma_ram_block, i) + RDMA_REG_CHUNK_SIZE;
Please clamp the result against rdma_ram_block->local_host_addr +
rdma_ram_block->length here.
> +}
> +
> +/*
> + * Memory regions need to be registered with the device and queue pairs setup
> + * in advanced before the migration starts. This tells us where the RAM
> blocks
> + * are so that we can register them individually.
> + */
> +static void qemu_rdma_init_one_block(void *host_addr,
> + ram_addr_t offset, ram_addr_t length, void *opaque)
> +{
> + RDMALocalBlocks *rdma_local_ram_blocks = opaque;
> + int num_blocks = rdma_local_ram_blocks->num_blocks;
> +
> + rdma_local_ram_blocks->block[num_blocks].local_host_addr = host_addr;
> + rdma_local_ram_blocks->block[num_blocks].offset = (uint64_t)offset;
> + rdma_local_ram_blocks->block[num_blocks].length = (uint64_t)length;
> + rdma_local_ram_blocks->num_blocks++;
> +
> +}
> +
> +static void qemu_rdma_ram_block_counter(void *host_addr,
> + ram_addr_t offset, ram_addr_t length, void *opaque)
> +{
> + int *num_blocks = opaque;
> + *num_blocks = *num_blocks + 1;
> +}
> +
> +/*
> + * Identify the RAMBlocks and their quantity. They will be references to
> + * identify chunk boundaries inside each RAMBlock and also be referenced
> + * during dynamic page registration.
> + */
> +static int qemu_rdma_init_ram_blocks(RDMALocalBlocks *rdma_local_ram_blocks)
> +{
> + int num_blocks = 0;
> +
> + qemu_ram_foreach_block(qemu_rdma_ram_block_counter, &num_blocks);
> +
> + memset(rdma_local_ram_blocks, 0, sizeof *rdma_local_ram_blocks);
> + rdma_local_ram_blocks->block = g_malloc0(sizeof(RDMALocalBlock) *
> + num_blocks);
> +
> + rdma_local_ram_blocks->num_blocks = 0;
> + qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma_local_ram_blocks);
> +
> + DPRINTF("Allocated %d local ram block structures\n",
> + rdma_local_ram_blocks->num_blocks);
> + return 0;
> +}
> +
> +/*
> + * Put in the log file which RDMA device was opened and the details
> + * associated with that device.
> + */
> +static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
> +{
> + printf("%s RDMA Device opened: kernel name %s "
> + "uverbs device name %s, "
> + "infiniband_verbs class device path %s,"
> + " infiniband class device path %s\n",
> + who,
> + verbs->device->name,
> + verbs->device->dev_name,
> + verbs->device->dev_path,
> + verbs->device->ibdev_path);
> +}
> +
> +/*
> + * Put in the log file the RDMA gid addressing information,
> + * useful for folks who have trouble understanding the
> + * RDMA device hierarchy in the kernel.
> + */
> +static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
> +{
> + char sgid[33];
> + char dgid[33];
> + inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
> + inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
> + DPRINTF("%s Source GID: %s, Dest GID: %s\n", who, sgid, dgid);
> +}
> +
> +/*
> + * Figure out which RDMA device corresponds to the requested IP hostname
> + * Also create the initial connection manager identifiers for opening
> + * the connection.
> + */
> +static int qemu_rdma_resolve_host(RDMAContext *rdma)
> +{
> + int ret;
> + struct addrinfo *res;
> + char port_str[16];
> + struct rdma_cm_event *cm_event;
> + char ip[40] = "unknown";
> +
> + if (rdma->host == NULL || !strcmp(rdma->host, "")) {
> + fprintf(stderr, "RDMA hostname has not been set\n");
> + return -1;
> + }
> +
> + /* create CM channel */
> + rdma->channel = rdma_create_event_channel();
> + if (!rdma->channel) {
> + fprintf(stderr, "could not create CM channel\n");
> + return -1;
> + }
> +
> + /* create CM id */
> + ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
> + if (ret) {
> + fprintf(stderr, "could not create channel id\n");
> + goto err_resolve_create_id;
> + }
> +
> + snprintf(port_str, 16, "%d", rdma->port);
> + port_str[15] = '\0';
> +
> + ret = getaddrinfo(rdma->host, port_str, NULL, &res);
> + if (ret < 0) {
> + fprintf(stderr, "could not getaddrinfo destination address %s\n",
> + rdma->host);
> + goto err_resolve_get_addr;
> + }
> +
> + inet_ntop(AF_INET, &((struct sockaddr_in *) res->ai_addr)->sin_addr,
> + ip, sizeof ip);
> + printf("%s => %s\n", rdma->host, ip);
> +
> + /* resolve the first address */
> + ret = rdma_resolve_addr(rdma->cm_id, NULL, res->ai_addr,
> + RDMA_RESOLVE_TIMEOUT_MS);
> + if (ret) {
> + fprintf(stderr, "could not resolve address %s\n", rdma->host);
> + goto err_resolve_get_addr;
> + }
> +
> + qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
> +
> + ret = rdma_get_cm_event(rdma->channel, &cm_event);
> + if (ret) {
> + fprintf(stderr, "could not perform event_addr_resolved\n");
> + goto err_resolve_get_addr;
> + }
> +
> + if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
> + fprintf(stderr, "result not equal to event_addr_resolved %s\n",
> + rdma_event_str(cm_event->event));
> + perror("rdma_resolve_addr");
> + goto err_resolve_get_addr;
> + }
> + rdma_ack_cm_event(cm_event);
> +
> + /* resolve route */
> + ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
> + if (ret) {
> + fprintf(stderr, "could not resolve rdma route\n");
> + goto err_resolve_get_addr;
> + }
> +
> + ret = rdma_get_cm_event(rdma->channel, &cm_event);
> + if (ret) {
> + fprintf(stderr, "could not perform event_route_resolved\n");
> + goto err_resolve_get_addr;
> + }
> + if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
> + fprintf(stderr, "result not equal to event_route_resolved: %s\n",
> + rdma_event_str(cm_event->event));
> + rdma_ack_cm_event(cm_event);
> + goto err_resolve_get_addr;
> + }
> + rdma_ack_cm_event(cm_event);
> + rdma->verbs = rdma->cm_id->verbs;
> + qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
> + qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
> + return 0;
> +
> +err_resolve_get_addr:
> + rdma_destroy_id(rdma->cm_id);
> +err_resolve_create_id:
> + rdma_destroy_event_channel(rdma->channel);
> + rdma->channel = NULL;
> +
> + return -1;
> +}
> +
> +/*
> + * Create protection domain and completion queues
> + */
> +static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
> +{
> + /* allocate pd */
> + rdma->pd = ibv_alloc_pd(rdma->verbs);
> + if (!rdma->pd) {
> + fprintf(stderr, "failed to allocate protection domain\n");
> + return -1;
> + }
> +
> + /* create completion channel */
> + rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
> + if (!rdma->comp_channel) {
> + fprintf(stderr, "failed to allocate completion channel\n");
> + goto err_alloc_pd_cq;
> + }
> +
> + qemu_set_nonblock(rdma->comp_channel->fd);
Again showing my lack of IB-fu, do you have to test for POLLOUT
somewhere too? Or does it just work?
> + /* create cq */
> + rdma->cq = ibv_create_cq(rdma->verbs, RDMA_CQ_SIZE,
> + NULL, rdma->comp_channel, 0);
> + if (!rdma->cq) {
> + fprintf(stderr, "failed to allocate completion queue\n");
> + goto err_alloc_pd_cq;
> + }
> +
> + return 0;
> +
> +err_alloc_pd_cq:
> + if (rdma->pd) {
> + ibv_dealloc_pd(rdma->pd);
> + }
> + if (rdma->comp_channel) {
> + ibv_destroy_comp_channel(rdma->comp_channel);
> + }
> + rdma->pd = NULL;
> + rdma->comp_channel = NULL;
> + return -1;
> +
> +}
> +
> +/*
> + * Create queue pairs.
> + */
> +static int qemu_rdma_alloc_qp(RDMAContext *rdma)
> +{
> + struct ibv_qp_init_attr attr = { 0 };
> + int ret;
> +
> + attr.cap.max_send_wr = RDMA_QP_SIZE;
> + attr.cap.max_recv_wr = 3;
> + attr.cap.max_send_sge = 1;
> + attr.cap.max_recv_sge = 1;
> + attr.send_cq = rdma->cq;
> + attr.recv_cq = rdma->cq;
> + attr.qp_type = IBV_QPT_RC;
> +
> + ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
> + if (ret) {
> + return -1;
> + }
> +
> + rdma->qp = rdma->cm_id->qp;
> + return 0;
> +}
> +
> +/*
> + * Very important debugging-only code.
> + * Will bitrot if maintained out-of-tree.
Will also bitrot if maintained in-tree, trust me. Drop.
> + */
> +#if !defined(RDMA_LAZY_CLIENT_REGISTRATION)
> +static int qemu_rdma_reg_chunk_ram_blocks(RDMAContext *rdma,
> + RDMALocalBlocks *rdma_local_ram_blocks)
> +{
> + int i, j;
> + for (i = 0; i < rdma_local_ram_blocks->num_blocks; i++) {
> + RDMALocalBlock *block = &(rdma_local_ram_blocks->block[i]);
> + int num_chunks = ram_chunk_count(block);
> + /* allocate memory to store chunk MRs */
> + rdma_local_ram_blocks->block[i].pmr = g_malloc0(
> + num_chunks * sizeof(struct ibv_mr *));
> +
> + if (!block->pmr) {
> + goto err_reg_chunk_ram_blocks;
> + }
> +
> + for (j = 0; j < num_chunks; j++) {
> + uint8_t *start_addr = ram_chunk_start(block, j);
> + uint8_t *end_addr = ram_chunk_end(block, j);
> + if (start_addr < block->local_host_addr) {
> + start_addr = block->local_host_addr;
> + }
> + if (end_addr > block->local_host_addr + block->length) {
> + end_addr = block->local_host_addr + block->length;
> + }
> + block->pmr[j] = ibv_reg_mr(rdma->pd,
> + start_addr,
> + end_addr - start_addr,
> + IBV_ACCESS_REMOTE_READ
> + );
> + DDPRINTF("Registering blocks\n");
> + if (!block->pmr[j]) {
> + fprintf(stderr, "reg_chunk_ram_blocks failed!\n");
> + break;
> + }
> + DDPRINTF("Finished registering blocks\n");
> + }
> + if (j < num_chunks) {
> + for (j--; j >= 0; j--) {
> + ibv_dereg_mr(block->pmr[j]);
> + }
> + block->pmr[i] = NULL;
> + goto err_reg_chunk_ram_blocks;
> + }
> + }
> +
> + return 0;
> +
> +err_reg_chunk_ram_blocks:
> + for (i--; i >= 0; i--) {
> + int num_chunks =
> + ram_chunk_count(&(rdma_local_ram_blocks->block[i]));
> + for (j = 0; j < num_chunks; j++) {
> + ibv_dereg_mr(rdma_local_ram_blocks->block[i].pmr[j]);
> + }
> + g_free(rdma_local_ram_blocks->block[i].pmr);
> + rdma_local_ram_blocks->block[i].pmr = NULL;
> + }
> +
> + return -1;
> +
> +}
> +#endif
> +
> +static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma,
> + RDMALocalBlocks *rdma_local_ram_blocks)
> +{
> + int i;
> + for (i = 0; i < rdma_local_ram_blocks->num_blocks; i++) {
> + DDPRINTF("Registering whole ram blocks\n");
> + rdma_local_ram_blocks->block[i].mr =
> + ibv_reg_mr(rdma->pd,
> + rdma_local_ram_blocks->block[i].local_host_addr,
> + rdma_local_ram_blocks->block[i].length,
> + IBV_ACCESS_LOCAL_WRITE |
> + IBV_ACCESS_REMOTE_WRITE
> + );
> + if (!rdma_local_ram_blocks->block[i].mr) {
> + fprintf(stderr, "Failed to register local dest ram block!\n");
> + break;
> + }
> + DDPRINTF("Finished registering whole ram blocks\n");
> + }
> +
> + if (i >= rdma_local_ram_blocks->num_blocks) {
> + return 0;
> + }
> +
> + for (i--; i >= 0; i--) {
> + ibv_dereg_mr(rdma_local_ram_blocks->block[i].mr);
> + }
> +
> + return -1;
> +
> +}
> +
> +/*
> + * Important debugging-only code. Client lazy registration is not optional.
Drop.
> + */
> +static int qemu_rdma_source_reg_ram_blocks(RDMAContext *rdma,
> + RDMALocalBlocks *rdma_local_ram_blocks)
> +{
> +#ifdef RDMA_CHUNK_REGISTRATION
> +#ifdef RDMA_LAZY_CLIENT_REGISTRATION
> + return 0;
> +#else
> + return qemu_rdma_reg_chunk_ram_blocks(rdma, rdma_local_ram_blocks);
> +#endif
> +#else
> + return qemu_rdma_reg_whole_ram_blocks(rdma, rdma_local_ram_blocks);
> +#endif
> +}
> +
> +/*
> + * Shutdown and clean things up.
> + */
> +static void qemu_rdma_dereg_ram_blocks(RDMALocalBlocks
> *rdma_local_ram_blocks)
> +{
> + int i, j;
> + for (i = 0; i < rdma_local_ram_blocks->num_blocks; i++) {
> + int num_chunks;
> + if (!rdma_local_ram_blocks->block[i].pmr) {
> + continue;
> + }
> + num_chunks = ram_chunk_count(&(rdma_local_ram_blocks->block[i]));
> + for (j = 0; j < num_chunks; j++) {
> + if (!rdma_local_ram_blocks->block[i].pmr[j]) {
> + continue;
> + }
> + ibv_dereg_mr(rdma_local_ram_blocks->block[i].pmr[j]);
> + }
> + g_free(rdma_local_ram_blocks->block[i].pmr);
> + rdma_local_ram_blocks->block[i].pmr = NULL;
> + }
> + for (i = 0; i < rdma_local_ram_blocks->num_blocks; i++) {
> + if (!rdma_local_ram_blocks->block[i].mr) {
> + continue;
> + }
> + ibv_dereg_mr(rdma_local_ram_blocks->block[i].mr);
> + rdma_local_ram_blocks->block[i].mr = NULL;
> + }
> +}
> +
> +/*
> + * Server uses this to prepare to transmit the RAMBlock descriptions
> + * to the primary VM after connection setup.
> + * Both sides use the "remote" structure to communicate and update
> + * their "local" descriptions with what was sent.
> + */
> +static void qemu_rdma_copy_to_remote_ram_blocks(RDMAContext *rdma,
> + RDMALocalBlocks *local,
> + RDMARemoteBlocks *remote)
> +{
> + int i;
> + DPRINTF("Allocating %d remote ram block structures\n",
> local->num_blocks);
> + *remote->num_blocks = local->num_blocks;
> +
> + for (i = 0; i < local->num_blocks; i++) {
> + remote->block[i].remote_host_addr =
> + (uint64_t)(local->block[i].local_host_addr);
> +
> + if (!rdma->chunk_register_destination) {
> + remote->block[i].remote_rkey = local->block[i].mr->rkey;
> + }
> +
> + remote->block[i].offset = local->block[i].offset;
> + remote->block[i].length = local->block[i].length;
> + }
> +}
> +
> +/*
> + * The protocol uses two different sets of rkeys (mutually exclusive):
> + * 1. One key to represent the virtual address of the entire ram block.
> + * (dynamic chunk registration disabled - pin everything with one rkey.)
> + * 2. One to represent individual chunks within a ram block.
> + * (dynamic chunk registration enabled - pin individual chunks.)
> + *
> + * Once the capability is successfully negotiated, the destination transmits
> + * the keys to use (or sends them later) including the virtual addresses
> + * and then propagates the remote ram block descriptions to his local copy.
> + */
> +static int qemu_rdma_process_remote_ram_blocks(RDMALocalBlocks *local,
> + RDMARemoteBlocks *remote)
> +{
> + int i, j;
> +
> + if (local->num_blocks != *remote->num_blocks) {
> + fprintf(stderr, "local %d != remote %d\n",
> + local->num_blocks, *remote->num_blocks);
> + return -1;
> + }
> +
> + for (i = 0; i < *remote->num_blocks; i++) {
> + /* search local ram blocks */
> + for (j = 0; j < local->num_blocks; j++) {
> + if (remote->block[i].offset != local->block[j].offset) {
> + continue;
> + }
> + if (remote->block[i].length != local->block[j].length) {
> + return -1;
> + }
> + local->block[j].remote_host_addr =
> + remote->block[i].remote_host_addr;
> + local->block[j].remote_rkey = remote->block[i].remote_rkey;
> + break;
> + }
> + if (j >= local->num_blocks) {
> + return -1;
> + }
> + }
> +
> + return 0;
> +}
> +
> +/*
> + * Find the ram block that corresponds to the page requested to be
> + * transmitted by QEMU.
> + *
> + * Once the block is found, also identify which 'chunk' within that
> + * block that the page belongs to.
> + *
> + * This search cannot fail or the migration will fail.
> + */
> +static int qemu_rdma_search_ram_block(uint64_t offset, uint64_t length,
> + RDMALocalBlocks *blocks, int *block_index, int *chunk_index)
> +{
> + int i;
> + uint8_t *host_addr;
> +
> + for (i = 0; i < blocks->num_blocks; i++) {
> + if (offset < blocks->block[i].offset) {
> + continue;
> + }
> + if (offset + length >
> + blocks->block[i].offset + blocks->block[i].length) {
> + continue;
> + }
> +
> + *block_index = i;
> + host_addr = blocks->block[i].local_host_addr +
> + (offset - blocks->block[i].offset);
> + *chunk_index = ram_chunk_index(blocks->block[i].local_host_addr,
> host_addr);
> + return 0;
> + }
> + return -1;
> +}
> +
> +/*
> + * Register a chunk with IB. If the chunk was already registered
> + * previously, then skip.
> + *
> + * Also return the keys associated with the registration needed
> + * to perform the actual RDMA operation.
> + */
> +static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
> + RDMALocalBlock *block, uint8_t * host_addr,
> + uint32_t *lkey, uint32_t *rkey)
> +{
> + int chunk;
> + if (block->mr) {
> + if (lkey) {
> + *lkey = block->mr->lkey;
> + }
> + if (rkey) {
> + *rkey = block->mr->rkey;
> + }
> + return 0;
> + }
> +
> + /* allocate memory to store chunk MRs */
> + if (!block->pmr) {
> + int num_chunks = ram_chunk_count(block);
> + block->pmr = g_malloc0(num_chunks *
> + sizeof(struct ibv_mr *));
> + if (!block->pmr) {
> + return -1;
> + }
> + }
> +
> + /*
> + * If 'rkey', then we're the destination, so grant access to the source.
> + *
> + * If 'lkey', then we're the primary VM, so grant access only to
> ourselves.
> + */
> + chunk = ram_chunk_index(block->local_host_addr, host_addr);
> + if (!block->pmr[chunk]) {
> + uint8_t *start_addr = ram_chunk_start(block, chunk);
> + uint8_t *end_addr = ram_chunk_end(block, chunk);
> + if (start_addr < block->local_host_addr) {
> + start_addr = block->local_host_addr;
> + }
> + if (end_addr > (block->local_host_addr + block->length)) {
> + end_addr = block->local_host_addr + block->length;
> + }
Drop these ifs (the first is useless, the second will be done in
ram_chunk_end).
> + DDPRINTF("Registering chunk\n");
> + block->pmr[chunk] = ibv_reg_mr(rdma->pd,
> + start_addr,
> + end_addr - start_addr,
> + (rkey ? (IBV_ACCESS_LOCAL_WRITE |
> + IBV_ACCESS_REMOTE_WRITE) : 0)
> + | IBV_ACCESS_REMOTE_READ);
> + if (!block->pmr[chunk]) {
> + fprintf(stderr, "Failed to register chunk!\n");
> + return -1;
> + }
> + DDPRINTF("Finished registering chunk\n");
> + }
> +
> + if (lkey) {
> + *lkey = block->pmr[chunk]->lkey;
> + }
> + if (rkey) {
> + *rkey = block->pmr[chunk]->rkey;
> + }
> + return 0;
> +}
> +
> +/*
> + * Register (at connection time) the memory used for control
> + * channel messages.
> + */
> +static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
> +{
> + DDPRINTF("Registering control\n");
> + rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
> + rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
> + IBV_ACCESS_LOCAL_WRITE |
> + IBV_ACCESS_REMOTE_WRITE |
> + IBV_ACCESS_REMOTE_READ);
> + if (rdma->wr_data[idx].control_mr) {
> + DDPRINTF("Finished registering control\n");
> + return 0;
> + }
> + fprintf(stderr, "qemu_rdma_reg_control failed!\n");
> + return -1;
> +}
> +
> +static int qemu_rdma_dereg_control(RDMAContext *rdma, int idx)
> +{
> + return ibv_dereg_mr(rdma->wr_data[idx].control_mr);
> +}
> +
> +#if defined(DEBUG_RDMA) || defined(DEBUG_RDMA_VERBOSE)
> +static const char *print_wrid(int wrid)
> +{
> + if (wrid >= RDMA_WRID_RECV_CONTROL) {
> + return wrid_desc[RDMA_WRID_RECV_CONTROL];
> + }
> + return wrid_desc[wrid];
> +}
> +#endif
> +
> +/*
> + * Consult the connection manager to see a work request
> + * (of any kind) has completed.
> + * Return the work request ID that completed.
> + */
> +static int qemu_rdma_poll(RDMAContext *rdma)
> +{
> + int ret;
> + struct ibv_wc wc;
> +
> + ret = ibv_poll_cq(rdma->cq, 1, &wc);
> + if (!ret) {
> + return RDMA_WRID_NONE;
> + }
> + if (ret < 0) {
> + fprintf(stderr, "ibv_poll_cq return %d!\n", ret);
> + return ret;
> + }
> + if (wc.status != IBV_WC_SUCCESS) {
> + fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
> + wc.status, ibv_wc_status_str(wc.status));
> + fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wc.wr_id]);
> +
> + return -1;
> + }
> +
> + if (rdma->control_ready_expected &&
> + (wc.wr_id >= RDMA_WRID_RECV_CONTROL)) {
> + DDPRINTF("completion %s #%" PRId64 " received (%" PRId64 ")\n",
> + wrid_desc[RDMA_WRID_RECV_CONTROL], wc.wr_id -
> + RDMA_WRID_RECV_CONTROL, wc.wr_id);
> + rdma->control_ready_expected = 0;
> + }
> +
> + if (wc.wr_id == RDMA_WRID_RDMA_WRITE) {
> + rdma->num_signaled_send--;
> + DDPRINTF("completions %s (%" PRId64 ") left %d\n",
> + print_wrid(wc.wr_id), wc.wr_id, rdma->num_signaled_send);
> + } else {
> + DDPRINTF("other completion %s (%" PRId64 ") received left %d\n",
> + print_wrid(wc.wr_id), wc.wr_id, rdma->num_signaled_send);
> + }
> +
> + return (int)wc.wr_id;
> +}
> +
> +/*
> + * Block until the next work request has completed.
> + *
> + * First poll to see if a work request has already completed,
> + * otherwise block.
> + *
> + * If we encounter completed work requests for IDs other than
> + * the one we're interested in, then that's generally an error.
> + *
> + * The only exception is actual RDMA Write completions. These
> + * completions only need to be recorded, but do not actually
> + * need further processing.
> + */
> +static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid)
> +{
> + int num_cq_events = 0;
> + int r = RDMA_WRID_NONE;
> + struct ibv_cq *cq;
> + void *cq_ctx;
> +
> + if (ibv_req_notify_cq(rdma->cq, 0)) {
> + return -1;
> + }
> + /* poll cq first */
> + while (r != wrid) {
> + r = qemu_rdma_poll(rdma);
> + if (r < 0) {
> + return r;
> + }
> + if (r == RDMA_WRID_NONE) {
> + break;
> + }
> + if (r != wrid) {
> + DDPRINTF("A Wanted wrid %s (%d) but got %s (%d)\n",
> + print_wrid(wrid), wrid, print_wrid(r), r);
> + }
> + }
> + if (r == wrid) {
> + return 0;
> + }
> +
> + while (1) {
> + struct ibv_qp_attr attr;
> + struct ibv_qp_init_attr init;
> + int ret = 0;
> + struct pollfd mypoll = {
> + .fd = rdma->comp_channel->fd,
> + .events = POLLIN,
> + .revents = 0,
> + };
> + while (1) {
> + ret = poll(&mypoll, 1, 10000);
Please do something like this:
do {
if (qemu_in_coroutine()) {
ret = poll(&mypoll, 1, 0);
if (ret == 0) {
yield_until_fd_readable(rdma->comp_channel->fd);
}
} else {
ret = poll(&mypoll, 1, -1);
}
if (ret < 0 || (pfd.revents & (POLLHUP|POLLERR))) {
return ret;
}
} while (ret == 0 && pfd.revents == 0);
if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
...
}
> + if (ret == 0) {
> + ret = ibv_query_qp(rdma->qp, &attr, IBV_QP_STATE, &init);
> + if (ret < 0)
> + return ret;
> + DDPRINTF("Still waiting for data for wrid %s (%d)"
> + ", QP state: %d\n",
> + print_wrid(wrid), r, attr.qp_state);
> + continue;
> + }
> +
> + if (ret < 0) {
> + return ret;
> + }
> +
> + break;
> + }
> +
> + if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
> + goto err_block_for_wrid;
> + }
> +
> + num_cq_events++;
> +
> + if (ibv_req_notify_cq(cq, 0)) {
> + goto err_block_for_wrid;
> + }
> + /* poll cq */
> + while (r != wrid) {
> + r = qemu_rdma_poll(rdma);
> + if (r < 0) {
> + goto err_block_for_wrid;
> + }
> + if (r == RDMA_WRID_NONE) {
> + break;
> + }
> + if (r != wrid) {
> + DDPRINTF("B Wanted wrid %s (%d) but got %s (%d)\n",
> + print_wrid(wrid), wrid, print_wrid(r), r);
> + }
> + }
> + if (r == wrid) {
> + goto success_block_for_wrid;
> + }
> + }
> +
> +success_block_for_wrid:
> + if (num_cq_events) {
> + ibv_ack_cq_events(cq, num_cq_events);
> + }
> + return 0;
> +
> +err_block_for_wrid:
> + if (num_cq_events) {
> + ibv_ack_cq_events(cq, num_cq_events);
> + }
> + return -1;
> +}
> +
> +/*
> + * Post a SEND message work request for the control channel
> + * containing some data and block until the post completes.
> + */
> +static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
> + RDMAControlHeader *head)
> +{
> + int ret = 0;
> + RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_CONTROL_MAX_WR];
> + struct ibv_send_wr *bad_wr;
> + struct ibv_sge sge = {
> + .addr = (uint64_t)(wr->control),
> + .length = head->len + sizeof(RDMAControlHeader),
> + .lkey = wr->control_mr->lkey,
> + };
> + struct ibv_send_wr send_wr = {
> + .wr_id = RDMA_WRID_SEND_CONTROL,
> + .opcode = IBV_WR_SEND,
> + .send_flags = IBV_SEND_SIGNALED,
> + .sg_list = &sge,
> + .num_sge = 1,
> + };
> +
> + DPRINTF("CONTROL: sending %s..\n", control_desc[head->type]);
> +
> + /*
> + * We don't actually need to do a memcpy() in here if we used
> + * the "sge" properly, but since we're only sending control messages
> + * (not RAM in a performance-critical path), then its OK for now.
> + *
> + * The copy makes the RDMAControlHeader simpler to manipulate
> + * for the time being.
> + */
> + memcpy(wr->control, head, sizeof(RDMAControlHeader));
> + control_to_network((void *) wr->control);
> +
> + if (buf) {
> + memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
> + }
> +
> +
> + if (ibv_post_send(rdma->qp, &send_wr, &bad_wr)) {
> + return -1;
> + }
> +
> + if (ret < 0) {
> + fprintf(stderr, "Failed to use post IB SEND for control!\n");
> + return ret;
> + }
> +
> + ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL);
> + if (ret < 0) {
> + fprintf(stderr, "rdma migration: polling control error!");
> + }
> +
> + return ret;
> +}
> +
> +/*
> + * Post a RECV work request in anticipation of some future receipt
> + * of data on the control channel.
> + */
> +static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
> +{
> + struct ibv_recv_wr *bad_wr;
> + struct ibv_sge sge = {
> + .addr = (uint64_t)(rdma->wr_data[idx].control),
> + .length = RDMA_CONTROL_MAX_BUFFER,
> + .lkey = rdma->wr_data[idx].control_mr->lkey,
> + };
> +
> + struct ibv_recv_wr recv_wr = {
> + .wr_id = RDMA_WRID_RECV_CONTROL + idx,
> + .sg_list = &sge,
> + .num_sge = 1,
> + };
> +
> +
> + if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
> + return -1;
> + }
> +
> + return 0;
> +}
> +
> +/*
> + * Block and wait for a RECV control channel message to arrive.
> + */
> +static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
> + RDMAControlHeader *head, int expecting, int idx)
> +{
> + int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx);
> +
> + if (ret < 0) {
> + fprintf(stderr, "rdma migration: polling control error!\n");
> + return ret;
> + }
> +
> + network_to_control((void *) rdma->wr_data[idx].control);
> + memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
> +
> + DPRINTF("CONTROL: %s received\n", control_desc[expecting]);
> +
> + if ((expecting != RDMA_CONTROL_NONE && head->type != expecting)
> + || head->type == RDMA_CONTROL_ERROR) {
> + fprintf(stderr, "Was expecting a %s (%d) control message"
> + ", but got: %s (%d), length: %d\n",
> + control_desc[expecting], expecting,
> + control_desc[head->type], head->type, head->len);
> + return -EIO;
> + }
> +
> + return 0;
> +}
> +
> +/*
> + * When a RECV work request has completed, the work request's
> + * buffer is pointed at the header.
> + *
> + * This will advance the pointer to the data portion
> + * of the control message of the work request's buffer that
> + * was populated after the work request finished.
> + */
> +static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
> + RDMAControlHeader *head)
> +{
> + rdma->wr_data[idx].control_len = head->len;
> + rdma->wr_data[idx].control_curr =
> + rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
> +}
> +
> +/*
> + * This is an 'atomic' high-level operation to deliver a single, unified
> + * control-channel message.
> + *
> + * Additionally, if the user is expecting some kind of reply to this message,
> + * they can request a 'resp' response message be filled in by posting an
> + * additional work request on behalf of the user and waiting for an
> additional
> + * completion.
> + *
> + * The extra (optional) response is used during registration to us from
> having
> + * to perform an *additional* exchange of message just to provide a response
> by
> + * instead piggy-backing on the acknowledgement.
> + */
> +static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader
> *head,
> + uint8_t *data, RDMAControlHeader *resp,
> + int *resp_idx)
> +{
> + int ret = 0;
> + int idx = 0;
> +
> + /*
> + * Wait until the dest is ready before attempting to deliver the message
> + * by waiting for a READY message.
> + */
> + if (rdma->control_ready_expected) {
> + RDMAControlHeader resp;
> + ret = qemu_rdma_exchange_get_response(rdma,
> + &resp, RDMA_CONTROL_READY, idx);
> + if (ret < 0) {
> + return ret;
> + }
> + }
> +
> + /*
> + * If the user is expecting a response, post a WR in anticipation of it.
> + */
> + if (resp) {
> + ret = qemu_rdma_post_recv_control(rdma, idx + 1);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error posting"
> + " extra control recv for anticipated result!");
> + return ret;
> + }
> + }
> +
> + /*
> + * Post a WR to replace the one we just consumed for the READY message.
> + */
> + ret = qemu_rdma_post_recv_control(rdma, idx);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error posting first control recv!");
> + return ret;
> + }
> +
> + /*
> + * Deliver the control message that was requested.
> + */
> + ret = qemu_rdma_post_send_control(rdma, data, head);
> +
> + if (ret < 0) {
> + fprintf(stderr, "Failed to send control buffer!\n");
> + return ret;
> + }
> +
> + /*
> + * If we're expecting a response, block and wait for it.
> + */
> + if (resp) {
> + DPRINTF("Waiting for response %s\n", control_desc[resp->type]);
> + ret = qemu_rdma_exchange_get_response(rdma, resp, resp->type, idx +
> 1);
> +
> + if (ret < 0) {
> + return ret;
> + }
> +
> + qemu_rdma_move_header(rdma, idx + 1, resp);
> + *resp_idx = idx + 1;
> + DPRINTF("Response %s received.\n", control_desc[resp->type]);
> + }
> +
> + rdma->control_ready_expected = 1;
> +
> + return 0;
> +}
> +
> +/*
> + * This is an 'atomic' high-level operation to receive a single, unified
> + * control-channel message.
> + */
> +static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader
> *head,
> + int expecting)
> +{
> + RDMAControlHeader ready = {
> + .len = 0,
> + .type = RDMA_CONTROL_READY,
> + .repeat = 1,
> + };
> + int ret;
> + int idx = 0;
> +
> + /*
> + * Inform the source that we're ready to receive a message.
> + */
> + ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
> +
> + if (ret < 0) {
> + fprintf(stderr, "Failed to send control buffer!\n");
> + return ret;
> + }
> +
> + /*
> + * Block and wait for the message.
> + */
> + ret = qemu_rdma_exchange_get_response(rdma, head, expecting, idx);
> +
> + if (ret < 0) {
> + return ret;
> + }
> +
> + qemu_rdma_move_header(rdma, idx, head);
> +
> + /*
> + * Post a new RECV work request to replace the one we just consumed.
> + */
> + ret = qemu_rdma_post_recv_control(rdma, idx);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error posting second control
> recv!");
> + return ret;
> + }
> +
> + return 0;
> +}
> +
> +/*
> + * Write an actual chunk of memory using RDMA.
> + *
> + * If we're using dynamic registration on the dest-side, we have to
> + * send a registration command first.
> + */
> +static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
> + int current_index,
> + uint64_t offset, uint64_t length,
> + uint64_t wr_id, enum ibv_send_flags flag)
> +{
> + struct ibv_sge sge;
> + struct ibv_send_wr send_wr = { 0 };
> + struct ibv_send_wr *bad_wr;
> + RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
> + int chunk;
> + RDMARegister reg;
> + RDMARegisterResult *reg_result;
> + int reg_result_idx;
> + RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
> + RDMAControlHeader head = { .len = sizeof(RDMARegister),
> + .type = RDMA_CONTROL_REGISTER_REQUEST,
> + .repeat = 1,
> + };
> + int ret;
> +
> + sge.addr = (uint64_t)(block->local_host_addr + (offset - block->offset));
> + sge.length = length;
> +
> + if (rdma->chunk_register_destination) {
> + chunk = ram_chunk_index(block->local_host_addr, (uint8_t *)
> sge.addr);
> + if (!block->remote_keys[chunk]) {
> + /*
> + * This page has not yet been registered, so first check to see
> + * if the entire chunk is zero. If so, tell the other size to
> + * memset() + madvise() the entire chunk without RDMA.
> + */
> + if (can_use_buffer_find_nonzero_offset((void *)sge.addr, length)
> + && buffer_find_nonzero_offset((void *)sge.addr,
> + length) == length) {
> + RDMACompress comp = {
> + .offset = offset,
> + .value = 0,
> + .block_idx = current_index,
> + .length = length,
> + };
> +
> + head.len = sizeof(comp);
> + head.type = RDMA_CONTROL_COMPRESS;
> +
> + DPRINTF("Entire chunk is zero, sending compress: %d for %d "
> + "bytes, index: %d, offset: %" PRId64 "...\n",
> + chunk, sge.length, current_index, offset);
> +
> + ret = qemu_rdma_exchange_send(rdma, &head,
> + (uint8_t *) &comp, NULL, NULL);
> +
> + if (ret < 0) {
> + return -EIO;
> + }
> +
> + return 1;
> + }
> +
> + /*
> + * Otherwise, tell other side to register.
> + */
> + reg.len = sge.length;
> + reg.current_index = current_index;
> + reg.offset = offset;
> +
> + DPRINTF("Sending registration request chunk %d for %d "
> + "bytes, index: %d, offset: %" PRId64 "...\n",
> + chunk, sge.length, current_index, offset);
> +
> + ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®,
> + &resp, ®_result_idx);
> + if (ret < 0) {
> + return ret;
> + }
> +
> + reg_result = (RDMARegisterResult *)
> + rdma->wr_data[reg_result_idx].control_curr;
> +
> + DPRINTF("Received registration result:"
> + " my key: %x their key %x, chunk %d\n",
> + block->remote_keys[chunk], reg_result->rkey, chunk);
> +
> + block->remote_keys[chunk] = reg_result->rkey;
> + }
> +
> + send_wr.wr.rdma.rkey = block->remote_keys[chunk];
> + } else {
> + send_wr.wr.rdma.rkey = block->remote_rkey;
> + }
> +
> + if (qemu_rdma_register_and_get_keys(rdma, block, (uint8_t *)sge.addr,
> + &sge.lkey, NULL)) {
> + fprintf(stderr, "cannot get lkey!\n");
> + return -EINVAL;
> + }
> +
> + send_wr.wr_id = wr_id;
> + send_wr.opcode = IBV_WR_RDMA_WRITE;
> + send_wr.send_flags = flag;
> + send_wr.sg_list = &sge;
> + send_wr.num_sge = 1;
> + send_wr.wr.rdma.remote_addr = block->remote_host_addr +
> + (offset - block->offset);
> +
> + return ibv_post_send(rdma->qp, &send_wr, &bad_wr);
> +}
> +
> +/*
> + * Push out any unwritten RDMA operations.
> + *
> + * We support sending out multiple chunks at the same time.
> + * Not all of them need to get signaled in the completion queue.
> + */
> +static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
> +{
> + int ret;
> + enum ibv_send_flags flags = 0;
> +
> + if (!rdma->current_length) {
> + return 0;
> + }
> + if (rdma->num_unsignaled_send >=
> + RDMA_UNSIGNALED_SEND_MAX) {
> + flags = IBV_SEND_SIGNALED;
> + }
> +
> +retry:
> + ret = qemu_rdma_write_one(f, rdma,
> + rdma->current_index,
> + rdma->current_offset,
> + rdma->current_length,
> + RDMA_WRID_RDMA_WRITE, flags);
> +
> + if (ret < 0) {
> + if (ret == -ENOMEM) {
> + DPRINTF("send queue is full. wait a little....\n");
> + ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
> + if (ret >= 0) {
> + goto retry;
> + }
> + if (ret < 0) {
> + fprintf(stderr, "rdma migration: failed to make "
> + "room in full send queue! %d\n", ret);
> + return ret;
> + }
> + }
> + perror("write flush error");
> + return ret;
> + }
> +
> + if (ret == 0) {
> + if (rdma->num_unsignaled_send >=
> + RDMA_UNSIGNALED_SEND_MAX) {
> + rdma->num_unsignaled_send = 0;
> + rdma->num_signaled_send++;
> + DPRINTF("signaled total: %d\n", rdma->num_signaled_send);
> + } else {
> + rdma->num_unsignaled_send++;
> + }
> + }
> +
> + rdma->current_length = 0;
> + rdma->current_offset = 0;
> +
> + return 0;
> +}
> +
> +static inline int qemu_rdma_in_current_block(RDMAContext *rdma,
> + uint64_t offset, uint64_t len)
> +{
> + RDMALocalBlock *block =
> + &(rdma->local_ram_blocks.block[rdma->current_index]);
> + if (rdma->current_index < 0) {
> + return 0;
> + }
> + if (offset < block->offset) {
> + return 0;
> + }
> + if (offset + len > block->offset + block->length) {
> + return 0;
> + }
> + return 1;
> +}
> +
> +static inline int qemu_rdma_in_current_chunk(RDMAContext *rdma,
> + uint64_t offset, uint64_t len)
> +{
> + RDMALocalBlock *block =
> &(rdma->local_ram_blocks.block[rdma->current_index]);
> + uint8_t *chunk_start, *chunk_end, *host_addr;
> + if (rdma->current_chunk < 0) {
> + return 0;
> + }
> + host_addr = block->local_host_addr + (offset - block->offset);
> + chunk_start = ram_chunk_start(block, rdma->current_chunk);
> + if (chunk_start < block->local_host_addr) {
> + chunk_start = block->local_host_addr;
> + }
Drop this if (useless)...
> + if (host_addr < chunk_start) {
> + return 0;
> + }
> +
> + chunk_end = ram_chunk_end(block, rdma->current_chunk);
> +
> + if (chunk_end > chunk_start + block->length) {
> + chunk_end = chunk_start + block->length;
> + }
... and this one (wrong, should not use chunk_start, and the right one
will be done in ram_chunk_end).
> + if (host_addr + len > chunk_end) {
> + return 0;
> + }
> + return 1;
> +}
> +
> +static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
> + uint64_t offset, uint64_t len)
> +{
> + if (rdma->current_length == 0) {
> + return 0;
> + }
> + if (offset != rdma->current_offset + rdma->current_length) {
> + return 0;
> + }
> + if (!qemu_rdma_in_current_block(rdma, offset, len)) {
> + return 0;
> + }
> +#ifdef RDMA_CHUNK_REGISTRATION
> + if (!qemu_rdma_in_current_chunk(rdma, offset, len)) {
> + return 0;
> + }
> +#endif
> + return 1;
> +}
> +
> +/*
> + * We're not actually writing here, but doing three things:
> + *
> + * 1. Identify the chunk the buffer belongs to.
> + * 2. If the chunk is full or the buffer doesn't belong to the current
> + * chunk, then start a new chunk and flush() the old chunk.
> + * 3. To keep the hardware busy, we also group chunks into batches
> + * and only require that a batch gets acknowledged in the completion
> + * qeueue instead of each individual chunk.
> + */
> +static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
> + uint64_t offset, uint64_t len)
> +{
> + int index = rdma->current_index;
> + int chunk_index = rdma->current_chunk;
> + int ret;
> +
> + /* If we cannot merge it, we flush the current buffer first. */
> + if (!qemu_rdma_buffer_mergable(rdma, offset, len)) {
> + ret = qemu_rdma_write_flush(f, rdma);
> + if (ret) {
> + return ret;
> + }
> + rdma->current_length = 0;
> + rdma->current_offset = offset;
> +
> + ret = qemu_rdma_search_ram_block(offset, len,
> + &rdma->local_ram_blocks, &index, &chunk_index);
> + if (ret) {
> + fprintf(stderr, "ram block search failed\n");
> + return ret;
> + }
> + rdma->current_index = index;
> + rdma->current_chunk = chunk_index;
> + }
> +
> + /* merge it */
> + rdma->current_length += len;
> +
> + /* flush it if buffer is too large */
> + if (rdma->current_length >= RDMA_MERGE_MAX) {
> + return qemu_rdma_write_flush(f, rdma);
> + }
> +
> + return 0;
> +}
> +
> +static void qemu_rdma_cleanup(RDMAContext *rdma)
> +{
> + struct rdma_cm_event *cm_event;
> + int ret, idx;
> +
> + if (rdma->cm_id) {
> + if(rdma->error_state) {
> + RDMAControlHeader head = { .len = 0,
> + .type = RDMA_CONTROL_ERROR,
> + .repeat = 1,
> + };
> + fprintf(stderr, "Early error. Sending error.\n");
> + qemu_rdma_post_send_control(rdma, NULL, &head);
> + }
> +
> + ret = rdma_disconnect(rdma->cm_id);
> + if (!ret) {
> + printf("waiting for disconnect\n");
> + ret = rdma_get_cm_event(rdma->channel, &cm_event);
> + if (!ret) {
> + rdma_ack_cm_event(cm_event);
> + }
> + }
> + printf("Disconnected.\n");
> + rdma->cm_id = 0;
> + }
> +
> + if (rdma->remote_ram_blocks.remote_area) {
> + g_free(rdma->remote_ram_blocks.remote_area);
> + rdma->remote_ram_blocks.remote_area = NULL;
> + }
> +
> + for (idx = 0; idx < (RDMA_CONTROL_MAX_WR + 1); idx++) {
> + if (rdma->wr_data[idx].control_mr) {
> + qemu_rdma_dereg_control(rdma, idx);
> + }
> + rdma->wr_data[idx].control_mr = NULL;
> + }
> +
> + if (rdma->local_ram_blocks.block) {
> + qemu_rdma_dereg_ram_blocks(&rdma->local_ram_blocks);
> +
> + if (rdma->chunk_register_destination) {
> + for (idx = 0; idx < rdma->local_ram_blocks.num_blocks; idx++) {
> + RDMALocalBlock *block = &(rdma->local_ram_blocks.block[idx]);
> + if (block->remote_keys) {
> + g_free(block->remote_keys);
> + block->remote_keys = NULL;
> + }
> + }
> + }
> + g_free(rdma->local_ram_blocks.block);
> + rdma->local_ram_blocks.block = NULL;
> + }
> +
> + if (rdma->qp) {
> + ibv_destroy_qp(rdma->qp);
> + rdma->qp = NULL;
> + }
> + if (rdma->cq) {
> + ibv_destroy_cq(rdma->cq);
> + rdma->cq = NULL;
> + }
> + if (rdma->comp_channel) {
> + ibv_destroy_comp_channel(rdma->comp_channel);
> + rdma->comp_channel = NULL;
> + }
> + if (rdma->pd) {
> + ibv_dealloc_pd(rdma->pd);
> + rdma->pd = NULL;
> + }
> + if (rdma->listen_id) {
> + rdma_destroy_id(rdma->listen_id);
> + rdma->listen_id = 0;
> + }
> + if (rdma->cm_id) {
> + rdma_destroy_id(rdma->cm_id);
> + rdma->cm_id = 0;
> + }
> + if (rdma->channel) {
> + rdma_destroy_event_channel(rdma->channel);
> + rdma->channel = NULL;
> + }
> +}
> +
> +static void qemu_rdma_remote_ram_blocks_init(RDMAContext *rdma)
> +{
> + int remote_size = (sizeof(RDMARemoteBlock) *
> + rdma->local_ram_blocks.num_blocks)
> + + sizeof(*rdma->remote_ram_blocks.num_blocks);
> +
> + DPRINTF("Preparing %d bytes for remote info\n", remote_size);
> +
> + rdma->remote_ram_blocks.remote_area = g_malloc0(remote_size);
> + rdma->remote_ram_blocks.remote_size = remote_size;
> + rdma->remote_ram_blocks.num_blocks = rdma->remote_ram_blocks.remote_area;
> + rdma->remote_ram_blocks.block = (RDMARemoteBlock *)
> (rdma->remote_ram_blocks.num_blocks + 1);
> +}
> +
> +static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp,
> + bool chunk_register_destination)
> +{
> + int ret, idx;
> +
> + /*
> + * Will be validated against destination's actual capabilities
> + * after the connect() completes.
> + */
> + rdma->chunk_register_destination = chunk_register_destination;
> +
> + ret = qemu_rdma_resolve_host(rdma);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error resolving host!\n");
> + goto err_rdma_source_init;
> + }
> +
> + ret = qemu_rdma_alloc_pd_cq(rdma);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error allocating pd and cq!\n");
> + goto err_rdma_source_init;
> + }
> +
> + ret = qemu_rdma_alloc_qp(rdma);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error allocating qp!\n");
> + goto err_rdma_source_init;
> + }
> +
> + ret = qemu_rdma_init_ram_blocks(&rdma->local_ram_blocks);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error initializing ram blocks!\n");
> + goto err_rdma_source_init;
> + }
> +
> + ret = qemu_rdma_source_reg_ram_blocks(rdma, &rdma->local_ram_blocks);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error source registering ram
> blocks!\n");
> + goto err_rdma_source_init;
> + }
> +
> + for (idx = 0; idx < (RDMA_CONTROL_MAX_WR + 1); idx++) {
> + ret = qemu_rdma_reg_control(rdma, idx);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error registering %d
> control!\n",
> + idx);
> + goto err_rdma_source_init;
> + }
> + }
> +
> + qemu_rdma_remote_ram_blocks_init(rdma);
> + return 0;
> +
> +err_rdma_source_init:
> + qemu_rdma_cleanup(rdma);
> + return -1;
> +}
> +
> +static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
> +{
> + RDMAControlHeader head;
> + RDMACapabilities cap = {
> + .version = RDMA_CONTROL_VERSION_CURRENT,
> + .flags = 0,
> + };
> + struct rdma_conn_param conn_param = { .initiator_depth = 2,
> + .retry_count = 5,
> + .private_data = &cap,
> + .private_data_len = sizeof(cap),
> + };
> + struct rdma_cm_event *cm_event;
> + int ret;
> + int idx = 0;
> + int x;
> +
> + if (rdma->chunk_register_destination) {
> + printf("Server dynamic registration requested.\n");
> + cap.flags |= RDMA_CAPABILITY_CHUNK_REGISTER;
> + }
> +
> + caps_to_network(&cap);
> +
> + ret = rdma_connect(rdma->cm_id, &conn_param);
> + if (ret) {
> + perror("rdma_connect");
> + fprintf(stderr, "rdma migration: error connecting!\n");
> + rdma_destroy_id(rdma->cm_id);
> + rdma->cm_id = 0;
> + goto err_rdma_source_connect;
> + }
> +
> + ret = rdma_get_cm_event(rdma->channel, &cm_event);
> + if (ret) {
> + perror("rdma_get_cm_event after rdma_connect");
> + fprintf(stderr, "rdma migration: error connecting!\n");
> + rdma_ack_cm_event(cm_event);
> + rdma_destroy_id(rdma->cm_id);
> + rdma->cm_id = 0;
> + goto err_rdma_source_connect;
> + }
> +
> + if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
> + perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
> + fprintf(stderr, "rdma migration: error connecting!\n");
> + rdma_ack_cm_event(cm_event);
> + rdma_destroy_id(rdma->cm_id);
> + rdma->cm_id = 0;
> + goto err_rdma_source_connect;
> + }
> +
> + memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
> + network_to_caps(&cap);
> +
> + /*
> + * Verify that the destination can support the capabilities we requested.
> + */
> + if (!(cap.flags & RDMA_CAPABILITY_CHUNK_REGISTER) &&
> + rdma->chunk_register_destination) {
> + printf("Server cannot support dynamic registration. Will disable\n");
> + rdma->chunk_register_destination = false;
> + }
> +
> + printf("Chunk registration %s\n",
> + rdma->chunk_register_destination ? "enabled" : "disabled");
> +
> + rdma_ack_cm_event(cm_event);
> +
> + ret = qemu_rdma_post_recv_control(rdma, idx + 1);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error posting second control
> recv!\n");
> + goto err_rdma_source_connect;
> + }
> +
> + ret = qemu_rdma_post_recv_control(rdma, idx);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error posting second control
> recv!\n");
> + goto err_rdma_source_connect;
> + }
> +
> + ret = qemu_rdma_exchange_get_response(rdma,
> + &head, RDMA_CONTROL_RAM_BLOCKS, idx + 1);
> +
> + if (ret < 0) {
> + fprintf(stderr, "rdma migration: error sending remote info!\n");
> + goto err_rdma_source_connect;
> + }
> +
> + qemu_rdma_move_header(rdma, idx + 1, &head);
> + memcpy(rdma->remote_ram_blocks.remote_area, rdma->wr_data[idx +
> 1].control_curr,
> + rdma->remote_ram_blocks.remote_size);
> +
> + ret = qemu_rdma_process_remote_ram_blocks(
> + &rdma->local_ram_blocks,
> &rdma->remote_ram_blocks);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error processing"
> + " remote ram blocks!\n");
> + goto err_rdma_source_connect;
> + }
> +
> + if (rdma->chunk_register_destination) {
> + for (x = 0; x < rdma->local_ram_blocks.num_blocks; x++) {
> + RDMALocalBlock *block = &(rdma->local_ram_blocks.block[x]);
> + int num_chunks = ram_chunk_count(block);
> + /* allocate memory to store remote rkeys */
> + block->remote_keys = g_malloc0(num_chunks * sizeof(uint32_t));
> + }
> + }
> + rdma->control_ready_expected = 1;
> + rdma->num_signaled_send = 0;
> + return 0;
> +
> +err_rdma_source_connect:
> + qemu_rdma_cleanup(rdma);
> + return -1;
> +}
> +
> +static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
> +{
> + int ret = -EINVAL, idx;
> + struct sockaddr_in sin;
> + struct rdma_cm_id *listen_id;
> + char ip[40] = "unknown";
> +
> + for (idx = 0; idx < RDMA_CONTROL_MAX_WR; idx++) {
> + rdma->wr_data[idx].control_len = 0;
> + rdma->wr_data[idx].control_curr = NULL;
> + }
> +
> + if (rdma->host == NULL) {
> + fprintf(stderr, "Error: RDMA host is not set!");
> + rdma->error_state = -EINVAL;
> + return -1;
> + }
> + /* create CM channel */
> + rdma->channel = rdma_create_event_channel();
> + if (!rdma->channel) {
> + fprintf(stderr, "Error: could not create rdma event channel");
> + rdma->error_state = -EINVAL;
> + return -1;
> + }
> +
> + /* create CM id */
> + ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
> + if (ret) {
> + fprintf(stderr, "Error: could not create cm_id!");
> + goto err_dest_init_create_listen_id;
> + }
> +
> + memset(&sin, 0, sizeof(sin));
> + sin.sin_family = AF_INET;
> + sin.sin_port = htons(rdma->port);
> +
> + if (rdma->host && strcmp("", rdma->host)) {
> + struct hostent *dest_addr;
> + dest_addr = gethostbyname(rdma->host);
> + if (!dest_addr) {
> + fprintf(stderr, "Error: migration could not gethostbyname!");
> + ret = -EINVAL;
> + goto err_dest_init_bind_addr;
> + }
> + memcpy(&sin.sin_addr.s_addr, dest_addr->h_addr,
> + dest_addr->h_length);
> + inet_ntop(AF_INET, dest_addr->h_addr, ip, sizeof ip);
> + } else {
> + sin.sin_addr.s_addr = INADDR_ANY;
> + }
> +
> + DPRINTF("%s => %s\n", rdma->host, ip);
> +
> + ret = rdma_bind_addr(listen_id, (struct sockaddr *)&sin);
> + if (ret) {
> + fprintf(stderr, "Error: could not rdma_bind_addr!");
> + goto err_dest_init_bind_addr;
> + }
> +
> + rdma->listen_id = listen_id;
> + if (listen_id->verbs) {
> + rdma->verbs = listen_id->verbs;
> + }
> + qemu_rdma_dump_id("dest_init", rdma->verbs);
> + qemu_rdma_dump_gid("dest_init", listen_id);
> + return 0;
> +
> +err_dest_init_bind_addr:
> + rdma_destroy_id(listen_id);
> +err_dest_init_create_listen_id:
> + rdma_destroy_event_channel(rdma->channel);
> + rdma->channel = NULL;
> + rdma->error_state = ret;
> + return ret;
> +
> +}
> +
> +static int qemu_rdma_dest_prepare(RDMAContext *rdma, Error **errp)
> +{
> + int ret;
> + int idx;
> +
> + if (!rdma->verbs) {
> + fprintf(stderr, "rdma migration: no verbs context!");
> + return 0;
> + }
> +
> + ret = qemu_rdma_alloc_pd_cq(rdma);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error allocating pd and cq!");
> + goto err_rdma_dest_prepare;
> + }
> +
> + ret = qemu_rdma_init_ram_blocks(&rdma->local_ram_blocks);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error initializing ram blocks!");
> + goto err_rdma_dest_prepare;
> + }
> +
> + qemu_rdma_remote_ram_blocks_init(rdma);
> +
> + /* Extra one for the send buffer */
> + for (idx = 0; idx < (RDMA_CONTROL_MAX_WR + 1); idx++) {
> + ret = qemu_rdma_reg_control(rdma, idx);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error registering %d control!",
> + idx);
> + goto err_rdma_dest_prepare;
> + }
> + }
> +
> + ret = rdma_listen(rdma->listen_id, 5);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error listening on socket!");
> + goto err_rdma_dest_prepare;
> + }
> +
> + return 0;
> +
> +err_rdma_dest_prepare:
> + qemu_rdma_cleanup(rdma);
> + return -1;
> +}
> +
> +static void *qemu_rdma_data_init(const char *host_port, Error **errp)
> +{
> + RDMAContext *rdma = NULL;
> + InetSocketAddress *addr;
> +
> + if (host_port) {
> + rdma = g_malloc0(sizeof(RDMAContext));
> + memset(rdma, 0, sizeof(RDMAContext));
> + rdma->current_index = -1;
> + rdma->current_chunk = -1;
> +
> + addr = inet_parse(host_port, errp);
> + if (addr != NULL) {
> + rdma->port = atoi(addr->port);
> + rdma->host = g_strdup(addr->host);
> + } else {
> + error_setg(errp, "bad RDMA migration address '%s'", host_port);
> + g_free(rdma);
> + return NULL;
> + }
> + }
> +
> + return rdma;
> +}
> +
> +/*
> + * QEMUFile interface to the control channel.
> + * SEND messages for control only.
> + * pc.ram is handled with regular RDMA messages.
> + */
> +static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
> + int64_t pos, int size)
> +{
> + QEMUFileRDMA *r = opaque;
> + QEMUFile *f = r->file;
> + RDMAContext *rdma = r->rdma;
> + size_t remaining = size;
> + uint8_t * data = (void *) buf;
> + int ret;
> +
> + CHECK_ERROR_STATE();
> +
> + /*
> + * Push out any writes that
> + * we're queued up for pc.ram.
> + */
> + if (qemu_rdma_write_flush(f, rdma) < 0) {
> + rdma->error_state = -EIO;
> + return rdma->error_state;
> + }
> +
> + while (remaining) {
> + RDMAControlHeader head;
> +
> + r->len = MIN(remaining, RDMA_SEND_INCREMENT);
> + remaining -= r->len;
> +
> + head.len = r->len;
> + head.type = RDMA_CONTROL_QEMU_FILE;
> +
> + ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL);
> +
> + if (ret < 0) {
> + rdma->error_state = ret;
> + return ret;
> + }
> +
> + data += r->len;
> + }
> +
> + return size;
> +}
> +
> +static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
> + int size, int idx)
> +{
> + size_t len = 0;
> +
> + if (rdma->wr_data[idx].control_len) {
> + DPRINTF("RDMA %" PRId64 " of %d bytes already in buffer\n",
> + rdma->wr_data[idx].control_len, size);
> +
> + len = MIN(size, rdma->wr_data[idx].control_len);
> + memcpy(buf, rdma->wr_data[idx].control_curr, len);
> + rdma->wr_data[idx].control_curr += len;
> + rdma->wr_data[idx].control_len -= len;
> + }
> +
> + return len;
> +}
> +
> +/*
> + * QEMUFile interface to the control channel.
> + * RDMA links don't use bytestreams, so we have to
> + * return bytes to QEMUFile opportunistically.
> + */
> +static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
> + int64_t pos, int size)
> +{
> + QEMUFileRDMA *r = opaque;
> + RDMAContext *rdma = r->rdma;
> + RDMAControlHeader head;
> + int ret = 0;
> +
> + CHECK_ERROR_STATE();
> +
> + /*
> + * First, we hold on to the last SEND message we
> + * were given and dish out the bytes until we run
> + * out of bytes.
> + */
> + r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
> + if (r->len) {
> + return r->len;
> + }
> +
> + /*
> + * Once we run out, we block and wait for another
> + * SEND message to arrive.
> + */
> + ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
> +
> + if (ret < 0) {
> + rdma->error_state = ret;
> + return ret;
> + }
> +
> + /*
> + * SEND was received with new bytes, now try again.
> + */
> + return qemu_rdma_fill(r->rdma, buf, size, 0);
> +}
> +
> +/*
> + * Block until all the outstanding chunks have been delivered by the
> hardware.
> + */
> +static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
> +{
> + int ret;
> +
> + if (qemu_rdma_write_flush(f, rdma) < 0) {
> + return -EIO;
> + }
> +
> + while (rdma->num_signaled_send) {
> + ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
> + if (ret < 0) {
> + fprintf(stderr, "rdma migration: complete polling error!\n");
> + return -EIO;
> + }
> + }
> +
> + return 0;
> +}
> +
> +static int qemu_rdma_close(void *opaque)
> +{
> + QEMUFileRDMA *r = opaque;
> + if (r->rdma) {
> + qemu_rdma_cleanup(r->rdma);
> + g_free(r->rdma);
> + }
> + g_free(r);
> + return 0;
> +}
> +
> +static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
> + ram_addr_t block_offset, ram_addr_t offset, size_t size)
> +{
> + ram_addr_t current_addr = block_offset + offset;
> + QEMUFileRDMA *rfile = opaque;
> + RDMAContext *rdma = rfile->rdma;
> + int ret;
> +
> + CHECK_ERROR_STATE();
> +
> + qemu_fflush(f);
> +
> + /*
> + * Add this page to the current 'chunk'. If the chunk
> + * is full, or the page doen't belong to the current chunk,
> + * an actual RDMA write will occur and a new chunk will be formed.
> + */
> + ret = qemu_rdma_write(f, rdma, current_addr, size);
> + if (ret < 0) {
> + rdma->error_state = ret;
> + fprintf(stderr, "rdma migration: write error! %d\n", ret);
> + return ret;
> + }
> +
> + /*
> + * Drain the Completion Queue if possible, but do not block,
> + * just poll.
> + *
> + * If nothing to poll, the end of the iteration will do this
> + * again to make sure we don't overflow the request queue.
> + */
> + while (1) {
> + int ret = qemu_rdma_poll(rdma);
> + if (ret == RDMA_WRID_NONE) {
> + break;
> + }
> + if (ret < 0) {
> + rdma->error_state = ret;
> + fprintf(stderr, "rdma migration: polling error! %d\n", ret);
> + return ret;
> + }
> + }
> +
> + return size;
> +}
> +
> +static int qemu_rdma_accept(RDMAContext *rdma)
> +{
> + RDMAControlHeader head = { .len = rdma->remote_ram_blocks.remote_size,
> + .type = RDMA_CONTROL_RAM_BLOCKS,
> + .repeat = 1,
> + };
> + RDMACapabilities cap;
> + uint32_t requested_flags;
> + struct rdma_conn_param conn_param = {
> + .responder_resources = 2,
> + .private_data = &cap,
> + .private_data_len = sizeof(cap),
> + };
> + struct rdma_cm_event *cm_event;
> + struct ibv_context *verbs;
> + int ret = -EINVAL;
> +
> + ret = rdma_get_cm_event(rdma->channel, &cm_event);
> + if (ret) {
> + goto err_rdma_dest_wait;
> + }
> +
> + if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
> + rdma_ack_cm_event(cm_event);
> + goto err_rdma_dest_wait;
> + }
> +
> + memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
> +
> + network_to_caps(&cap);
> +
> + if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
> + fprintf(stderr, "Unknown source RDMA version: %d, bailing...\n",
> + cap.version);
> + rdma_ack_cm_event(cm_event);
> + goto err_rdma_dest_wait;
> + }
> +
> + if (cap.version == RDMA_CONTROL_VERSION_CURRENT) {
> + if (cap.flags & RDMA_CAPABILITY_CHUNK_REGISTER) {
> + rdma->chunk_register_destination = true;
> + }
> + } else {
> + fprintf(stderr, "Unknown source RDMA version: %d, bailing...\n",
> + cap.version);
> + rdma_ack_cm_event(cm_event);
> + goto err_rdma_dest_wait;
> + }
> +
> + rdma->cm_id = cm_event->id;
> + verbs = cm_event->id->verbs;
> +
> + rdma_ack_cm_event(cm_event);
> +
> + /*
> + * Respond to source with the capabilities we agreed to support.
> + */
> + requested_flags = cap.flags;
> + cap.flags = 0;
> +
> + if (rdma->chunk_register_destination &&
> + (requested_flags & RDMA_CAPABILITY_CHUNK_REGISTER)) {
> + cap.flags |= RDMA_CAPABILITY_CHUNK_REGISTER;
> + }
> +
> + DPRINTF("Chunk registration %s\n",
> + rdma->chunk_register_destination ? "enabled" : "disabled");
> +
> + caps_to_network(&cap);
> +
> + DPRINTF("verbs context after listen: %p\n", verbs);
> +
> + if (!rdma->verbs) {
> + rdma->verbs = verbs;
> + ret = qemu_rdma_dest_prepare(rdma, NULL);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error preparing dest!\n");
> + goto err_rdma_dest_wait;
> + }
> + } else if (rdma->verbs != verbs) {
> + fprintf(stderr, "ibv context not matching %p, %p!\n",
> + rdma->verbs, verbs);
> + goto err_rdma_dest_wait;
> + }
> +
> + /* xxx destroy listen_id ??? */
> +
> + qemu_set_fd_handler2(rdma->channel->fd, NULL, NULL, NULL, NULL);
> +
> + ret = qemu_rdma_alloc_qp(rdma);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error allocating qp!\n");
> + goto err_rdma_dest_wait;
> + }
> +
> + ret = rdma_accept(rdma->cm_id, &conn_param);
> + if (ret) {
> + fprintf(stderr, "rdma_accept returns %d!\n", ret);
> + goto err_rdma_dest_wait;
> + }
> +
> + ret = rdma_get_cm_event(rdma->channel, &cm_event);
> + if (ret) {
> + fprintf(stderr, "rdma_accept get_cm_event failed %d!\n", ret);
> + goto err_rdma_dest_wait;
> + }
> +
> + if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
> + fprintf(stderr, "rdma_accept not event established!\n");
> + rdma_ack_cm_event(cm_event);
> + goto err_rdma_dest_wait;
> + }
> +
> + rdma_ack_cm_event(cm_event);
> +
> + ret = qemu_rdma_post_recv_control(rdma, 0);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error posting second control
> recv!\n");
> + goto err_rdma_dest_wait;
> + }
> +
> + if (!rdma->chunk_register_destination) {
> + ret = qemu_rdma_reg_whole_ram_blocks(rdma, &rdma->local_ram_blocks);
> + if (ret) {
> + fprintf(stderr, "rdma migration: error dest "
> + "registering ram blocks!\n");
> + goto err_rdma_dest_wait;
> + }
> + }
> +
> + qemu_rdma_copy_to_remote_ram_blocks(rdma,
> + &rdma->local_ram_blocks, &rdma->remote_ram_blocks);
> +
> + ret = qemu_rdma_post_send_control(rdma,
> + (uint8_t *) rdma->remote_ram_blocks.remote_area, &head);
> +
> + if (ret < 0) {
> + fprintf(stderr, "rdma migration: error sending remote info!\n");
> + goto err_rdma_dest_wait;
> + }
> +
> + qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
> +
> + return 0;
> +
> +err_rdma_dest_wait:
> + rdma->error_state = ret;
> + qemu_rdma_cleanup(rdma);
> + return ret;
> +}
> +
> +/*
> + * During each iteration of the migration, we listen for instructions
> + * by the primary VM to perform dynamic page registrations before they
> + * can perform RDMA operations.
> + *
> + * We respond with the 'rkey'.
> + *
> + * Keep doing this until the primary tells us to stop.
> + */
> +static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
> + uint64_t flags)
> +{
> + RDMAControlHeader resp = { .len = sizeof(RDMARegisterResult),
> + .type = RDMA_CONTROL_REGISTER_RESULT,
> + .repeat = 0,
> + };
> + QEMUFileRDMA *rfile = opaque;
> + RDMAContext *rdma = rfile->rdma;
> + RDMAControlHeader head;
> + RDMARegister *reg, *registers;
> + RDMACompress *comp;
> + RDMARegisterResult *reg_result;
> + static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
> + RDMALocalBlock *block;
> + void *host_addr;
> + int ret = 0;
> + int idx = 0;
> + int count = 0;
> +
> + CHECK_ERROR_STATE();
> +
> + do {
> + DPRINTF("Waiting for next registration %d...\n", flags);
> +
> + ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
> +
> + if (ret < 0) {
> + break;
> + }
> +
> + if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
> + printf("Too many requests in this message (%d). Bailing.\n",
> + head.repeat);
> + ret = -EIO;
> + break;
> + }
> +
> + switch (head.type) {
> + case RDMA_CONTROL_COMPRESS:
> + comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
> +
> + DPRINTF("Zapping zero chunk: %" PRId64
> + " bytes, index %d, offset %" PRId64 "\n",
> + comp->length, comp->block_idx, comp->offset);
> + comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
> + block = &(rdma->local_ram_blocks.block[comp->block_idx]);
> +
> + host_addr = block->local_host_addr +
> + (comp->offset - block->offset);
> +
> + ram_handle_compressed(host_addr, comp->value, comp->length);
> + break;
> + case RDMA_CONTROL_REGISTER_FINISHED:
> + DPRINTF("Current registrations complete.\n");
> + goto out;
> + case RDMA_CONTROL_REGISTER_REQUEST:
> + DPRINTF("There are %d registration requests\n", head.repeat);
> +
> + resp.repeat = head.repeat;
> + registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
> +
> + for (count = 0; count < head.repeat; count++) {
> + reg = ®isters[count];
> + reg_result = &results[count];
> +
> + DPRINTF("Registration request (%d): %d"
> + " bytes, index %d, offset %" PRId64 "\n",
> + count, reg->len, reg->current_index, reg->offset);
> +
> + block = &(rdma->local_ram_blocks.block[reg->current_index]);
> + host_addr = (block->local_host_addr +
> + (reg->offset - block->offset));
> + if (qemu_rdma_register_and_get_keys(rdma, block,
> + (uint8_t *)host_addr, NULL, ®_result->rkey)) {
> + fprintf(stderr, "cannot get rkey!\n");
> + ret = -EINVAL;
> + goto out;
> + }
> +
> + DPRINTF("Registered rkey for this request: %x\n",
> + reg_result->rkey);
> + }
> +
> + ret = qemu_rdma_post_send_control(rdma,
> + (uint8_t *) results, &resp);
> +
> + if (ret < 0) {
> + fprintf(stderr, "Failed to send control buffer!\n");
> + goto out;
> + }
> + break;
> + case RDMA_CONTROL_REGISTER_RESULT:
> + fprintf(stderr, "Invalid RESULT message at dest.\n");
> + ret = -EIO;
> + goto out;
> + default:
> + fprintf(stderr, "Unknown control message %s\n",
> + control_desc[head.type]);
> + ret = -EIO;
> + goto out;
> + }
> + } while (1);
> +out:
> + if(ret < 0) {
> + rdma->error_state = ret;
> + }
> + return ret;
> +}
> +
> +static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
> + uint64_t flags)
> +{
> + QEMUFileRDMA *rfile = opaque;
> + RDMAContext *rdma = rfile->rdma;
> +
> + CHECK_ERROR_STATE();
> +
> + DPRINTF("start section: %" PRIu64 "\n", flags);
> + qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
> + qemu_fflush(f);
> + return 0;
> +}
> +
> +/*
> + * Inform dest that dynamic registrations are done for now.
> + * First, flush writes, if any.
> + */
> +static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
> + uint64_t flags)
> +{
> + QEMUFileRDMA *rfile = opaque;
> + RDMAContext *rdma = rfile->rdma;
> + RDMAControlHeader head = { .len = 0,
> + .type = RDMA_CONTROL_REGISTER_FINISHED,
> + .repeat = 1,
> + };
> +
> + CHECK_ERROR_STATE();
> +
> + qemu_fflush(f);
> + int ret = qemu_rdma_drain_cq(f, rdma);
> +
> + if (ret >= 0) {
> + DPRINTF("Sending registration finish %" PRIu64 "...\n", flags);
> +
> + ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL);
> + }
> +
> + if (ret < 0) {
> + rdma->error_state = ret;
> + }
> +
> + return ret;
> +}
> +
> +const QEMUFileOps rdma_read_ops = {
> + .get_buffer = qemu_rdma_get_buffer,
> + .close = qemu_rdma_close,
> + .hook_ram_load = qemu_rdma_registration_handle,
> +};
> +
> +const QEMUFileOps rdma_write_ops = {
> + .put_buffer = qemu_rdma_put_buffer,
> + .close = qemu_rdma_close,
> + .before_ram_iterate = qemu_rdma_registration_start,
> + .after_ram_iterate = qemu_rdma_registration_stop,
> + .save_page = qemu_rdma_save_page,
> +};
> +
> +static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
> +{
> + QEMUFileRDMA *r = g_malloc0(sizeof(QEMUFileRDMA));
> +
> + if (qemu_file_mode_is_not_valid(mode)) {
> + return NULL;
> + }
> +
> + r->rdma = rdma;
> +
> + if (mode[0] == 'w') {
> + r->file = qemu_fopen_ops(r, &rdma_write_ops);
> + } else {
> + r->file = qemu_fopen_ops(r, &rdma_read_ops);
> + }
> +
> + return r->file;
> +}
> +
> +static void rdma_accept_incoming_migration(void *opaque)
> +{
> + RDMAContext *rdma = opaque;
> + int ret;
> + QEMUFile *f;
> +
> + DPRINTF("Accepting rdma connection...\n");
> + ret = qemu_rdma_accept(rdma);
> + if (ret) {
> + fprintf(stderr, "RDMA Migration initialization failed!\n");
> + goto err;
> + }
> +
> + DPRINTF("Accepted migration\n");
> +
> + f = qemu_fopen_rdma(rdma, "rb");
> + if (f == NULL) {
> + fprintf(stderr, "could not qemu_fopen_rdma!\n");
> + goto err;
> + }
> +
> + process_incoming_migration(f);
> + return;
> +
> +err:
> + qemu_rdma_cleanup(rdma);
> +}
> +
> +void rdma_start_incoming_migration(const char *host_port, Error **errp)
> +{
> + int ret;
> + RDMAContext *rdma;
> +
> + DPRINTF("Starting RDMA-based incoming migration\n");
> + rdma = qemu_rdma_data_init(host_port, errp);
> + if (rdma == NULL) {
> + return;
> + }
> +
> + ret = qemu_rdma_dest_init(rdma, NULL);
> +
> + if (!ret) {
> + DPRINTF("qemu_rdma_dest_init success\n");
> + ret = qemu_rdma_dest_prepare(rdma, NULL);
> +
> + if (!ret) {
> + DPRINTF("qemu_rdma_dest_prepare success\n");
> +
> + qemu_set_fd_handler2(rdma->channel->fd, NULL,
> + rdma_accept_incoming_migration, NULL,
> + (void *)(intptr_t) rdma);
> + return;
> + }
> + }
> +
> + g_free(rdma);
> +}
> +
> +void rdma_start_outgoing_migration(void *opaque,
> + const char *host_port, Error **errp)
> +{
> + MigrationState *s = opaque;
> + RDMAContext *rdma = qemu_rdma_data_init(host_port, errp);
> + int ret;
> +
> + if (rdma == NULL) {
> + goto err;
> + }
> +
> + ret = qemu_rdma_source_init(rdma, NULL,
> +
> s->enabled_capabilities[MIGRATION_CAPABILITY_X_CHUNK_REGISTER_DESTINATION]);
> +
> + if (!ret) {
> + DPRINTF("qemu_rdma_source_init success\n");
> + ret = qemu_rdma_connect(rdma, NULL);
> +
> + if (!ret) {
> + DPRINTF("qemu_rdma_source_connect success\n");
> + s->file = qemu_fopen_rdma(rdma, "wb");
> + migrate_fd_connect(s);
> + return;
> + }
> + }
> +err:
> + g_free(rdma);
> + migrate_fd_error(s);
> + error_setg(errp, "Error connecting using rdma! %d\n", ret);
> +}
> +
> diff --git a/migration.c b/migration.c
> index 5afd9b8..0ee36a0 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -78,6 +78,10 @@ void qemu_start_incoming_migration(const char *uri, Error
> **errp)
>
> if (strstart(uri, "tcp:", &p))
> tcp_start_incoming_migration(p, errp);
> +#ifdef CONFIG_RDMA
> + else if (strstart(uri, "x-rdma:", &p))
> + rdma_start_incoming_migration(p, errp);
> +#endif
> #if !defined(WIN32)
> else if (strstart(uri, "exec:", &p))
> exec_start_incoming_migration(p, errp);
> @@ -121,7 +125,6 @@ void process_incoming_migration(QEMUFile *f)
> Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);
> int fd = qemu_get_fd(f);
>
> - assert(fd != -1);
> qemu_set_nonblock(fd);
So in the end you are setting the fd to non-blocking. :)
Please add a get_fd implementation to QEMUFile that returns
rdma->comp_channel->fd, and leave the assertion in place. The fd will
be set twice to nonblocking, but that's ok.
> qemu_coroutine_enter(co, f);
> }
> @@ -406,6 +409,10 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
>
> if (strstart(uri, "tcp:", &p)) {
> tcp_start_outgoing_migration(s, p, &local_err);
> +#ifdef CONFIG_RDMA
> + } else if (strstart(uri, "x-rdma:", &p)) {
> + rdma_start_outgoing_migration(s, p, &local_err);
> +#endif
> #if !defined(WIN32)
> } else if (strstart(uri, "exec:", &p)) {
> exec_start_outgoing_migration(s, p, &local_err);
>
Paolo