qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH v2 10/43] Return path: Source handling of return path


From: Dr. David Alan Gilbert (git)
Subject: [Qemu-devel] [PATCH v2 10/43] Return path: Source handling of return path
Date: Mon, 11 Aug 2014 15:29:26 +0100

From: "Dr. David Alan Gilbert" <address@hidden>

Open a return path, and handle messages that are received upon it.

Signed-off-by: Dr. David Alan Gilbert <address@hidden>
---
 include/migration/migration.h |  11 ++++
 migration.c                   | 145 +++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 155 insertions(+), 1 deletion(-)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 12e640d..0c9055f 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -47,6 +47,15 @@ enum mig_rpcomm_cmd {
     MIG_RPCOMM_ACK,          /* data (seq: be32 ) */
     MIG_RPCOMM_AFTERLASTVALID
 };
+
+struct MigrationRetPathState {
+    uint16_t      header_com;   /* Headers of last (partially?) received cmd */
+    uint16_t      header_len;   /* Payload length read from the same header */
+    uint32_t      latest_ack;   /* Value carried by the last MIG_RPCOMM_ACK */
+    bool          error;        /* True if something bad happened on the RP */
+    QemuSemaphore finished;     /* When the RP co quits */
+};
+
 typedef struct MigrationState MigrationState;
 
 /* State for the incoming migration */
@@ -69,9 +78,11 @@ struct MigrationState
     QemuThread thread;
     QEMUBH *cleanup_bh;
     QEMUFile *file;
+    QEMUFile *return_path;
 
     int state;
     MigrationParams params;
+    struct MigrationRetPathState rp_state;
     double mbps;
     int64_t total_time;
     int64_t downtime;
diff --git a/migration.c b/migration.c
index 3e4e120..c54d59e 100644
--- a/migration.c
+++ b/migration.c
@@ -373,6 +373,15 @@ static void migrate_set_state(MigrationState *s, int 
old_state, int new_state)
     }
 }
 
+static void migrate_fd_cleanup_src_rp(MigrationState *ms)
+{
+    if (ms->return_path) {      /* Nothing to do if the RP is not open */
+        DPRINTF("cleaning up return path\n");
+        qemu_fclose(ms->return_path);
+        ms->return_path = NULL; /* So a second call is a no-op */
+    }
+}
+
 static void migrate_fd_cleanup(void *opaque)
 {
     MigrationState *s = opaque;
@@ -380,6 +389,8 @@ static void migrate_fd_cleanup(void *opaque)
     qemu_bh_delete(s->cleanup_bh);
     s->cleanup_bh = NULL;
 
+    migrate_fd_cleanup_src_rp(s);
+
     if (s->file) {
         trace_migrate_fd_cleanup();
         qemu_mutex_unlock_iothread();
@@ -657,8 +668,140 @@ int64_t migrate_xbzrle_cache_size(void)
     return s->xbzrle_cache_size;
 }
 
-/* migration thread support */
+/*
+ * Something bad happened to the RP stream: mark an error and close the RP.
+ * The caller shall print something to indicate why.
+ */
+static void source_return_path_bad(MigrationState *s)
+{
+    s->rp_state.error = true;
+    migrate_fd_cleanup_src_rp(s);
+}
+
+/*
+ * Handles messages sent on the return path towards the source VM
+ *
+ * Coroutine entry point (opaque is the MigrationState): reads commands from
+ * the RP until SHUT or an error, then posts rp_state.finished on the way out
+ */
+static void source_return_path_co(void *opaque)
+{
+    MigrationState *ms = opaque;
+    QEMUFile *rp = ms->return_path;
+    const int max_len = 512;
+    uint8_t buf[max_len];
+    uint32_t tmp32;
+    int res;
+
+    DPRINTF("RP: source_return_path_co entry");
+    while (rp && !qemu_file_get_error(rp)) {
+        DPRINTF("RP: source_return_path_co top of loop");
+        ms->rp_state.header_com = qemu_get_be16(rp);
+        ms->rp_state.header_len = qemu_get_be16(rp);
+
+        uint16_t expected_len;
+
+        switch (ms->rp_state.header_com) {
+        case MIG_RPCOMM_SHUT:
+        case MIG_RPCOMM_ACK:
+            expected_len = 4;
+            break;
+
+        default:
+            error_report("RP: Received invalid cmd 0x%04x length 0x%04x",
+                    ms->rp_state.header_com, ms->rp_state.header_len);
+            source_return_path_bad(ms);
+            goto out;
+        }
 
+        if (ms->rp_state.header_len != expected_len) {
+            error_report("RP: Received command 0x%04x with "
+                    "incorrect length %d expecting %d",
+                    ms->rp_state.header_com, ms->rp_state.header_len,
+                    expected_len);
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        /* Header is valid: the payload is exactly as long as the cmd needs */
+        res = qemu_get_buffer(rp, buf, ms->rp_state.header_len);
+        if (res != ms->rp_state.header_len) {
+            DPRINTF("RP: Failed to read command data");
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        /* OK, we have the command and the data */
+        switch (ms->rp_state.header_com) {
+        case MIG_RPCOMM_SHUT:
+            tmp32 = be32_to_cpup((uint32_t *)buf);
+            if (tmp32) {
+                error_report("RP: Sibling indicated error %d", tmp32);
+                source_return_path_bad(ms);
+            } else {
+                DPRINTF("RP: SHUT received");
+            }
+            /*
+             * Either way we stop here.  On a clean SHUT the RP is left open
+             * (migrate_fd_cleanup closes it later); we could do a shutdown(2)
+             * on it, but we're the only user anyway, so nothing is gained.
+             */
+            goto out;
+
+        case MIG_RPCOMM_ACK:
+            tmp32 = be32_to_cpup((uint32_t *)buf);
+            DPRINTF("RP: Received ACK 0x%x", tmp32);
+            atomic_xchg(&ms->rp_state.latest_ack, tmp32);
+            break;
+
+        default:
+            /* This shouldn't happen because we should catch this above */
+            DPRINTF("RP: Bad header_com in dispatch");
+        }
+        /* Latest command processed, now leave a gap for the next one */
+        ms->rp_state.header_com = MIG_RPCOMM_INVALID;
+    }
+    if (rp && qemu_file_get_error(rp)) {
+        DPRINTF("source_return_path_co: rp bad at end");
+        source_return_path_bad(ms);
+    }
+
+    DPRINTF("source_return_path_co: Bottom exit");
+
+out:
+    /* For await_outgoing_return_path_close */
+    qemu_sem_post(&ms->rp_state.finished);
+}
+
+static int open_outgoing_return_path(MigrationState *ms)
+{
+    Coroutine *co;
+
+    qemu_sem_init(&ms->rp_state.finished, 0);
+    ms->return_path = qemu_file_get_return_path(ms->file);
+    if (!ms->return_path) {
+        return -1; /* No RP for this file; no coroutine exists yet to leak */
+    }
+    /* Start the RP handler; 0 is returned once it has been entered */
+    co = qemu_coroutine_create(source_return_path_co);
+    DPRINTF("open_outgoing_return_path starting co");
+    qemu_coroutine_enter(co, ms);
+    DPRINTF("open_outgoing_return_path continuing");
+    return 0;
+}
+
+static void await_outgoing_return_path_close(MigrationState *ms)
+{
+    /* TODO: once the _co becomes a process we can replace this by a join */
+    DPRINTF("%s: Waiting", __func__);
+    qemu_sem_wait(&ms->rp_state.finished); /* Posted as the RP co exits */
+    DPRINTF("%s: Exit", __func__);
+}
+
+/*
+ * Master migration thread on the source VM.
+ * It drives the migration and pumps the data down the outgoing channel.
+ */
 static void *migration_thread(void *opaque)
 {
     MigrationState *s = opaque;
-- 
1.9.3




reply via email to

[Prev in Thread] Current Thread [Next in Thread]