qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [RFC PATCH RDMA support v4: 08/10] introduce QEMUFileRDMA


From: mrhines
Subject: [Qemu-devel] [RFC PATCH RDMA support v4: 08/10] introduce QEMUFileRDMA
Date: Sun, 17 Mar 2013 23:19:01 -0400

From: "Michael R. Hines" <address@hidden>

This compiles with and without --enable-rdma.

Signed-off-by: Michael R. Hines <address@hidden>
---
 include/migration/qemu-file.h |   10 +++
 savevm.c                      |  172 ++++++++++++++++++++++++++++++++++++++---
 2 files changed, 172 insertions(+), 10 deletions(-)

diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
index df81261..9046751 100644
--- a/include/migration/qemu-file.h
+++ b/include/migration/qemu-file.h
@@ -51,23 +51,33 @@ typedef int (QEMUFileCloseFunc)(void *opaque);
  */
 typedef int (QEMUFileGetFD)(void *opaque);
 
+/* 
+ * 'drain' from a QEMUFile perspective means
+ * to flush the outbound send buffer
+ * (if one exists). (Only used by RDMA right now)
+ */
+typedef int (QEMUFileDrainFunc)(void *opaque);
+
 typedef struct QEMUFileOps {
     QEMUFilePutBufferFunc *put_buffer;
     QEMUFileGetBufferFunc *get_buffer;
     QEMUFileCloseFunc *close;
     QEMUFileGetFD *get_fd;
+    QEMUFileDrainFunc *drain;
 } QEMUFileOps;
 
 QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops);
 QEMUFile *qemu_fopen(const char *filename, const char *mode);
 QEMUFile *qemu_fdopen(int fd, const char *mode);
 QEMUFile *qemu_fopen_socket(int fd, const char *mode);
+QEMUFile *qemu_fopen_rdma(void *opaque, const char *mode);
 QEMUFile *qemu_popen_cmd(const char *command, const char *mode);
 int qemu_get_fd(QEMUFile *f);
 int qemu_fclose(QEMUFile *f);
 int64_t qemu_ftell(QEMUFile *f);
 void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size);
 void qemu_put_byte(QEMUFile *f, int v);
+int qemu_drain(QEMUFile *f);
 
 static inline void qemu_put_ubyte(QEMUFile *f, unsigned int v)
 {
diff --git a/savevm.c b/savevm.c
index 35c8d1e..9b90b7f 100644
--- a/savevm.c
+++ b/savevm.c
@@ -32,6 +32,7 @@
 #include "qemu/timer.h"
 #include "audio/audio.h"
 #include "migration/migration.h"
+#include "migration/rdma.h"
 #include "qemu/sockets.h"
 #include "qemu/queue.h"
 #include "sysemu/cpus.h"
@@ -143,6 +144,13 @@ typedef struct QEMUFileSocket
     QEMUFile *file;
 } QEMUFileSocket;
 
+typedef struct QEMUFileRDMA
+{
+    void *rdma;
+    size_t len;
+    QEMUFile *file;
+} QEMUFileRDMA;
+
 typedef struct {
     Coroutine *co;
     int fd;
@@ -178,6 +186,66 @@ static int socket_get_fd(void *opaque)
     return s->fd;
 }
 
+/*
+ * SEND messages for non-live state only.
+ * pc.ram is handled elsewhere...
+ */
+static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
+{
+    QEMUFileRDMA *r = opaque;
+    size_t remaining = size;
+    uint8_t * data = (void *) buf;
+
+    /*
+     * Although we're sending non-live
+     * state here, push out any writes that
+     * we've queued up for pc.ram anyway.
+     */
+    if (qemu_rdma_write_flush(r->rdma) < 0)
+        return -EIO;
+
+    while(remaining) {
+        r->len = MIN(remaining, RDMA_SEND_INCREMENT);
+        remaining -= r->len;
+
+        if(qemu_rdma_exchange_send(r->rdma, data, r->len) < 0)
+                return -EINVAL;
+
+        data += r->len;
+    }
+
+    return size;
+} 
+
+/*
+ * RDMA links don't use bytestreams, so we have to
+ * return bytes to QEMUFile opportunistically.
+ */
+static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int size)
+{
+    QEMUFileRDMA *r = opaque;
+
+    /*
+     * First, we hold on to the last SEND message we 
+     * were given and dish out the bytes until we run 
+     * out of bytes.
+     */
+    if((r->len = qemu_rdma_fill(r->rdma, buf, size)))
+       return r->len; 
+
+     /*
+      * Once we run out, we block and wait for another
+      * SEND message to arrive.
+      */
+    if(qemu_rdma_exchange_recv(r->rdma) < 0)
+       return -EINVAL;
+
+    /*
+     * SEND was received with new bytes, now try again.
+     */
+    return qemu_rdma_fill(r->rdma, buf, size);
+} 
+
 static int socket_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int size)
 {
     QEMUFileSocket *s = opaque;
@@ -390,16 +458,24 @@ static const QEMUFileOps socket_write_ops = {
     .close =      socket_close
 };
 
-QEMUFile *qemu_fopen_socket(int fd, const char *mode)
+static bool qemu_mode_is_not_valid(const char * mode)
 {
-    QEMUFileSocket *s = g_malloc0(sizeof(QEMUFileSocket));
-
     if (mode == NULL ||
         (mode[0] != 'r' && mode[0] != 'w') ||
         mode[1] != 'b' || mode[2] != 0) {
         fprintf(stderr, "qemu_fopen: Argument validity check failed\n");
-        return NULL;
+        return true;
     }
+    
+    return false;
+}
+
+QEMUFile *qemu_fopen_socket(int fd, const char *mode)
+{
+    QEMUFileSocket *s = g_malloc0(sizeof(QEMUFileSocket));
+
+    if(qemu_mode_is_not_valid(mode))
+       return NULL;
 
     s->fd = fd;
     if (mode[0] == 'w') {
@@ -411,16 +487,66 @@ QEMUFile *qemu_fopen_socket(int fd, const char *mode)
     return s->file;
 }
 
+static int qemu_rdma_close(void *opaque)
+{
+    QEMUFileRDMA *r = opaque;
+    if(r->rdma) {
+        qemu_rdma_cleanup(r->rdma);
+        g_free(r->rdma);
+    }
+    g_free(r);
+    return 0;
+}
+
+void * migrate_use_rdma(QEMUFile *f)
+{
+    QEMUFileRDMA *r = f->opaque;
+
+    return qemu_rdma_enabled(r->rdma) ? r->rdma : NULL;
+}
+
+static int qemu_rdma_drain_completion(void *opaque)
+{
+    QEMUFileRDMA *r = opaque;
+    r->len = 0;
+    return qemu_rdma_drain_cq(r->rdma);
+}
+
+static const QEMUFileOps rdma_read_ops = {
+    .get_buffer = qemu_rdma_get_buffer,
+    .close =      qemu_rdma_close,
+};
+
+static const QEMUFileOps rdma_write_ops = {
+    .put_buffer = qemu_rdma_put_buffer,
+    .close =      qemu_rdma_close,
+    .drain =     qemu_rdma_drain_completion,
+};
+
+QEMUFile *qemu_fopen_rdma(void *opaque, const char * mode)
+{
+    QEMUFileRDMA *r = g_malloc0(sizeof(QEMUFileRDMA));
+
+    if(qemu_mode_is_not_valid(mode))
+       return NULL;
+
+    r->rdma = opaque;
+
+    if (mode[0] == 'w') {
+        r->file = qemu_fopen_ops(r, &rdma_write_ops);
+    } else {
+        r->file = qemu_fopen_ops(r, &rdma_read_ops);
+    }
+
+    return r->file;
+}
+
 QEMUFile *qemu_fopen(const char *filename, const char *mode)
 {
     QEMUFileStdio *s;
 
-    if (mode == NULL ||
-       (mode[0] != 'r' && mode[0] != 'w') ||
-       mode[1] != 'b' || mode[2] != 0) {
-        fprintf(stderr, "qemu_fopen: Argument validity check failed\n");
-        return NULL;
-    }
+    if(qemu_mode_is_not_valid(mode))
+       return NULL;
 
     s = g_malloc0(sizeof(QEMUFileStdio));
 
@@ -497,6 +623,24 @@ static void qemu_file_set_error(QEMUFile *f, int ret)
     }
 }
 
+/*
+ * Called only for RDMA right now at the end 
+ * of each live iteration of memory.
+ *
+ * 'drain' from a QEMUFile perspective means
+ * to flush the outbound send buffer
+ * (if one exists). 
+ *
+ * For RDMA, this means to make sure we've
+ * received completion queue (CQ) messages
+ * successfully for all of the RDMA writes
+ * that we requested.
+ */ 
+int qemu_drain(QEMUFile *f)
+{
+    return f->ops->drain ? f->ops->drain(f->opaque) : 0;
+}
+
 /** Flushes QEMUFile buffer
  *
  */
@@ -723,6 +867,8 @@ int qemu_get_byte(QEMUFile *f)
 int64_t qemu_ftell(QEMUFile *f)
 {
     qemu_fflush(f);
+    if(migrate_use_rdma(f))
+       return delta_norm_mig_bytes_transferred();
     return f->pos;
 }
 
@@ -1737,6 +1883,12 @@ void qemu_savevm_state_complete(QEMUFile *f)
         }
     }
 
+    if ((ret = qemu_drain(f)) < 0) {
+       fprintf(stderr, "failed to drain RDMA first!\n");
+        qemu_file_set_error(f, ret);
+       return;
+    }
+
     QTAILQ_FOREACH(se, &savevm_handlers, entry) {
         int len;
 
-- 
1.7.10.4




reply via email to

[Prev in Thread] Current Thread [Next in Thread]