qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH v3 4/4] Curling: the receiver


From: Jules Wang
Subject: [Qemu-devel] [PATCH v3 4/4] Curling: the receiver
Date: Tue, 15 Oct 2013 15:26:23 +0800

The receiver runs the migration loop until the migration connection is
lost. It is then started as the backup.

The receiver does not load VM state as soon as migration begins.
Instead, it prefetches one complete round of migration data into a
buffer, then loads the VM state from that buffer.

Signed-off-by: Jules Wang <address@hidden>
---
 include/migration/qemu-file.h |   1 +
 include/sysemu/sysemu.h       |   2 +
 migration.c                   |  22 ++++--
 savevm.c                      | 158 ++++++++++++++++++++++++++++++++++++++++--
 4 files changed, 173 insertions(+), 10 deletions(-)

diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
index 0f757fb..f01ff10 100644
--- a/include/migration/qemu-file.h
+++ b/include/migration/qemu-file.h
@@ -92,6 +92,7 @@ typedef struct QEMUFileOps {
     QEMURamHookFunc *after_ram_iterate;
     QEMURamHookFunc *hook_ram_load;
     QEMURamSaveFunc *save_page;
+    QEMUFileGetBufferFunc *get_prefetch_buffer;
 } QEMUFileOps;
 
 QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops);
diff --git a/include/sysemu/sysemu.h b/include/sysemu/sysemu.h
index 31d5e3f..e94193c 100644
--- a/include/sysemu/sysemu.h
+++ b/include/sysemu/sysemu.h
@@ -87,6 +87,8 @@ void qemu_savevm_state_complete(QEMUFile *f,
 void qemu_savevm_state_cancel(void);
 uint64_t qemu_savevm_state_pending(QEMUFile *f, uint64_t max_size);
 int qemu_loadvm_state(QEMUFile *f);
+int qemu_loadvm_state_ft(QEMUFile *f);
+bool is_ft_migration(QEMUFile *f);
 
 /* SLIRP */
 void do_info_slirp(Monitor *mon);
diff --git a/migration.c b/migration.c
index 28acd05..e0734a7 100644
--- a/migration.c
+++ b/migration.c
@@ -19,6 +19,7 @@
 #include "monitor/monitor.h"
 #include "migration/qemu-file.h"
 #include "sysemu/sysemu.h"
+#include "sysemu/cpus.h"
 #include "block/block.h"
 #include "qemu/sockets.h"
 #include "migration/block.h"
@@ -101,13 +102,24 @@ static void process_incoming_migration_co(void *opaque)
 {
     QEMUFile *f = opaque;
     int ret;
+    int count = 0;
 
-    ret = qemu_loadvm_state(f);
-    qemu_fclose(f);
-    if (ret < 0) {
-        fprintf(stderr, "load of migration failed\n");
-        exit(EXIT_FAILURE);
+    if (is_ft_migration(f)) {
+        while (qemu_loadvm_state_ft(f) >= 0) {
+            count++;
+            DPRINTF("incoming count %d\r", count);
+        }
+        qemu_fclose(f);
+        DPRINTF("ft connection lost, launching self..\n");
+    } else {
+        ret = qemu_loadvm_state(f);
+        qemu_fclose(f);
+        if (ret < 0) {
+            fprintf(stderr, "load of migration failed\n");
+            exit(EXIT_FAILURE);
+        }
     }
+    cpu_synchronize_all_post_init();
     qemu_announce_self();
     DPRINTF("successfully loaded vm state\n");
 
diff --git a/savevm.c b/savevm.c
index e75d5d4..611fda2 100644
--- a/savevm.c
+++ b/savevm.c
@@ -52,6 +52,8 @@
 #define ARP_PTYPE_IP 0x0800
 #define ARP_OP_REQUEST_REV 0x3
 
+#define PREFETCH_BUFFER_SIZE 0x010000
+
 static int announce_self_create(uint8_t *buf,
                                uint8_t *mac_addr)
 {
@@ -135,6 +137,10 @@ struct QEMUFile {
     unsigned int iovcnt;
 
     int last_error;
+
+    uint8_t *prefetch_buf;
+    uint64_t prefetch_buf_index;
+    uint64_t prefetch_buf_size;
 };
 
 typedef struct QEMUFileStdio
@@ -193,6 +199,25 @@ static int socket_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int size)
     return len;
 }
 
+/*
+ * get_prefetch_buffer hook for socket-backed files: copy up to @size bytes,
+ * starting at byte @pos of the prefetched data, into @buf.
+ *
+ * Unlike socket_close(), which receives the QEMUFileSocket, this hook's
+ * @opaque is the QEMUFile itself, because the prefetch buffer is stored in
+ * QEMUFile (qemu_peek_buffer() passes @f).
+ *
+ * Returns the number of bytes copied. Returns 0 when @pos is at or beyond
+ * the end of the prefetched data, or when @pos or @size is negative.
+ */
+static int socket_get_prefetch_buffer(void *opaque, uint8_t *buf,
+                                      int64_t pos, int size)
+{
+    QEMUFile *f = opaque;
+    uint64_t avail;
+
+    /*
+     * prefetch_buf_size is unsigned, so "prefetch_buf_size - pos <= 0" was
+     * only true for pos == prefetch_buf_size. A pos past the end (or a
+     * negative pos) wrapped to a huge value, and a negative size became huge
+     * when compared unsigned. Either way memcpy ran out of bounds.
+     * Validate the inputs before doing any subtraction.
+     */
+    if (pos < 0 || size <= 0 || (uint64_t)pos >= f->prefetch_buf_size) {
+        return 0;
+    }
+
+    avail = f->prefetch_buf_size - (uint64_t)pos;
+    if (avail < (uint64_t)size) {
+        size = (int)avail;  /* avail < size here, so it fits in int */
+    }
+
+    memcpy(buf, f->prefetch_buf + pos, size);
+
+    return size;
+}
+
+
 static int socket_close(void *opaque)
 {
     QEMUFileSocket *s = opaque;
@@ -440,6 +465,7 @@ QEMUFile *qemu_fdopen(int fd, const char *mode)
 static const QEMUFileOps socket_read_ops = {
     .get_fd =     socket_get_fd,
     .get_buffer = socket_get_buffer,
+    /* Serves bytes that qemu_loadvm_state_ft() has already read into
+     * QEMUFile.prefetch_buf. Its opaque argument is the QEMUFile itself,
+     * unlike socket_close(), which receives the QEMUFileSocket. */
+    .get_prefetch_buffer = socket_get_prefetch_buffer,
     .close =      socket_close
 };
 
@@ -746,6 +772,8 @@ int qemu_fclose(QEMUFile *f)
     if (f->last_error) {
         ret = f->last_error;
     }
+
+    g_free(f->prefetch_buf);
     g_free(f);
     return ret;
 }
@@ -829,6 +857,14 @@ void qemu_put_byte(QEMUFile *f, int v)
 
 static void qemu_file_skip(QEMUFile *f, int size)
 {
+    if (f->prefetch_buf_index + size <= f->prefetch_buf_size) {
+        f->prefetch_buf_index += size;
+        return;
+    } else {
+        size -= f->prefetch_buf_size - f->prefetch_buf_index;
+        f->prefetch_buf_index = f->prefetch_buf_size;
+    }
+
     if (f->buf_index + size <= f->buf_size) {
         f->buf_index += size;
     }
@@ -838,6 +874,23 @@ static int qemu_peek_buffer(QEMUFile *f, uint8_t *buf, int size, size_t offset)
 {
     int pending;
     int index;
+    int done;
+
+    if (f->ops->get_prefetch_buffer) {
+        if (f->prefetch_buf_index + offset < f->prefetch_buf_size) {
+            done = f->ops->get_prefetch_buffer(f,
+                                               buf,
+                                               f->prefetch_buf_index + offset,
+                                               size);
+            if (done == size) {
+                return size;
+            }
+            size -= done;
+            buf  += done;
+        } else {
+            offset -= f->prefetch_buf_size - f->prefetch_buf_index;
+        }
+    }
 
     assert(!qemu_file_is_writable(f));
 
@@ -882,7 +935,15 @@ int qemu_get_buffer(QEMUFile *f, uint8_t *buf, int size)
 
 static int qemu_peek_byte(QEMUFile *f, int offset)
 {
-    int index = f->buf_index + offset;
+    int index;
+
+    if (f->prefetch_buf_index + offset < f->prefetch_buf_size) {
+        return f->prefetch_buf[f->prefetch_buf_index + offset];
+    } else {
+        offset -= f->prefetch_buf_size - f->prefetch_buf_index;
+    }
+
+    index = f->buf_index + offset;
 
     assert(!qemu_file_is_writable(f));
 
@@ -896,6 +957,16 @@ static int qemu_peek_byte(QEMUFile *f, int offset)
     return f->buf[index];
 }
 
+/*
+ * Assemble a big-endian 32-bit value from the four bytes starting @offset
+ * bytes ahead of the current read position. The bytes are read with
+ * qemu_peek_byte(), so they are not consumed.
+ *
+ * qemu_peek_byte() returns int. Each byte is widened to unsigned before
+ * shifting, because shifting a byte of 0x80 or higher left by 24 overflows
+ * a signed int.
+ */
+static unsigned int qemu_peek_be32(QEMUFile *f, int offset)
+{
+    unsigned int v;
+
+    v  = (unsigned int)qemu_peek_byte(f, offset) << 24;
+    v |= (unsigned int)qemu_peek_byte(f, offset + 1) << 16;
+    v |= (unsigned int)qemu_peek_byte(f, offset + 2) << 8;
+    v |= (unsigned int)qemu_peek_byte(f, offset + 3);
+    return v;
+}
+
 int qemu_get_byte(QEMUFile *f)
 {
     int result;
@@ -983,7 +1054,6 @@ uint64_t qemu_get_be64(QEMUFile *f)
     return v;
 }
 
-
 /* timer */
 
 void timer_put(QEMUFile *f, QEMUTimer *ts)
@@ -2200,6 +2270,11 @@ static void vmstate_subsection_save(QEMUFile *f, const VMStateDescription *vmsd,
     }
 }
 
+/*
+ * Report whether the incoming stream starts with the fault-tolerant
+ * (Curling) magic, QEMU_VM_FILE_MAGIC_FT. The check only peeks at the
+ * first four bytes, so the stream position is left unchanged.
+ */
+bool is_ft_migration(QEMUFile *f)
+{
+    unsigned int magic = qemu_peek_be32(f, 0);
+
+    return magic == QEMU_VM_FILE_MAGIC_FT;
+}
+
 typedef struct LoadStateEntry {
     QLIST_ENTRY(LoadStateEntry) entry;
     SaveStateEntry *se;
@@ -2221,8 +2296,9 @@ int qemu_loadvm_state(QEMUFile *f)
     }
 
     v = qemu_get_be32(f);
-    if (v != QEMU_VM_FILE_MAGIC)
+    if (v != QEMU_VM_FILE_MAGIC && v != QEMU_VM_FILE_MAGIC_FT) {
         return -EINVAL;
+    }
 
     v = qemu_get_be32(f);
     if (v == QEMU_VM_FILE_VERSION_COMPAT) {
@@ -2309,8 +2385,6 @@ int qemu_loadvm_state(QEMUFile *f)
         }
     }
 
-    cpu_synchronize_all_post_init();
-
     ret = 0;
 
 out:
@@ -2326,6 +2400,79 @@ out:
     return ret;
 }
 
+/*
+ * Fault-tolerant (Curling) variant of qemu_loadvm_state().
+ *
+ * Device state is not loaded straight off the wire. The function first
+ * reads one migration round into a private buffer, TARGET_PAGE_SIZE bytes
+ * at a time, until the 8-byte end marker arrives. The marker is
+ * QEMU_VM_EOF_MAGIC followed by QEMU_VM_FILE_MAGIC_FT, big-endian.
+ *
+ * The buffer is then installed as f->prefetch_buf and qemu_loadvm_state()
+ * runs against it. The final qemu_get_be32() call consumes four more bytes
+ * of the stream.
+ *
+ * The buffer may not grow beyond 1.5 x last_ram_offset(). If it would,
+ * prefetching stops with a warning and the buffer is installed unfinished.
+ * qemu_loadvm_state() then continues past it into the underlying stream.
+ *
+ * Returns:
+ *   - the result of qemu_loadvm_state();
+ *   - -EINVAL if the transport has no get_prefetch_buffer hook;
+ *   - -ENOMEM if the buffer cannot be grown;
+ *   - the stream's error while prefetching (-EIO if the stream ends
+ *     without an error being recorded).
+ */
+int qemu_loadvm_state_ft(QEMUFile *f)
+{
+    uint64_t ram_size = last_ram_offset();
+    /* Integer form of "ram_size * 1.5", without a round trip through double. */
+    uint64_t max_mem = ram_size + ram_size / 2;
+    uint64_t eof = htobe64((uint64_t)QEMU_VM_EOF_MAGIC << 32 |
+                           QEMU_VM_FILE_MAGIC_FT);
+    uint64_t size = PREFETCH_BUFFER_SIZE;
+    uint64_t offset = 0;
+    uint64_t scan;
+    uint8_t *prefetch_buf;
+    uint8_t *buf;
+    int done;
+    int ret;
+
+    if (!f->ops->get_prefetch_buffer) {
+        fprintf(stderr, "Fault tolerant is not supported by this protocol.\n");
+        return -EINVAL;
+    }
+
+    prefetch_buf = g_malloc(size);
+
+    while (true) {
+        if (offset + TARGET_PAGE_SIZE >= size) {
+            if (size * 2 > max_mem) {
+                fprintf(stderr, "qemu_loadvm_state_ft: warning: "
+                        "Prefetch buffer becomes too large.\n"
+                        "Fault tolerant is unstable when you see this,\n"
+                        "please increase the bandwidth or increase "
+                        "the max down time.\n");
+                break;
+            }
+            size *= 2;
+            buf = g_try_realloc(prefetch_buf, size);
+            if (!buf) {
+                /* error_report() appends its own newline. */
+                error_report("qemu_loadvm_state_ft: out of memory.");
+                g_free(prefetch_buf);
+                return -ENOMEM;
+            }
+            prefetch_buf = buf;
+        }
+
+        done = qemu_get_buffer(f, prefetch_buf + offset, TARGET_PAGE_SIZE);
+
+        ret = qemu_file_get_error(f);
+        if (ret == 0 && done <= 0) {
+            /* The stream ended but no error was recorded: fail, don't spin. */
+            ret = -EIO;
+        }
+        if (ret != 0) {
+            g_free(prefetch_buf);
+            return ret;
+        }
+
+        /*
+         * The marker may straddle the boundary with the previous chunk, so
+         * the scan restarts sizeof(eof) - 1 bytes before the new data. It
+         * never starts before prefetch_buf; the old loop read
+         * prefetch_buf[-7] on the first chunk. It also never compares past
+         * the bytes actually received; the old loop read up to 7 bytes of
+         * stale or unallocated memory.
+         */
+        scan = offset > sizeof(eof) - 1 ? offset - (sizeof(eof) - 1) : 0;
+        offset += done;
+        for (; scan + sizeof(eof) <= offset; scan++) {
+            if (memcmp(prefetch_buf + scan, &eof, sizeof(eof)) == 0) {
+                goto out;
+            }
+        }
+    }
+ out:
+    g_free(f->prefetch_buf);
+    f->prefetch_buf_size = offset;
+    f->prefetch_buf_index = 0;
+    f->prefetch_buf = prefetch_buf;
+
+    ret = qemu_loadvm_state(f);
+
+    /* Skip magic number */
+    qemu_get_be32(f);
+
+    return ret;
+}
+
 static BlockDriverState *find_vmstate_bs(void)
 {
     BlockDriverState *bs = NULL;
@@ -2437,6 +2584,7 @@ void do_savevm(Monitor *mon, const QDict *qdict)
         goto the_end;
     }
     ret = qemu_savevm_state(f);
+    cpu_synchronize_all_post_init();
     vm_state_size = qemu_ftell(f);
     qemu_fclose(f);
     if (ret < 0) {
-- 
1.8.0.1





reply via email to

[Prev in Thread] Current Thread [Next in Thread]