qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH RFC 05/14] migration/rdma: Create the multifd channels for RDMA


From: Zhimin Feng
Subject: [PATCH RFC 05/14] migration/rdma: Create the multifd channels for RDMA
Date: Thu, 13 Feb 2020 17:37:46 +0800

Create the multifd channels on both sides. We still don't transmit
anything through them; we only build the RDMA connections.

Signed-off-by: Zhimin Feng <address@hidden>
---
 migration/multifd.c | 103 ++++++++++++++++++++++++++++++++++++---
 migration/multifd.h |  10 ++++
 migration/rdma.c    | 115 ++++++++++++++++++++++++++++++++------------
 migration/rdma.h    |   4 +-
 4 files changed, 189 insertions(+), 43 deletions(-)

diff --git a/migration/multifd.c b/migration/multifd.c
index 63678d7fdd..acdfd3d5b3 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -248,6 +248,19 @@ struct {
     int exiting;
 } *multifd_send_state;
 
+int get_multifd_send_param(int id, MultiFDSendParams **param)
+{
+    int ret = 0;
+
+    if (id < 0 || id >= migrate_multifd_channels()) {
+        ret = -1;
+    } else {
+        *param = &(multifd_send_state->params[id]);
+    }
+
+    return ret;
+}
+
 /*
  * How we use multifd_send_state->pages and channel->pages?
  *
@@ -410,6 +423,9 @@ void multifd_save_cleanup(void)
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
+        if (migrate_use_rdma()) {
+            g_free(p->rdma);
+        }
     }
     qemu_sem_destroy(&multifd_send_state->channels_ready);
     g_free(multifd_send_state->params);
@@ -464,6 +480,27 @@ void multifd_send_sync_main(QEMUFile *f)
     trace_multifd_send_sync_main(multifd_send_state->packet_num);
 }
 
+static void *multifd_rdma_send_thread(void *opaque)
+{
+    MultiFDSendParams *p = opaque;
+
+    while (true) {
+        qemu_mutex_lock(&p->mutex);
+        if (p->quit) {
+            qemu_mutex_unlock(&p->mutex);
+            break;
+        }
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_wait(&p->sem);
+    }
+
+    qemu_mutex_lock(&p->mutex);
+    p->running = false;
+    qemu_mutex_unlock(&p->mutex);
+
+    return NULL;
+}
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
@@ -566,6 +603,12 @@ static void rdma_send_channel_create(MultiFDSendParams *p)
 {
     Error *local_err = NULL;
 
+    if (multifd_channel_rdma_connect(p)) {
+        error_setg(&local_err, "multifd: rdma channel %d not established",
+                   p->id);
+        return ;
+    }
+
     if (p->quit) {
         error_setg(&local_err, "multifd: send id %d already quit", p->id);
         return ;
@@ -654,6 +697,19 @@ struct {
     uint64_t packet_num;
 } *multifd_recv_state;
 
+int get_multifd_recv_param(int id, MultiFDRecvParams **param)
+{
+    int ret = 0;
+
+    if (id < 0 || id >= migrate_multifd_channels()) {
+        ret = -1;
+    } else {
+        *param = &(multifd_recv_state->params[id]);
+    }
+
+    return ret;
+}
+
 static void multifd_recv_terminate_threads(Error *err)
 {
     int i;
@@ -724,6 +780,9 @@ int multifd_load_cleanup(Error **errp)
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
+        if (migrate_use_rdma()) {
+            g_free(p->rdma);
+        }
     }
     qemu_sem_destroy(&multifd_recv_state->sem_sync);
     g_free(multifd_recv_state->params);
@@ -761,6 +820,27 @@ void multifd_recv_sync_main(void)
     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
 }
 
+static void *multifd_rdma_recv_thread(void *opaque)
+{
+    MultiFDRecvParams *p = opaque;
+
+    while (true) {
+        qemu_mutex_lock(&p->mutex);
+        if (p->quit) {
+            qemu_mutex_unlock(&p->mutex);
+            break;
+        }
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_wait(&p->sem_sync);
+    }
+
+    qemu_mutex_lock(&p->mutex);
+    p->running = false;
+    qemu_mutex_unlock(&p->mutex);
+
+    return NULL;
+}
+
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
@@ -880,18 +960,24 @@ bool multifd_recv_all_channels_created(void)
 bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
 {
     MultiFDRecvParams *p;
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
     Error *local_err = NULL;
     int id;
 
-    id = multifd_recv_initial_packet(ioc, &local_err);
-    if (id < 0) {
-        multifd_recv_terminate_threads(local_err);
-        error_propagate_prepend(errp, local_err,
-                                "failed to receive packet"
-                                " via multifd channel %d: ",
-                                atomic_read(&multifd_recv_state->count));
-        return false;
+    if (migrate_use_rdma()) {
+        id = multifd_recv_state->count;
+    } else {
+        id = multifd_recv_initial_packet(ioc, &local_err);
+        if (id < 0) {
+            multifd_recv_terminate_threads(local_err);
+            error_propagate_prepend(errp, local_err,
+                    "failed to receive packet"
+                    " via multifd channel %d: ",
+                    atomic_read(&multifd_recv_state->count));
+            return false;
+        }
     }
+
     trace_multifd_recv_new_channel(id);
 
     p = &multifd_recv_state->params[id];
@@ -903,6 +989,7 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
         return false;
     }
     p->c = ioc;
+    p->file = rioc->file;
     object_ref(OBJECT(ioc));
     /* initial packet */
     p->num_packets = 1;
diff --git a/migration/multifd.h b/migration/multifd.h
index c9c11ad140..1eae427f8c 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -67,6 +67,10 @@ typedef struct {
     char *name;
     /* channel thread id */
     QemuThread thread;
+    /* RDMAContext channel */
+    RDMAContext *rdma;
+    /* communication channel */
+    QEMUFile *file;
     /* communication channel */
     QIOChannel *c;
     /* sem where to wait for more work */
@@ -108,6 +112,10 @@ typedef struct {
     char *name;
     /* channel thread id */
     QemuThread thread;
+    /* RDMAContext channel */
+    RDMAContext *rdma;
+    /* communication channel */
+    QEMUFile *file;
     /* communication channel */
     QIOChannel *c;
     /* this mutex protects the following parameters */
@@ -137,5 +145,7 @@ typedef struct {
     QemuSemaphore sem_sync;
 } MultiFDRecvParams;
 
+int get_multifd_send_param(int id, MultiFDSendParams **param);
+int get_multifd_recv_param(int id, MultiFDRecvParams **param);
 #endif
 
diff --git a/migration/rdma.c b/migration/rdma.c
index a76823986e..48615fcaad 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -94,6 +94,8 @@ static const char *wrid_desc[] = {
     [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
 };
 
+static const char *rdma_host_port;
+
 /*
  * Negotiate RDMA capabilities during connection-setup time.
  */
@@ -3122,6 +3124,33 @@ static int qemu_rdma_accept(RDMAContext *rdma)
         qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
                             NULL,
                             (void *)(intptr_t)rdma->return_path);
+    } else if (migrate_use_multifd()) {
+        int thread_count;
+        int i;
+        MultiFDRecvParams *multifd_recv_param;
+        RDMAContext *multifd_rdma = NULL;
+        thread_count = migrate_multifd_channels();
+        /* create the multifd channels for RDMA */
+        for (i = 0; i < thread_count; i++) {
+            if (get_multifd_recv_param(i, &multifd_recv_param) < 0) {
+                error_report("rdma: error getting multifd_recv_param(%d)", i);
+                goto err_rdma_dest_wait;
+            }
+
+            if (multifd_recv_param->rdma->cm_id == NULL) {
+                multifd_rdma = multifd_recv_param->rdma;
+                break;
+            }
+        }
+
+        if (multifd_rdma) {
+            qemu_set_fd_handler(rdma->channel->fd,
+                                rdma_accept_incoming_migration,
+                                NULL, (void *)(intptr_t)multifd_rdma);
+        } else {
+            qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
+                                NULL, rdma);
+        }
     } else {
         qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
                             NULL, rdma);
@@ -3744,7 +3773,7 @@ static void migration_rdma_process_incoming(QEMUFile *f, Error **errp)
         mis->from_src_file = f;
         qemu_file_set_blocking(f, false);
 
-        start_migration = migrate_use_multifd();
+        start_migration = !migrate_use_multifd();
     } else {
         ioc = QIO_CHANNEL(getQIOChannel(f));
         /* Multiple connections */
@@ -3847,6 +3876,30 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp)
         goto err;
     }
 
+    if (migrate_use_multifd()) {
+        int thread_count;
+        int i;
+        int idx;
+        MultiFDRecvParams *multifd_recv_param;
+        thread_count = migrate_multifd_channels();
+        for (i = 0; i < thread_count; i++) {
+            if (get_multifd_recv_param(i, &multifd_recv_param) < 0) {
+                error_report("rdma: error getting multifd_recv_param(%d)", i);
+                goto err;
+            }
+
+            multifd_recv_param->rdma = qemu_rdma_data_init(host_port,
+                                                           &local_err);
+            for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
+                multifd_recv_param->rdma->wr_data[idx].control_len = 0;
+                multifd_recv_param->rdma->wr_data[idx].control_curr = NULL;
+            }
+            /* the CM channel and CM id is shared */
+            multifd_recv_param->rdma->channel = rdma->channel;
+            multifd_recv_param->rdma->listen_id = rdma->listen_id;
+        }
+    }
+
     qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
                         NULL, (void *)(intptr_t)rdma);
     return;
@@ -3868,6 +3921,10 @@ void rdma_start_outgoing_migration(void *opaque,
         goto err;
     }
 
+    if (migrate_use_multifd()) {
+        rdma_host_port = g_strdup(host_port);
+    }
+
     ret = qemu_rdma_source_init(rdma,
         s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
 
@@ -3918,44 +3975,38 @@ err:
     g_free(rdma_return_path);
 }
 
-void *multifd_rdma_recv_thread(void *opaque)
+int multifd_channel_rdma_connect(void *opaque)
 {
-    MultiFDRecvParams *p = opaque;
+    MultiFDSendParams *p = opaque;
+    Error *local_err = NULL;
+    int ret = 0;
 
-    while (true) {
-        qemu_mutex_lock(&p->mutex);
-        if (p->quit) {
-            qemu_mutex_unlock(&p->mutex);
-            break;
-        }
-        qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem_sync);
+    p->rdma = qemu_rdma_data_init(rdma_host_port, &local_err);
+    if (p->rdma == NULL) {
+        goto out;
     }
 
-    qemu_mutex_lock(&p->mutex);
-    p->running = false;
-    qemu_mutex_unlock(&p->mutex);
-
-    return NULL;
-}
+    ret = qemu_rdma_source_init(p->rdma,
+                                migrate_use_rdma_pin_all(),
+                                &local_err);
+    if (ret) {
+        goto out;
+    }
 
-void *multifd_rdma_send_thread(void *opaque)
-{
-    MultiFDSendParams *p = opaque;
+    ret = qemu_rdma_connect(p->rdma, &local_err);
+    if (ret) {
+        goto out;
+    }
 
-    while (true) {
-        qemu_mutex_lock(&p->mutex);
-        if (p->quit) {
-            qemu_mutex_unlock(&p->mutex);
-            break;
-        }
-        qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem);
+    p->file = qemu_fopen_rdma(p->rdma, "wb");
+    if (p->file == NULL) {
+        goto out;
     }
 
-    qemu_mutex_lock(&p->mutex);
-    p->running = false;
-    qemu_mutex_unlock(&p->mutex);
+out:
+    if (local_err) {
+        trace_multifd_send_error(p->id);
+    }
 
-    return NULL;
+    return ret;
 }
diff --git a/migration/rdma.h b/migration/rdma.h
index cb206c7004..ace6e5be90 100644
--- a/migration/rdma.h
+++ b/migration/rdma.h
@@ -263,9 +263,7 @@ struct QIOChannelRDMA {
     bool blocking; /* XXX we don't actually honour this yet */
 };
 
-
-void *multifd_rdma_recv_thread(void *opaque);
-void *multifd_rdma_send_thread(void *opaque);
+int multifd_channel_rdma_connect(void *opaque);
 
 void rdma_start_outgoing_migration(void *opaque, const char *host_port,
                                    Error **errp);
-- 
2.19.1





reply via email to

[Prev in Thread] Current Thread [Next in Thread]