qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH v2 4/5] migration: implement bi-directional RDMA QIOChannel


From: Lidong Chen
Subject: [Qemu-devel] [PATCH v2 4/5] migration: implement bi-directional RDMA QIOChannel
Date: Wed, 25 Apr 2018 22:35:33 +0800

This patch implements a bi-directional RDMA QIOChannel. Because different
threads may access the QIOChannelRDMA concurrently, this patch uses RCU to
protect it.

Signed-off-by: Lidong Chen <address@hidden>
---
 migration/rdma.c | 162 +++++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 146 insertions(+), 16 deletions(-)

diff --git a/migration/rdma.c b/migration/rdma.c
index f5c1d02..0652224 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -86,6 +86,7 @@ static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
                                 " to abort!"); \
                 rdma->error_reported = 1; \
             } \
+            rcu_read_unlock(); \
             return rdma->error_state; \
         } \
     } while (0)
@@ -405,6 +406,7 @@ struct QIOChannelRDMA {
     RDMAContext *rdma;
     QEMUFile *file;
     bool blocking; /* XXX we don't actually honour this yet */
+    QemuMutex lock;
 };
 
 /*
@@ -2635,12 +2637,29 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
 {
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
     QEMUFile *f = rioc->file;
-    RDMAContext *rdma = rioc->rdma;
+    RDMAContext *rdma;
     int ret;
     ssize_t done = 0;
     size_t i;
     size_t len = 0;
 
+    rcu_read_lock();
+    rdma = atomic_rcu_read(&rioc->rdma);
+
+    if (!rdma) {
+        rcu_read_unlock();
+        return -EIO;
+    }
+
+    if (rdma->listen_id) {
+        rdma = rdma->return_path;
+    }
+
+    if (!rdma) {
+        rcu_read_unlock();
+        return -EIO;
+    }
+
     CHECK_ERROR_STATE();
 
     /*
@@ -2650,6 +2669,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
     ret = qemu_rdma_write_flush(f, rdma);
     if (ret < 0) {
         rdma->error_state = ret;
+        rcu_read_unlock();
         return ret;
     }
 
@@ -2669,6 +2689,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
 
             if (ret < 0) {
                 rdma->error_state = ret;
+                rcu_read_unlock();
                 return ret;
             }
 
@@ -2677,6 +2698,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
         }
     }
 
+    rcu_read_unlock();
     return done;
 }
 
@@ -2710,12 +2732,29 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
                                       Error **errp)
 {
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
-    RDMAContext *rdma = rioc->rdma;
+    RDMAContext *rdma;
     RDMAControlHeader head;
     int ret = 0;
     ssize_t i;
     size_t done = 0;
 
+    rcu_read_lock();
+    rdma = atomic_rcu_read(&rioc->rdma);
+
+    if (!rdma) {
+        rcu_read_unlock();
+        return -EIO;
+    }
+
+    if (!rdma->listen_id) {
+        rdma = rdma->return_path;
+    }
+
+    if (!rdma) {
+        rcu_read_unlock();
+        return -EIO;
+    }
+
     CHECK_ERROR_STATE();
 
     for (i = 0; i < niov; i++) {
@@ -2727,7 +2766,7 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
          * were given and dish out the bytes until we run
          * out of bytes.
          */
-        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+        ret = qemu_rdma_fill(rdma, data, want, 0);
         done += ret;
         want -= ret;
         /* Got what we needed, so go to next iovec */
@@ -2749,25 +2788,28 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
 
         if (ret < 0) {
             rdma->error_state = ret;
+            rcu_read_unlock();
             return ret;
         }
 
         /*
          * SEND was received with new bytes, now try again.
          */
-        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+        ret = qemu_rdma_fill(rdma, data, want, 0);
         done += ret;
         want -= ret;
 
         /* Still didn't get enough, so lets just return */
         if (want) {
             if (done == 0) {
+                rcu_read_unlock();
                 return QIO_CHANNEL_ERR_BLOCK;
             } else {
                 break;
             }
         }
     }
+    rcu_read_unlock();
     return done;
 }
 
@@ -2823,6 +2865,16 @@ qio_channel_rdma_source_prepare(GSource *source,
     GIOCondition cond = 0;
     *timeout = -1;
 
+    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
+       (!rdma->listen_id && rsource->condition == G_IO_IN)) {
+        rdma = rdma->return_path;
+    }
+
+    if (!rdma) {
+        error_report("RDMAContext is NULL when prepare Gsource");
+        return FALSE;
+    }
+
     if (rdma->wr_data[0].control_len) {
         cond |= G_IO_IN;
     }
@@ -2838,6 +2890,16 @@ qio_channel_rdma_source_check(GSource *source)
     RDMAContext *rdma = rsource->rioc->rdma;
     GIOCondition cond = 0;
 
+    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
+       (!rdma->listen_id && rsource->condition == G_IO_IN)) {
+        rdma = rdma->return_path;
+    }
+
+    if (!rdma) {
+        error_report("RDMAContext is NULL when check Gsource");
+        return FALSE;
+    }
+
     if (rdma->wr_data[0].control_len) {
         cond |= G_IO_IN;
     }
@@ -2856,6 +2918,16 @@ qio_channel_rdma_source_dispatch(GSource *source,
     RDMAContext *rdma = rsource->rioc->rdma;
     GIOCondition cond = 0;
 
+    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
+       (!rdma->listen_id && rsource->condition == G_IO_IN)) {
+        rdma = rdma->return_path;
+    }
+
+    if (!rdma) {
+        error_report("RDMAContext is NULL when dispatch Gsource");
+        return FALSE;
+    }
+
     if (rdma->wr_data[0].control_len) {
         cond |= G_IO_IN;
     }
@@ -2905,15 +2977,29 @@ static int qio_channel_rdma_close(QIOChannel *ioc,
                                   Error **errp)
 {
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+    RDMAContext *rdma;
     trace_qemu_rdma_close();
-    if (rioc->rdma) {
-        if (!rioc->rdma->error_state) {
-            rioc->rdma->error_state = qemu_file_get_error(rioc->file);
-        }
-        qemu_rdma_cleanup(rioc->rdma);
-        g_free(rioc->rdma);
-        rioc->rdma = NULL;
+
+    qemu_mutex_lock(&rioc->lock);
+    rdma = rioc->rdma;
+    if (!rdma) {
+        qemu_mutex_unlock(&rioc->lock);
+        return 0;
+    }
+    atomic_rcu_set(&rioc->rdma, NULL);
+    qemu_mutex_unlock(&rioc->lock);
+
+    if (!rdma->error_state) {
+        rdma->error_state = qemu_file_get_error(rioc->file);
+    }
+    qemu_rdma_cleanup(rdma);
+
+    if (rdma->return_path) {
+        qemu_rdma_cleanup(rdma->return_path);
+        g_free(rdma->return_path);
     }
+
+    g_free(rdma);
     return 0;
 }
 
@@ -2956,12 +3042,21 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
                                   size_t size, uint64_t *bytes_sent)
 {
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
-    RDMAContext *rdma = rioc->rdma;
+    RDMAContext *rdma;
     int ret;
 
+    rcu_read_lock();
+    rdma = atomic_rcu_read(&rioc->rdma);
+
+    if (!rdma) {
+        rcu_read_unlock();
+        return -EIO;
+    }
+
     CHECK_ERROR_STATE();
 
     if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+        rcu_read_unlock();
         return RAM_SAVE_CONTROL_NOT_SUPP;
     }
 
@@ -3046,9 +3141,11 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
         }
     }
 
+    rcu_read_unlock();
     return RAM_SAVE_CONTROL_DELAYED;
 err:
     rdma->error_state = ret;
+    rcu_read_unlock();
     return ret;
 }
 
@@ -3224,8 +3321,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
                                  .repeat = 1 };
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
-    RDMAContext *rdma = rioc->rdma;
-    RDMALocalBlocks *local = &rdma->local_ram_blocks;
+    RDMAContext *rdma;
+    RDMALocalBlocks *local;
     RDMAControlHeader head;
     RDMARegister *reg, *registers;
     RDMACompress *comp;
@@ -3238,8 +3335,17 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
     int count = 0;
     int i = 0;
 
+    rcu_read_lock();
+    rdma = atomic_rcu_read(&rioc->rdma);
+
+    if (!rdma) {
+        rcu_read_unlock();
+        return -EIO;
+    }
+
     CHECK_ERROR_STATE();
 
+    local = &rdma->local_ram_blocks;
     do {
         trace_qemu_rdma_registration_handle_wait();
 
@@ -3469,6 +3575,7 @@ out:
     if (ret < 0) {
         rdma->error_state = ret;
     }
+    rcu_read_unlock();
     return ret;
 }
 
@@ -3525,11 +3632,19 @@ static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
                                         uint64_t flags, void *data)
 {
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
-    RDMAContext *rdma = rioc->rdma;
+    RDMAContext *rdma;
+
+    rcu_read_lock();
+    rdma = atomic_rcu_read(&rioc->rdma);
+    if (!rdma) {
+        rcu_read_unlock();
+        return -EIO;
+    }
 
     CHECK_ERROR_STATE();
 
     if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+        rcu_read_unlock();
         return 0;
     }
 
@@ -3537,6 +3652,7 @@ static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
     qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
     qemu_fflush(f);
 
+    rcu_read_unlock();
     return 0;
 }
 
@@ -3549,13 +3665,21 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
 {
     Error *local_err = NULL, **errp = &local_err;
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
-    RDMAContext *rdma = rioc->rdma;
+    RDMAContext *rdma;
     RDMAControlHeader head = { .len = 0, .repeat = 1 };
     int ret = 0;
 
+    rcu_read_lock();
+    rdma = atomic_rcu_read(&rioc->rdma);
+    if (!rdma) {
+        rcu_read_unlock();
+        return -EIO;
+    }
+
     CHECK_ERROR_STATE();
 
     if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+        rcu_read_unlock();
         return 0;
     }
 
@@ -3587,6 +3711,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
                     qemu_rdma_reg_whole_ram_blocks : NULL);
         if (ret < 0) {
             ERROR(errp, "receiving remote info!");
+            rcu_read_unlock();
             return ret;
         }
 
@@ -3610,6 +3735,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
                         "not identical on both the source and destination.",
                         local->nb_blocks, nb_dest_blocks);
             rdma->error_state = -EINVAL;
+            rcu_read_unlock();
             return -EINVAL;
         }
 
@@ -3626,6 +3752,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
                             local->block[i].length,
                             rdma->dest_blocks[i].length);
                 rdma->error_state = -EINVAL;
+                rcu_read_unlock();
                 return -EINVAL;
             }
             local->block[i].remote_host_addr =
@@ -3643,9 +3770,11 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
         goto err;
     }
 
+    rcu_read_unlock();
     return 0;
 err:
     rdma->error_state = ret;
+    rcu_read_unlock();
     return ret;
 }
 
@@ -3707,6 +3836,7 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
 
     rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
     rioc->rdma = rdma;
+    qemu_mutex_init(&rioc->lock);
 
     if (mode[0] == 'w') {
         rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
-- 
1.8.3.1




reply via email to

[Prev in Thread] Current Thread [Next in Thread]