qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [RFC PATCH v2 1/3] separate thread for VM migration


From: Umesh Deshpande
Subject: [Qemu-devel] [RFC PATCH v2 1/3] separate thread for VM migration
Date: Fri, 29 Jul 2011 16:57:24 -0400

This patch creates a separate thread for the guest migration on the source side.

Signed-off-by: Umesh Deshpande <address@hidden>
---
 buffered_file.c |   28 ++++++++++++++++---------
 buffered_file.h |    4 +++
 migration.c     |   59 +++++++++++++++++++++++++++++++++++++++++++-----------
 migration.h     |    3 ++
 savevm.c        |   22 +-------------------
 savevm.h        |   29 +++++++++++++++++++++++++++
 6 files changed, 102 insertions(+), 43 deletions(-)
 create mode 100644 savevm.h

diff --git a/buffered_file.c b/buffered_file.c
index 41b42c3..d4146bf 100644
--- a/buffered_file.c
+++ b/buffered_file.c
@@ -16,12 +16,16 @@
 #include "qemu-timer.h"
 #include "qemu-char.h"
 #include "buffered_file.h"
+#include "migration.h"
+#include "savevm.h"
+#include "qemu-thread.h"
 
 //#define DEBUG_BUFFERED_FILE
 
 typedef struct QEMUFileBuffered
 {
     BufferedPutFunc *put_buffer;
+    BufferedBeginFunc *begin;
     BufferedPutReadyFunc *put_ready;
     BufferedWaitForUnfreezeFunc *wait_for_unfreeze;
     BufferedCloseFunc *close;
@@ -35,6 +39,7 @@ typedef struct QEMUFileBuffered
     size_t buffer_size;
     size_t buffer_capacity;
     QEMUTimer *timer;
+    QemuThread thread;
 } QEMUFileBuffered;
 
 #ifdef DEBUG_BUFFERED_FILE
@@ -181,8 +186,6 @@ static int buffered_close(void *opaque)
 
     ret = s->close(s->opaque);
 
-    qemu_del_timer(s->timer);
-    qemu_free_timer(s->timer);
     qemu_free(s->buffer);
     qemu_free(s);
 
@@ -228,17 +231,15 @@ static int64_t buffered_get_rate_limit(void *opaque)
     return s->xfer_limit;
 }
 
-static void buffered_rate_tick(void *opaque)
+void buffered_rate_tick(QEMUFile *file)
 {
-    QEMUFileBuffered *s = opaque;
+    QEMUFileBuffered *s = file->opaque;
 
     if (s->has_error) {
         buffered_close(s);
         return;
     }
 
-    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
-
     if (s->freeze_output)
         return;
 
@@ -250,9 +251,17 @@ static void buffered_rate_tick(void *opaque)
     s->put_ready(s->opaque);
 }
 
+static void *migrate_vm(void *opaque)
+{
+    QEMUFileBuffered *s = opaque;
+    s->begin(s->opaque);
+    return NULL;
+}
+
 QEMUFile *qemu_fopen_ops_buffered(void *opaque,
                                   size_t bytes_per_sec,
                                   BufferedPutFunc *put_buffer,
+                                  BufferedBeginFunc *begin,
                                   BufferedPutReadyFunc *put_ready,
                                   BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
                                   BufferedCloseFunc *close)
@@ -264,6 +273,7 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque,
     s->opaque = opaque;
     s->xfer_limit = bytes_per_sec / 10;
     s->put_buffer = put_buffer;
+    s->begin = begin;
     s->put_ready = put_ready;
     s->wait_for_unfreeze = wait_for_unfreeze;
     s->close = close;
@@ -271,11 +281,9 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque,
     s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
                              buffered_close, buffered_rate_limit,
                              buffered_set_rate_limit,
-                            buffered_get_rate_limit);
-
-    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
+                             buffered_get_rate_limit);
 
-    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+    qemu_thread_create(&s->thread, migrate_vm, s);
 
     return s->file;
 }
diff --git a/buffered_file.h b/buffered_file.h
index 98d358b..cfe2833 100644
--- a/buffered_file.h
+++ b/buffered_file.h
@@ -17,12 +17,16 @@
 #include "hw/hw.h"
 
 typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size);
+typedef void (BufferedBeginFunc)(void *opaque);
 typedef void (BufferedPutReadyFunc)(void *opaque);
 typedef void (BufferedWaitForUnfreezeFunc)(void *opaque);
 typedef int (BufferedCloseFunc)(void *opaque);
 
+void buffered_rate_tick(QEMUFile *file);
+
 QEMUFile *qemu_fopen_ops_buffered(void *opaque, size_t xfer_limit,
                                   BufferedPutFunc *put_buffer,
+                                  BufferedBeginFunc *begin,
                                   BufferedPutReadyFunc *put_ready,
                                   BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
                                   BufferedCloseFunc *close);
diff --git a/migration.c b/migration.c
index af3a1f2..bf86067 100644
--- a/migration.c
+++ b/migration.c
@@ -31,6 +31,8 @@
     do { } while (0)
 #endif
 
+static int64_t expire_time;
+
 /* Migration speed throttling */
 static int64_t max_throttle = (32 << 20);
 
@@ -284,8 +286,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
 {
     int ret = 0;
 
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-
     if (s->file) {
         DPRINTF("closing file\n");
         if (qemu_fclose(s->file) != 0) {
@@ -310,8 +310,7 @@ int migrate_fd_cleanup(FdMigrationState *s)
 void migrate_fd_put_notify(void *opaque)
 {
     FdMigrationState *s = opaque;
-
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
+    s->callback = NULL;
     qemu_file_put_notify(s->file);
 }
 
@@ -328,7 +327,7 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
         ret = -(s->get_error(s));
 
     if (ret == -EAGAIN) {
-        qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s);
+        s->callback = migrate_fd_put_notify;
     } else if (ret < 0) {
         if (s->mon) {
             monitor_resume(s->mon);
@@ -342,27 +341,66 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
 
 void migrate_fd_connect(FdMigrationState *s)
 {
-    int ret;
-
+    s->callback = NULL;
     s->file = qemu_fopen_ops_buffered(s,
                                       s->bandwidth_limit,
                                       migrate_fd_put_buffer,
+                                      migrate_fd_begin,
                                       migrate_fd_put_ready,
                                       migrate_fd_wait_for_unfreeze,
                                       migrate_fd_close);
+}
+
+static int migrate_fd_check_expire(void)
+{
+    int64_t current_time = qemu_get_clock_ms(rt_clock);
+
+    if (expire_time > current_time) {
+        return 0;
+    } else {
+        expire_time = qemu_get_clock_ms(rt_clock) + 100;
+        return 1;
+    }
+}
+
+void migrate_fd_begin(void *arg)
+{
+    FdMigrationState *s = arg;
+    int ret;
 
+    qemu_mutex_lock_iothread();
     DPRINTF("beginning savevm\n");
     ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
                                   s->mig_state.shared);
     if (ret < 0) {
         DPRINTF("failed, %d\n", ret);
         migrate_fd_error(s);
-        return;
+        goto out;
     }
-    
+
+    expire_time = qemu_get_clock_ms(rt_clock) + 100;
     migrate_fd_put_ready(s);
+
+    while (s->state == MIG_STATE_ACTIVE) {
+        if (migrate_fd_check_expire()) {
+            buffered_rate_tick(s->file);
+        }
+
+        if (s->state != MIG_STATE_ACTIVE) {
+            break;
+        }
+
+        if (s->callback) {
+            migrate_fd_wait_for_unfreeze(s);
+            s->callback(s);
+        }
+    }
+
+out:
+    qemu_mutex_unlock_iothread();
 }
 
+
 void migrate_fd_put_ready(void *opaque)
 {
     FdMigrationState *s = opaque;
@@ -376,8 +414,6 @@ void migrate_fd_put_ready(void *opaque)
     if (qemu_savevm_state_iterate(s->mon, s->file) == 1) {
         int state;
         int old_vm_running = vm_running;
-
-        DPRINTF("done iterating\n");
         vm_stop(VMSTOP_MIGRATE);
 
         if ((qemu_savevm_state_complete(s->mon, s->file)) < 0) {
@@ -458,7 +494,6 @@ int migrate_fd_close(void *opaque)
 {
     FdMigrationState *s = opaque;
 
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
     return s->close(s);
 }
 
diff --git a/migration.h b/migration.h
index 050c56c..8ed34ab 100644
--- a/migration.h
+++ b/migration.h
@@ -48,6 +48,7 @@ struct FdMigrationState
     int (*get_error)(struct FdMigrationState*);
     int (*close)(struct FdMigrationState*);
     int (*write)(struct FdMigrationState*, const void *, size_t);
+    void (*callback)(void *);
     void *opaque;
 };
 
@@ -118,6 +119,8 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size);
 
 void migrate_fd_connect(FdMigrationState *s);
 
+void migrate_fd_begin(void *opaque);
+
 void migrate_fd_put_ready(void *opaque);
 
 int migrate_fd_get_status(MigrationState *mig_state);
diff --git a/savevm.c b/savevm.c
index 8139bc7..4859a34 100644
--- a/savevm.c
+++ b/savevm.c
@@ -82,6 +82,7 @@
 #include "qemu_socket.h"
 #include "qemu-queue.h"
 #include "cpus.h"
+#include "savevm.h"
 
 #define SELF_ANNOUNCE_ROUNDS 5
 
@@ -155,27 +156,6 @@ void qemu_announce_self(void)
 /***********************************************************/
 /* savevm/loadvm support */
 
-#define IO_BUF_SIZE 32768
-
-struct QEMUFile {
-    QEMUFilePutBufferFunc *put_buffer;
-    QEMUFileGetBufferFunc *get_buffer;
-    QEMUFileCloseFunc *close;
-    QEMUFileRateLimit *rate_limit;
-    QEMUFileSetRateLimit *set_rate_limit;
-    QEMUFileGetRateLimit *get_rate_limit;
-    void *opaque;
-    int is_write;
-
-    int64_t buf_offset; /* start of buffer when writing, end of buffer
-                           when reading */
-    int buf_index;
-    int buf_size; /* 0 when writing */
-    uint8_t buf[IO_BUF_SIZE];
-
-    int has_error;
-};
-
 typedef struct QEMUFileStdio
 {
     FILE *stdio_file;
diff --git a/savevm.h b/savevm.h
new file mode 100644
index 0000000..954eded
--- /dev/null
+++ b/savevm.h
@@ -0,0 +1,29 @@
+#ifndef QEMU_SAVEVM_H
+#define QEMU_SAVEVM_H
+
+#include "hw/hw.h"
+
+#define IO_BUF_SIZE 32768
+
+struct QEMUFile {
+    QEMUFilePutBufferFunc *put_buffer;
+    QEMUFileGetBufferFunc *get_buffer;
+    QEMUFileCloseFunc *close;
+    QEMUFileRateLimit *rate_limit;
+    QEMUFileSetRateLimit *set_rate_limit;
+    QEMUFileGetRateLimit *get_rate_limit;
+    void *opaque;
+    int is_write;
+
+    int64_t buf_offset; /* start of buffer when writing, end of buffer
+                           when reading */
+    int buf_index;
+    int buf_size; /* 0 when
+     * writing
+     * */
+    uint8_t buf[IO_BUF_SIZE];
+
+    int has_error;
+};
+
+#endif
-- 
1.7.4.1




reply via email to

[Prev in Thread] Current Thread [Next in Thread]