qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH COLO-Frame v5 29/29] COLO: Add block replication int


From: zhanghailiang
Subject: [Qemu-devel] [PATCH COLO-Frame v5 29/29] COLO: Add block replication into colo process
Date: Thu, 21 May 2015 16:13:21 +0800

From: Wen Congyang <address@hidden>

Make sure master start block replication after slave's block replication started

Signed-off-by: zhanghailiang <address@hidden>
Signed-off-by: Wen Congyang <address@hidden>
Signed-off-by: Yang Hongyang <address@hidden>
Signed-off-by: Li Zhijian <address@hidden>
---
 migration/colo.c | 138 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 trace-events     |   2 +
 2 files changed, 138 insertions(+), 2 deletions(-)

diff --git a/migration/colo.c b/migration/colo.c
index 111062f..5b600b1 100644
--- a/migration/colo.c
+++ b/migration/colo.c
@@ -19,6 +19,8 @@
 #include "migration/migration-failover.h"
 #include "net/colo-nic.h"
 #include "qmp-commands.h"
+#include "block/block.h"
+#include "sysemu/block-backend.h"
 
 /*
 * We should not do checkpoint one after another without any time interval,
@@ -102,6 +104,76 @@ static bool colo_runstate_is_stopped(void)
     return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
 }
 
+static void blk_start_replication(bool primary, Error **errp)
+{
+    ReplicationMode mode = primary ? REPLICATION_MODE_PRIMARY :
+                                     REPLICATION_MODE_SECONDARY;
+    BlockBackend *blk, *temp;
+    Error *local_err = NULL;
+
+    for (blk = blk_next(NULL); blk; blk = blk_next(blk)) {
+        if (blk_is_read_only(blk) || !blk_is_inserted(blk)) {
+            continue;
+        }
+
+        bdrv_start_replication(blk_bs(blk), mode, &local_err);
+        if (local_err) {
+            error_propagate(errp, local_err);
+            goto fail;
+        }
+    }
+
+    return;
+
+fail:
+    for (temp = blk_next(NULL); temp != blk; temp = blk_next(temp)) {
+        bdrv_stop_replication(blk_bs(temp), false, NULL);
+    }
+}
+
+static void blk_do_checkpoint(Error **errp)
+{
+    BlockBackend *blk;
+    Error *local_err = NULL;
+
+    for (blk = blk_next(NULL); blk; blk = blk_next(blk)) {
+        if (blk_is_read_only(blk) || !blk_is_inserted(blk)) {
+            continue;
+        }
+
+        bdrv_do_checkpoint(blk_bs(blk), &local_err);
+        if (local_err) {
+            error_propagate(errp, local_err);
+            return;
+        }
+    }
+}
+
+static void blk_stop_replication(bool failover, Error **errp)
+{
+    BlockBackend *blk;
+    Error *local_err = NULL;
+
+    for (blk = blk_next(NULL); blk; blk = blk_next(blk)) {
+        if (blk_is_read_only(blk) || !blk_is_inserted(blk)) {
+            continue;
+        }
+
+        bdrv_stop_replication(blk_bs(blk), failover, &local_err);
+        if (!errp) {
+            /*
+             * The caller doesn't care the result, they just
+             * want to stop all block's replication.
+             */
+            continue;
+        }
+        if (local_err) {
+            error_propagate(errp, local_err);
+            return;
+        }
+    }
+}
+
 /*
  * there are two way to entry this function
  * 1. From colo checkpoint incoming thread, in this case
@@ -112,6 +184,8 @@ static bool colo_runstate_is_stopped(void)
  */
 static void slave_do_failover(void)
 {
+    Error *local_err = NULL;
+
     /* Wait for incoming thread loading vmstate */
     while (vmstate_loading) {
         ;
@@ -121,6 +195,11 @@ static void slave_do_failover(void)
         error_report("colo proxy failed to do failover");
     }
     colo_proxy_destroy(COLO_SECONDARY_MODE);
+    blk_stop_replication(true, &local_err);
+    if (local_err) {
+        error_report_err(local_err);
+    }
+    trace_colo_stop_block_replication("failover");
 
     colo = NULL;
 
@@ -139,6 +218,7 @@ static void slave_do_failover(void)
 static void master_do_failover(void)
 {
     MigrationState *s = migrate_get_current();
+    Error *local_err = NULL;
 
     if (!colo_runstate_is_stopped()) {
         vm_stop_force_state(RUN_STATE_COLO);
@@ -150,6 +230,12 @@ static void master_do_failover(void)
         migrate_set_state(s, MIGRATION_STATUS_COLO, 
MIGRATION_STATUS_COMPLETED);
     }
 
+    blk_stop_replication(true, &local_err);
+    if (local_err) {
+        error_report_err(local_err);
+    }
+    trace_colo_stop_block_replication("failover");
+
     vm_start();
 }
 
@@ -223,6 +309,7 @@ static int colo_do_checkpoint_transaction(MigrationState 
*s, QEMUFile *control)
     int colo_shutdown, ret;
     size_t size;
     QEMUFile *trans = NULL;
+    Error *local_err = NULL;
 
     ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW);
     if (ret < 0) {
@@ -275,6 +362,16 @@ static int colo_do_checkpoint_transaction(MigrationState 
*s, QEMUFile *control)
         goto out;
     }
 
+    /* we call this api although this may do nothing on primary side */
+    qemu_mutex_lock_iothread();
+    blk_do_checkpoint(&local_err);
+    qemu_mutex_unlock_iothread();
+    if (local_err) {
+        error_report_err(local_err);
+        ret = -1;
+        goto out;
+    }
+
     ret = colo_ctl_put(s->file, COLO_CHECKPOINT_SEND);
     if (ret < 0) {
         goto out;
@@ -305,6 +402,10 @@ static int colo_do_checkpoint_transaction(MigrationState 
*s, QEMUFile *control)
     trace_colo_receive_message("COLO_CHECKPOINT_LOADED");
 
     if (colo_shutdown) {
+        qemu_mutex_lock_iothread();
+        blk_stop_replication(false, NULL);
+        trace_colo_stop_block_replication("shutdown");
+        qemu_mutex_unlock_iothread();
         colo_ctl_put(s->file, COLO_GUEST_SHUTDOWN);
         qemu_fflush(s->file);
         colo_shutdown_requested = 0;
@@ -336,6 +437,7 @@ static void *colo_thread(void *opaque)
     QEMUFile *colo_control = NULL;
     int64_t current_time, checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
     int ret;
+    Error *local_err = NULL;
 
     if (colo_proxy_init(COLO_PRIMARY_MODE) != 0) {
         error_report("Init colo proxy error");
@@ -367,6 +469,12 @@ static void *colo_thread(void *opaque)
     }
 
     qemu_mutex_lock_iothread();
+    /* start block replication */
+    blk_start_replication(true, &local_err);
+    if (local_err) {
+        goto out;
+    }
+    trace_colo_start_block_replication();
     vm_start();
     qemu_mutex_unlock_iothread();
     trace_colo_vm_state_change("stop", "run");
@@ -417,7 +525,11 @@ do_checkpoint:
     }
 
 out:
-    error_report("colo: some error happens in colo_thread");
+    if (local_err) {
+        error_report_err(local_err);
+    } else {
+        error_report("colo: some error happens in colo_thread");
+    }
     qemu_mutex_lock_iothread();
     if (!failover_request_is_set()) {
         error_report("master takeover from checkpoint channel");
@@ -498,6 +610,8 @@ static int colo_wait_handle_cmd(QEMUFile *f, int 
*checkpoint_request)
     case COLO_GUEST_SHUTDOWN:
         qemu_mutex_lock_iothread();
         vm_stop_force_state(RUN_STATE_COLO);
+        blk_stop_replication(false, NULL);
+        trace_colo_stop_block_replication("shutdown");
         qemu_system_shutdown_request_core();
         qemu_mutex_unlock_iothread();
         trace_colo_receive_message("COLO_GUEST_SHUTDOWN");
@@ -521,6 +635,7 @@ void *colo_process_incoming_checkpoints(void *opaque)
     QEMUFile *ctl = NULL, *fb = NULL;
     int ret;
     uint64_t total_size;
+    Error *local_err = NULL;
 
     qdev_hotplug = 0;
 
@@ -550,6 +665,15 @@ void *colo_process_incoming_checkpoints(void *opaque)
         goto out;
     }
 
+    qemu_mutex_lock_iothread();
+    /* start block replication */
+    blk_start_replication(false, &local_err);
+    if (local_err) {
+        goto out;
+    }
+    qemu_mutex_unlock_iothread();
+    trace_colo_start_block_replication();
+
     ret = colo_ctl_put(ctl, COLO_CHECPOINT_READY);
     if (ret < 0) {
         goto out;
@@ -636,7 +760,13 @@ void *colo_process_incoming_checkpoints(void *opaque)
         }
 
         vmstate_loading = false;
+
+        /* discard colo disk buffer */
+        blk_do_checkpoint(&local_err);
         qemu_mutex_unlock_iothread();
+        if (local_err) {
+            goto out;
+        }
 
         ret = colo_ctl_put(ctl, COLO_CHECKPOINT_LOADED);
         if (ret < 0) {
@@ -654,7 +784,11 @@ void *colo_process_incoming_checkpoints(void *opaque)
     }
 
 out:
-    error_report("Detect some error or get a failover request");
+    if (local_err) {
+        error_report_err(local_err);
+    } else {
+        error_report("Detect some error or get a failover request");
+    }
     /* determine whether we need to failover */
     if (!failover_request_is_set()) {
         /*
diff --git a/trace-events b/trace-events
index b1c263a..d0ffade 100644
--- a/trace-events
+++ b/trace-events
@@ -1451,6 +1451,8 @@ colo_vm_state_change(const char *old, const char *new) 
"Change '%s' => '%s'"
 colo_receive_message(const char *msg) "Receive '%s'"
 colo_do_failover(void) ""
 colo_rcv_pkt(int result) "Result of net packets comparing is different: %d"
+colo_start_block_replication(void) "Block replication is started"
+colo_stop_block_replication(const char *reason) "Block replication is 
stopped(reason: '%s')"
 
 # kvm-all.c
 kvm_ioctl(int type, void *arg) "type 0x%x, arg %p"
-- 
1.7.12.4





reply via email to

[Prev in Thread] Current Thread [Next in Thread]