qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH COLO-Frame v6 06/31] COLO: Implement colo checkpoint


From: zhanghailiang
Subject: [Qemu-devel] [PATCH COLO-Frame v6 06/31] COLO: Implement colo checkpoint protocol
Date: Thu, 18 Jun 2015 16:58:30 +0800

We need communications protocol of user-defined to control the checkpoint
process.

The new checkpoint request is started by Primary VM, and the interactive process
like below:
Checkpoint synchronizing points,

                  Primary                 Secondary
  NEW             @
                                          Suspend
  SUSPENDED                               @
                  Suspend&Save state
  SEND            @
                  Send state              Receive state
  RECEIVED                                @
                  Flush network           Load state
  LOADED                                  @
                  Resume                  Resume

                  Start Comparing
NOTE:
 1) '@' who sends the message
 2) Every sync-point is synchronized by two sides with only
    one handshake(single direction) for low-latency.
    If more strict synchronization is required, a opposite direction
    sync-point should be added.
 3) Since sync-points are single direction, the remote side may
    go forward a lot when this side just receives the sync-point.

Signed-off-by: Yang Hongyang <address@hidden>
Signed-off-by: Lai Jiangshan <address@hidden>
Signed-off-by: zhanghailiang <address@hidden>
Signed-off-by: Li Zhijian <address@hidden>
Signed-off-by: Gonglei <address@hidden>
---
 migration/colo.c | 237 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 235 insertions(+), 2 deletions(-)

diff --git a/migration/colo.c b/migration/colo.c
index 45f9efd..0f7c36b 100644
--- a/migration/colo.c
+++ b/migration/colo.c
@@ -15,6 +15,41 @@
 #include "trace.h"
 #include "qemu/error-report.h"
 
+enum {
+    COLO_CHECPOINT_READY = 0x46,
+
+    /*
+    * Checkpoint synchronizing points.
+    *
+    *                  Primary                 Secondary
+    *  NEW             @
+    *                                          Suspend
+    *  SUSPENDED                               @
+    *                  Suspend&Save state
+    *  SEND            @
+    *                  Send state              Receive state
+    *  RECEIVED                                @
+    *                  Flush network           Load state
+    *  LOADED                                  @
+    *                  Resume                  Resume
+    *
+    *                  Start Comparing
+    * NOTE:
+    * 1) '@' who sends the message
+    * 2) Every sync-point is synchronized by two sides with only
+    *    one handshake(single direction) for low-latency.
+    *    If more strict synchronization is required, a opposite direction
+    *    sync-point should be added.
+    * 3) Since sync-points are single direction, the remote side may
+    *    go forward a lot when this side just receives the sync-point.
+    */
+    COLO_CHECKPOINT_NEW,
+    COLO_CHECKPOINT_SUSPENDED,
+    COLO_CHECKPOINT_SEND,
+    COLO_CHECKPOINT_RECEIVED,
+    COLO_CHECKPOINT_LOADED,
+};
+
 static QEMUBH *colo_bh;
 static Coroutine *colo;
 
@@ -34,19 +69,136 @@ bool loadvm_in_colo_state(void)
     return colo != NULL;
 }
 
+/* colo checkpoint control helper */
+static int colo_ctl_put(QEMUFile *f, uint64_t request)
+{
+    int ret = 0;
+
+    qemu_put_be64(f, request);
+    qemu_fflush(f);
+
+    ret = qemu_file_get_error(f);
+
+    return ret;
+}
+
+static int colo_ctl_get_value(QEMUFile *f, uint64_t *value)
+{
+    int ret = 0;
+    uint64_t temp;
+
+    temp = qemu_get_be64(f);
+
+    ret = qemu_file_get_error(f);
+    if (ret < 0) {
+        return -1;
+    }
+
+    *value = temp;
+    return 0;
+}
+
+static int colo_ctl_get(QEMUFile *f, uint64_t require)
+{
+    int ret;
+    uint64_t value;
+
+    ret = colo_ctl_get_value(f, &value);
+    if (ret < 0) {
+        return ret;
+    }
+
+    if (value != require) {
+        error_report("unexpected state! expected: %"PRIu64
+                     ", received: %"PRIu64, require, value);
+        exit(1);
+    }
+
+    return ret;
+}
+
+static int colo_do_checkpoint_transaction(MigrationState *s, QEMUFile *control)
+{
+    int ret;
+
+    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW);
+    if (ret < 0) {
+        goto out;
+    }
+
+    ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED);
+    if (ret < 0) {
+        goto out;
+    }
+
+    /* TODO: suspend and save vm state to colo buffer */
+
+    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_SEND);
+    if (ret < 0) {
+        goto out;
+    }
+
+    /* TODO: send vmstate to Secondary */
+
+    ret = colo_ctl_get(control, COLO_CHECKPOINT_RECEIVED);
+    if (ret < 0) {
+        goto out;
+    }
+    trace_colo_receive_message("COLO_CHECKPOINT_RECEIVED");
+
+    ret = colo_ctl_get(control, COLO_CHECKPOINT_LOADED);
+    if (ret < 0) {
+        goto out;
+    }
+    trace_colo_receive_message("COLO_CHECKPOINT_LOADED");
+
+    /* TODO: resume Primary */
+
+out:
+    return ret;
+}
+
 static void *colo_thread(void *opaque)
 {
     MigrationState *s = opaque;
+    QEMUFile *colo_control = NULL;
+    int ret;
+
+    colo_control = qemu_fopen_socket(qemu_get_fd(s->file), "rb");
+    if (!colo_control) {
+        error_report("Open colo_control failed!");
+        goto out;
+    }
+
+    /*
+     * Wait for Secondary finish loading vm states and enter COLO
+     * restore.
+     */
+    ret = colo_ctl_get(colo_control, COLO_CHECPOINT_READY);
+    if (ret < 0) {
+        goto out;
+    }
+    trace_colo_receive_message("COLO_CHECPOINT_READY");
 
     qemu_mutex_lock_iothread();
     vm_start();
     qemu_mutex_unlock_iothread();
     trace_colo_vm_state_change("stop", "run");
 
-    /*TODO: COLO checkpoint savevm loop*/
+    while (s->state == MIGRATION_STATUS_COLO) {
+        /* start a colo checkpoint */
+        if (colo_do_checkpoint_transaction(s, colo_control)) {
+            goto out;
+        }
+    }
 
+out:
     migrate_set_state(s, MIGRATION_STATUS_COLO, MIGRATION_STATUS_COMPLETED);
 
+    if (colo_control) {
+        qemu_fclose(colo_control);
+    }
+
     qemu_mutex_lock_iothread();
     qemu_bh_schedule(s->cleanup_bh);
     qemu_mutex_unlock_iothread();
@@ -79,14 +231,95 @@ void colo_init_checkpointer(MigrationState *s)
     qemu_bh_schedule(colo_bh);
 }
 
+/*
+ * return:
+ * 0: start a checkpoint
+ * -1: some error happened, exit colo restore
+ */
+static int colo_wait_handle_cmd(QEMUFile *f, int *checkpoint_request)
+{
+    int ret;
+    uint64_t cmd;
+
+    ret = colo_ctl_get_value(f, &cmd);
+    if (ret < 0) {
+        return -1;
+    }
+
+    switch (cmd) {
+    case COLO_CHECKPOINT_NEW:
+        *checkpoint_request = 1;
+        return 0;
+    default:
+        return -1;
+    }
+}
+
 void *colo_process_incoming_checkpoints(void *opaque)
 {
+    struct colo_incoming *colo_in = opaque;
+    QEMUFile *f = colo_in->file;
+    int fd = qemu_get_fd(f);
+    QEMUFile *ctl = NULL;
+    int ret;
     colo = qemu_coroutine_self();
     assert(colo != NULL);
 
-    /* TODO: COLO checkpoint restore loop */
+    ctl = qemu_fopen_socket(fd, "wb");
+    if (!ctl) {
+        error_report("Can't open incoming channel!");
+        goto out;
+    }
+    ret = colo_ctl_put(ctl, COLO_CHECPOINT_READY);
+    if (ret < 0) {
+        goto out;
+    }
+    /* TODO: in COLO mode, Secondary is runing, so start the vm */
+    while (true) {
+        int request = 0;
+        int ret = colo_wait_handle_cmd(f, &request);
+
+        if (ret < 0) {
+            break;
+        } else {
+            if (!request) {
+                continue;
+            }
+        }
 
+        /* TODO: suspend guest */
+        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_SUSPENDED);
+        if (ret < 0) {
+            goto out;
+        }
+
+        ret = colo_ctl_get(f, COLO_CHECKPOINT_SEND);
+        if (ret < 0) {
+            goto out;
+        }
+        trace_colo_receive_message("COLO_CHECKPOINT_SEND");
+
+        /* TODO: read migration data into colo buffer */
+
+        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_RECEIVED);
+        if (ret < 0) {
+            goto out;
+        }
+        trace_colo_receive_message("COLO_CHECKPOINT_RECEIVED");
+
+        /* TODO: load vm state */
+
+        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_LOADED);
+        if (ret < 0) {
+            goto out;
+        }
+}
+
+out:
     colo = NULL;
+    if (ctl) {
+        qemu_fclose(ctl);
+    }
     loadvm_exit_colo();
 
     return NULL;
-- 
1.7.12.4





reply via email to

[Prev in Thread] Current Thread [Next in Thread]