qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [RFC PATCH v2 11/23] COLO ctl: implement colo checkpoint protocol


From: Yang Hongyang
Subject: [Qemu-devel] [RFC PATCH v2 11/23] COLO ctl: implement colo checkpoint protocol
Date: Tue, 23 Sep 2014 17:23:43 +0800

Implement the COLO checkpoint protocol.

Checkpoint synchronizing points.

                  Primary                 Secondary
  NEW             @
                                          Suspend
  SUSPENDED                               @
                  Suspend&Save state
  SEND            @
                  Send state              Receive state
  RECEIVED                                @
                  Flush network           Load state
  LOADED                                  @
                  Resume                  Resume

                  Start Comparing
NOTE:
 1) '@' marks the side that sends the message
 2) Every sync-point is synchronized by the two sides with only
    one handshake (single direction) for low latency.
    If stricter synchronization is required, a sync-point in the
    opposite direction should be added.
 3) Since sync-points are single-direction, the remote side may already
    have moved far ahead by the time this side receives the sync-point.

Signed-off-by: Yang Hongyang <address@hidden>
---
 migration-colo.c | 250 +++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 244 insertions(+), 6 deletions(-)

diff --git a/migration-colo.c b/migration-colo.c
index 37003f5..2e478e9 100644
--- a/migration-colo.c
+++ b/migration-colo.c
@@ -24,6 +24,41 @@
  */
 #define CHKPOINT_TIMER 10000
 
+enum {
+    COLO_READY = 0x46,
+
+    /*
+     * Checkpoint synchronizing points.
+     *
+     *                  Primary                 Secondary
+     *  NEW             @
+     *                                          Suspend
+     *  SUSPENDED                               @
+     *                  Suspend&Save state
+     *  SEND            @
+     *                  Send state              Receive state
+     *  RECEIVED                                @
+     *                  Flush network           Load state
+     *  LOADED                                  @
+     *                  Resume                  Resume
+     *
+     *                  Start Comparing
+     * NOTE:
+     * 1) '@' marks the side that sends the message
+     * 2) Every sync-point is synchronized by the two sides with only
+     *    one handshake (single direction) for low latency.
+     *    If stricter synchronization is required, a sync-point in the
+     *    opposite direction should be added.
+     * 3) Since sync-points are single-direction, the remote side may already
+     *    have moved far ahead by the time this side receives the sync-point.
+     */
+    COLO_CHECKPOINT_NEW,
+    COLO_CHECKPOINT_SUSPENDED,
+    COLO_CHECKPOINT_SEND,
+    COLO_CHECKPOINT_RECEIVED,
+    COLO_CHECKPOINT_LOADED,
+};
+
 static QEMUBH *colo_bh;
 
 bool colo_supported(void)
@@ -81,28 +116,159 @@ static __attribute__((unused)) int colo_compare_resume(void)
     return ioctl(comp_fd, COMP_IOCTRESUME, 1);
 }
 
+/* colo checkpoint control helper */
+static bool colo_is_master(void);
+static bool colo_is_slave(void);
+
+static void ctl_error_handler(void *opaque, int err)
+{
+    if (colo_is_slave()) {
+        /* TODO: determine whether we need to failover */
+        /* FIXME: we will not failover currently, just kill slave */
+        error_report("error: colo transmission failed!");
+        exit(1);
+    } else if (colo_is_master()) {
+        /* Master still alive, do not failover */
+        error_report("error: colo transmission failed!");
+        return;
+    } else {
+        error_report("COLO: Unexpected error happend!");
+        exit(EXIT_FAILURE);
+    }
+}
+
+static int colo_ctl_put(QEMUFile *f, uint64_t request)
+{
+    int ret = 0;
+
+    qemu_put_be64(f, request);
+    qemu_fflush(f);
+
+    ret = qemu_file_get_error(f);
+    if (ret < 0) {
+        ctl_error_handler(f, ret);
+        return 1;
+    }
+
+    return ret;
+}
+
+static int colo_ctl_get_value(QEMUFile *f, uint64_t *value)
+{
+    int ret = 0;
+    uint64_t temp;
+
+    temp = qemu_get_be64(f);
+
+    ret = qemu_file_get_error(f);
+    if (ret < 0) {
+        ctl_error_handler(f, ret);
+        return 1;
+    }
+
+    *value = temp;
+    return 0;
+}
+
+static int colo_ctl_get(QEMUFile *f, uint64_t require)
+{
+    int ret;
+    uint64_t value;
+
+    ret = colo_ctl_get_value(f, &value);
+    if (ret) {
+        return ret;
+    }
+
+    if (value != require) {
+        error_report("unexpected state! expected: %"PRIu64
+                     ", received: %"PRIu64, require, value);
+        exit(1);
+    }
+
+    return ret;
+}
+
 /* save */
 
-static __attribute__((unused)) bool colo_is_master(void)
+static bool colo_is_master(void)
 {
     MigrationState *s = migrate_get_current();
     return (s->state == MIG_STATE_COLO);
 }
 
+static int do_colo_transaction(MigrationState *s, QEMUFile *control)
+{
+    int ret;
+
+    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW);
+    if (ret) {
+        goto out;
+    }
+
+    ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED);
+    if (ret) {
+        goto out;
+    }
+
+    /* TODO: suspend and save vm state to colo buffer */
+
+    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_SEND);
+    if (ret) {
+        goto out;
+    }
+
+    /* TODO: send vmstate to slave */
+
+    ret = colo_ctl_get(control, COLO_CHECKPOINT_RECEIVED);
+    if (ret) {
+        goto out;
+    }
+
+    /* TODO: Flush network etc. */
+
+    ret = colo_ctl_get(control, COLO_CHECKPOINT_LOADED);
+    if (ret) {
+        goto out;
+    }
+
+    /* TODO: resume master */
+
+out:
+    return ret;
+}
+
 static void *colo_thread(void *opaque)
 {
     MigrationState *s = opaque;
     int dev_hotplug = qdev_hotplug, wait_cp = 0;
     int64_t start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
     int64_t current_time;
+    QEMUFile *colo_control = NULL;
+    int ret;
 
     if (colo_compare_init() < 0) {
         error_report("Init colo compare error");
         goto out;
     }
 
+    colo_control = qemu_fopen_socket(qemu_get_fd(s->file), "rb");
+    if (!colo_control) {
+        error_report("Open colo_control failed!");
+        goto out;
+    }
+
     qdev_hotplug = 0;
 
+    /*
+     * Wait for the slave to finish loading the VM state and enter COLO
+     * restore.
+     */
+    ret = colo_ctl_get(colo_control, COLO_READY);
+    if (ret) {
+        goto out;
+    }
+
     while (s->state == MIG_STATE_COLO) {
         /* wait for a colo checkpoint */
         wait_cp = colo_compare();
@@ -124,12 +290,18 @@ static void *colo_thread(void *opaque)
 
         /* start a colo checkpoint */
 
-        /*TODO: COLO save */
+        if (do_colo_transaction(s, colo_control)) {
+            goto out;
+        }
 
         start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
     }
 
 out:
+    if (colo_control) {
+        qemu_fclose(colo_control);
+    }
+
     colo_compare_destroy();
 
     migrate_set_state(s, MIG_STATE_COLO, MIG_STATE_COMPLETED);
@@ -172,7 +344,7 @@ void colo_init_checkpointer(MigrationState *s)
 
 static Coroutine *colo;
 
-static __attribute__((unused)) bool colo_is_slave(void)
+static bool colo_is_slave(void)
 {
     return colo != NULL;
 }
@@ -184,13 +356,32 @@ static __attribute__((unused)) bool colo_is_slave(void)
  */
 static int slave_wait_new_checkpoint(QEMUFile *f)
 {
-    /* TODO: wait checkpoint start command from master */
-    return 1;
+    int fd = qemu_get_fd(f);
+    int ret;
+    uint64_t cmd;
+
+    yield_until_fd_readable(fd);
+
+    ret = colo_ctl_get_value(f, &cmd);
+    if (ret) {
+        return 1;
+    }
+
+    if (cmd == COLO_CHECKPOINT_NEW) {
+        return 0;
+    } else {
+        /* Unexpected data received */
+        ctl_error_handler(f, ret);
+        return 1;
+    }
 }
 
 void colo_process_incoming_checkpoints(QEMUFile *f)
 {
+    int fd = qemu_get_fd(f);
     int dev_hotplug = qdev_hotplug;
+    QEMUFile *ctl = NULL;
+    int ret;
 
     if (!restore_use_colo()) {
         return;
@@ -201,15 +392,62 @@ void colo_process_incoming_checkpoints(QEMUFile *f)
     colo = qemu_coroutine_self();
     assert(colo != NULL);
 
+    ctl = qemu_fopen_socket(fd, "wb");
+    if (!ctl) {
+        error_report("Can't open incoming channel!");
+        goto out;
+    }
+
+    ret = colo_ctl_put(ctl, COLO_READY);
+    if (ret) {
+        goto out;
+    }
+
+    /* TODO: in COLO mode, slave is running, so start the vm */
+
     while (true) {
         if (slave_wait_new_checkpoint(f)) {
             break;
         }
 
-        /* TODO: COLO restore */
+        /* start colo checkpoint */
+
+        /* TODO: suspend guest */
+
+        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_SUSPENDED);
+        if (ret) {
+            goto out;
+        }
+
+        ret = colo_ctl_get(f, COLO_CHECKPOINT_SEND);
+        if (ret) {
+            goto out;
+        }
+
+        /* TODO: read migration data into colo buffer */
+
+        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_RECEIVED);
+        if (ret) {
+            goto out;
+        }
+
+        /* TODO: load vm state */
+
+        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_LOADED);
+        if (ret) {
+            goto out;
+        }
+
+        /* TODO: resume guest */
     }
 
+out:
     colo = NULL;
+
+    if (ctl) {
+        qemu_fclose(ctl);
+    }
+
     restore_exit_colo();
 
     qdev_hotplug = dev_hotplug;
-- 
1.9.1




reply via email to

[Prev in Thread] Current Thread [Next in Thread]