qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH COLO-Frame v8 08/34] COLO: Implement colo checkpoint


From: zhanghailiang
Subject: [Qemu-devel] [PATCH COLO-Frame v8 08/34] COLO: Implement colo checkpoint protocol
Date: Wed, 29 Jul 2015 16:45:18 +0800

We need communications protocol of user-defined to control the checkpoint
process.

The new checkpoint request is started by Primary VM, and the interactive process
like below:
Checkpoint synchronizing points,

                  Primary                 Secondary
  NEW             @
                                          Suspend
  SUSPENDED                               @
                  Suspend&Save state
  SEND            @
                  Send state              Receive state
  RECEIVED                                @
                  Flush network           Load state
  LOADED                                  @
                  Resume                  Resume

                  Start Comparing
NOTE:
 1) '@' who sends the message
 2) Every sync-point is synchronized by two sides with only
    one handshake(single direction) for low-latency.
    If more strict synchronization is required, a opposite direction
    sync-point should be added.
 3) Since sync-points are single direction, the remote side may
    go forward a lot when this side just receives the sync-point.

Signed-off-by: Yang Hongyang <address@hidden>
Signed-off-by: zhanghailiang <address@hidden>
Signed-off-by: Li Zhijian <address@hidden>
Signed-off-by: Gonglei <address@hidden>
---
 migration/colo.c | 248 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 trace-events     |   3 +-
 2 files changed, 248 insertions(+), 3 deletions(-)

diff --git a/migration/colo.c b/migration/colo.c
index 364e0dd..4ba6f65 100644
--- a/migration/colo.c
+++ b/migration/colo.c
@@ -14,6 +14,55 @@
 #include "migration/colo.h"
 #include "trace.h"
 #include "qemu/error-report.h"
+#include "qemu/sockets.h"
+
+/* Fix me: Convert to use QAPI */
+typedef enum COLOCommand {
+    COLO_CHECPOINT_READY = 0x46,
+
+    /*
+    * Checkpoint synchronizing points.
+    *
+    *                  Primary                 Secondary
+    *  NEW             @
+    *                                          Suspend
+    *  SUSPENDED                               @
+    *                  Suspend&Save state
+    *  SEND            @
+    *                  Send state              Receive state
+    *  RECEIVED                                @
+    *                  Flush network           Load state
+    *  LOADED                                  @
+    *                  Resume                  Resume
+    *
+    *                  Start Comparing
+    * NOTE:
+    * 1) '@' who sends the message
+    * 2) Every sync-point is synchronized by two sides with only
+    *    one handshake(single direction) for low-latency.
+    *    If more strict synchronization is required, a opposite direction
+    *    sync-point should be added.
+    * 3) Since sync-points are single direction, the remote side may
+    *    go forward a lot when this side just receives the sync-point.
+    */
+    COLO_CHECKPOINT_NEW,
+    COLO_CHECKPOINT_SUSPENDED,
+    COLO_CHECKPOINT_SEND,
+    COLO_CHECKPOINT_RECEIVED,
+    COLO_CHECKPOINT_LOADED,
+
+    COLO_CHECKPOINT_MAX
+} COLOCommand;
+
+const char * const COLOCommand_lookup[] = {
+    [COLO_CHECPOINT_READY] = "checkpoint-ready",
+    [COLO_CHECKPOINT_NEW] = "checkpoint-new",
+    [COLO_CHECKPOINT_SUSPENDED] = "checkpoint-suspend",
+    [COLO_CHECKPOINT_SEND] = "checheckpoint-send",
+    [COLO_CHECKPOINT_RECEIVED] = "checkpoint-received",
+    [COLO_CHECKPOINT_LOADED] = "checkpoint-loaded",
+    [COLO_CHECKPOINT_MAX] = NULL,
+};
 
 static QEMUBH *colo_bh;
 
@@ -36,20 +85,137 @@ bool migration_incoming_in_colo_state(void)
     return (mis && (mis->state == MIGRATION_STATUS_COLO));
 }
 
+/* colo checkpoint control helper */
+static int colo_ctl_put(QEMUFile *f, uint64_t request)
+{
+    int ret = 0;
+
+    qemu_put_be64(f, request);
+    qemu_fflush(f);
+
+    ret = qemu_file_get_error(f);
+    if (request < COLO_CHECKPOINT_MAX) {
+        trace_colo_ctl_put(COLOCommand_lookup[request]);
+    }
+    return ret;
+}
+
+static int colo_ctl_get_value(QEMUFile *f, uint64_t *value)
+{
+    int ret = 0;
+    uint64_t temp;
+
+    temp = qemu_get_be64(f);
+
+    ret = qemu_file_get_error(f);
+    if (ret < 0) {
+        return -1;
+    }
+
+    *value = temp;
+    return 0;
+}
+
+static int colo_ctl_get(QEMUFile *f, uint64_t require)
+{
+    int ret;
+    uint64_t value;
+
+    ret = colo_ctl_get_value(f, &value);
+    if (ret < 0) {
+        return ret;
+    }
+
+    if (value != require) {
+        error_report("unexpected state! expected: %"PRIu64
+                     ", received: %"PRIu64, require, value);
+        exit(1);
+    }
+
+    trace_colo_ctl_get(COLOCommand_lookup[require]);
+    return ret;
+}
+
+static int colo_do_checkpoint_transaction(MigrationState *s, QEMUFile *control)
+{
+    int ret;
+
+    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW);
+    if (ret < 0) {
+        goto out;
+    }
+
+    ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED);
+    if (ret < 0) {
+        goto out;
+    }
+
+    /* TODO: suspend and save vm state to colo buffer */
+
+    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_SEND);
+    if (ret < 0) {
+        goto out;
+    }
+
+    /* TODO: send vmstate to Secondary */
+
+    ret = colo_ctl_get(control, COLO_CHECKPOINT_RECEIVED);
+    if (ret < 0) {
+        goto out;
+    }
+
+    ret = colo_ctl_get(control, COLO_CHECKPOINT_LOADED);
+    if (ret < 0) {
+        goto out;
+    }
+
+    /* TODO: resume Primary */
+
+out:
+    return ret;
+}
+
 static void *colo_thread(void *opaque)
 {
     MigrationState *s = opaque;
+    QEMUFile *colo_control = NULL;
+    int ret;
+
+    colo_control = qemu_fopen_socket(qemu_get_fd(s->file), "rb");
+    if (!colo_control) {
+        error_report("Open colo_control failed!");
+        goto out;
+    }
+
+    /*
+     * Wait for Secondary finish loading vm states and enter COLO
+     * restore.
+     */
+    ret = colo_ctl_get(colo_control, COLO_CHECPOINT_READY);
+    if (ret < 0) {
+        goto out;
+    }
 
     qemu_mutex_lock_iothread();
     vm_start();
     qemu_mutex_unlock_iothread();
     trace_colo_vm_state_change("stop", "run");
 
-    /*TODO: COLO checkpoint savevm loop*/
+    while (s->state == MIGRATION_STATUS_COLO) {
+        /* start a colo checkpoint */
+        if (colo_do_checkpoint_transaction(s, colo_control)) {
+            goto out;
+        }
+    }
 
+out:
     migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
                       MIGRATION_STATUS_COMPLETED);
 
+    if (colo_control) {
+        qemu_fclose(colo_control);
+    }
+
     qemu_mutex_lock_iothread();
     qemu_bh_schedule(s->cleanup_bh);
     qemu_mutex_unlock_iothread();
@@ -83,15 +249,93 @@ void colo_init_checkpointer(MigrationState *s)
     qemu_bh_schedule(colo_bh);
 }
 
+/*
+ * return:
+ * 0: start a checkpoint
+ * -1: some error happened, exit colo restore
+ */
+static int colo_wait_handle_cmd(QEMUFile *f, int *checkpoint_request)
+{
+    int ret;
+    uint64_t cmd;
+
+    ret = colo_ctl_get_value(f, &cmd);
+    if (ret < 0) {
+        return -1;
+    }
+
+    switch (cmd) {
+    case COLO_CHECKPOINT_NEW:
+        *checkpoint_request = 1;
+        return 0;
+    default:
+        return -1;
+    }
+}
+
 void *colo_process_incoming_checkpoints(void *opaque)
 {
     MigrationIncomingState *mis = opaque;
+    QEMUFile *f = mis->file;
+    int fd = qemu_get_fd(f);
+    QEMUFile *ctl = NULL;
+    int ret;
 
     migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
                       MIGRATION_STATUS_COLO);
 
-    /* TODO: COLO checkpoint restore loop */
+    ctl = qemu_fopen_socket(fd, "wb");
+    if (!ctl) {
+        error_report("Can't open incoming channel!");
+        goto out;
+    }
+    ret = colo_ctl_put(ctl, COLO_CHECPOINT_READY);
+    if (ret < 0) {
+        goto out;
+    }
+    /* TODO: in COLO mode, Secondary is runing, so start the vm */
+    while (mis->state == MIGRATION_STATUS_COLO) {
+        int request = 0;
+        int ret = colo_wait_handle_cmd(f, &request);
+
+        if (ret < 0) {
+            break;
+        } else {
+            if (!request) {
+                continue;
+            }
+        }
 
+        /* TODO: suspend guest */
+        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_SUSPENDED);
+        if (ret < 0) {
+            goto out;
+        }
+
+        ret = colo_ctl_get(f, COLO_CHECKPOINT_SEND);
+        if (ret < 0) {
+            goto out;
+        }
+
+        /* TODO: read migration data into colo buffer */
+
+        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_RECEIVED);
+        if (ret < 0) {
+            goto out;
+        }
+
+        /* TODO: load vm state */
+
+        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_LOADED);
+        if (ret < 0) {
+            goto out;
+        }
+}
+
+out:
+    if (ctl) {
+        qemu_fclose(ctl);
+    }
     migration_incoming_exit_colo();
 
     return NULL;
diff --git a/trace-events b/trace-events
index 025d71c..4487633 100644
--- a/trace-events
+++ b/trace-events
@@ -1473,7 +1473,8 @@ 
rdma_start_outgoing_migration_after_rdma_source_init(void) ""
 
 # migration/colo.c
 colo_vm_state_change(const char *old, const char *new) "Change '%s' => '%s'"
-colo_receive_message(const char *msg) "Receive '%s'"
+colo_ctl_put(const char *msg) "Send '%s'"
+colo_ctl_get(const char *msg) "Receive '%s'"
 
 # kvm-all.c
 kvm_ioctl(int type, void *arg) "type 0x%x, arg %p"
-- 
1.8.3.1





reply via email to

[Prev in Thread] Current Thread [Next in Thread]