qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH v3 17/17] migration/snapshot: Postcopy load implemented


From: nikita . lapshin
Subject: [PATCH v3 17/17] migration/snapshot: Postcopy load implemented
Date: Thu, 16 Jun 2022 13:28:11 +0300

From: Nikita Lapshin <nikita.lapshin@openvz.org>

This is a modified version of the load part from the previous patch.

Implement a new return-path (RP) listener thread for qemu-snapshot, as well
as the functions needed to start postcopy.

This mode is enabled by passing the --postcopy flag.

Signed-off-by: Nikita Lapshin <nikita.lapshin@openvz.org>
---
 include/qemu-snapshot.h   |  12 ++
 migration/migration.c     | 123 +++++++++++++++++++
 migration/migration.h     |   1 +
 migration/qemu-snapshot.c | 249 +++++++++++++++++++++++++++++++++++++-
 migration/savevm.c        |  25 ++++
 migration/savevm.h        |   4 +
 qemu-snapshot.c           |  10 ++
 7 files changed, 422 insertions(+), 2 deletions(-)

diff --git a/include/qemu-snapshot.h b/include/qemu-snapshot.h
index 74885c03bb..b0f235747f 100644
--- a/include/qemu-snapshot.h
+++ b/include/qemu-snapshot.h
@@ -65,6 +65,15 @@ typedef struct StateLoadCtx {
     QEMUFile *f_fd;
     QEMUFile *f_vmstate;
 
+    /* Postcopy state */
+    bool postcopy;              /* --postcopy was requested */
+    bool in_postcopy;           /* postcopy package has been sent */
+
+    /* Return path (page requests coming from the destination) */
+    QemuThread rp_listen_thread; /* runs qemu_snapshot_rp_listen_thread() */
+    QEMUFile *f_rp_fd;          /* input QEMUFile on the return-path fd */
+    bool has_rp_listen_thread;  /* rp_listen_thread has been created */
+
     StateInfo state_parameters;
 } StateLoadCtx;
 
@@ -76,6 +85,9 @@ int coroutine_fn save_state_main(StateSaveCtx *s);
 void save_vmstate(StateSaveCtx *s);
 int coroutine_fn load_state_main(StateLoadCtx *s);
 
+int queue_page_request(const char *idstr, uint64_t offset,
+                       uint32_t size); /* called by the RP listen thread */
+
 QEMUFile *qemu_fopen_bdrv_vmstate(BlockDriverState *bs, int is_writable);
 void qemu_fsplice(QEMUFile *f_dst, QEMUFile *f_src, size_t size);
 size_t qemu_fsplice_tail(QEMUFile *f_dst, QEMUFile *f_src);
diff --git a/migration/migration.c b/migration/migration.c
index 6528b3ad41..6f82e8ea48 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -61,6 +61,7 @@
 #include "sysemu/cpus.h"
 #include "yank_functions.h"
 #include "sysemu/qtest.h"
+#include "qemu-snapshot.h"
 
 #define MAX_THROTTLE  (128 << 20)      /* Migration transfer speed throttling */
 
@@ -3517,6 +3518,128 @@ static MigThrError postcopy_pause(MigrationState *s)
     }
 }
 
+/*
+ * Return-path thread for qemu-snapshot: queue page requests until SHUT/error
+ */
+void *qemu_snapshot_rp_listen_thread(void *opaque)
+{
+    QEMUFile *f = (QEMUFile *) opaque;
+    int res = 0;
+    uint64_t pages = 0;
+
+    while (!res) {
+        uint8_t h_buf[512];
+        const int h_max_len = sizeof(h_buf);
+        int h_type;
+        int h_len;
+        size_t count;
+
+        h_type = qemu_get_be16(f);
+        h_len = qemu_get_be16(f);
+
+        /* Make early check for input errors */
+        res = qemu_file_get_error(f);
+        if (res) {
+            break;
+        }
+
+        /* Check message type */
+        if (h_type >= MIG_RP_MSG_MAX || h_type == MIG_RP_MSG_INVALID) {
+            error_report("RP: received invalid message type %d length %d",
+                         h_type, h_len);
+            res = -EINVAL;
+            break;
+        }
+
+        /* Check length against the fixed size and then the buffer size */
+        if (rp_cmd_args[h_type].len != -1 && h_len != rp_cmd_args[h_type].len) {
+            error_report("RP: received %s message len %d expected %zd",
+                         rp_cmd_args[h_type].name,
+                         h_len, rp_cmd_args[h_type].len);
+            res = -EINVAL;
+            break;
+        } else if (h_len > h_max_len) {
+            error_report("RP: received %s message len %d max_len %d",
+                         rp_cmd_args[h_type].name, h_len, h_max_len);
+            res = -EINVAL;
+            break;
+        }
+
+        count = qemu_get_buffer(f, h_buf, h_len);
+        if (count != h_len) {
+            break;  /* short read: file error is checked after the loop */
+        }
+
+        switch (h_type) {
+        case MIG_RP_MSG_SHUT:
+        {
+            int shut_error;
+
+            shut_error = ldl_be_p(h_buf);
+            if (shut_error) {
+                error_report("RP: sibling shutdown, error %d", shut_error);
+            }
+
+            /* Exit processing loop */
+            res = 1;
+            break;
+        }
+
+        case MIG_RP_MSG_REQ_PAGES:
+        case MIG_RP_MSG_REQ_PAGES_ID:
+        {
+            uint64_t offset;
+            uint32_t size;
+            char *id_str = NULL;
+
+            pages++;
+            offset = ldq_be_p(h_buf);
+            size = ldl_be_p(h_buf + 8);
+
+            if (h_type == MIG_RP_MSG_REQ_PAGES_ID) {
+                int h_parsed_len = rp_cmd_args[MIG_RP_MSG_REQ_PAGES].len;
+
+                if (h_len > h_parsed_len) {
+                    int id_len;
+
+                    /* RAM block id: one length byte, then the name */
+                    id_len = h_buf[h_parsed_len];
+                    id_str = (char *) &h_buf[h_parsed_len + 1];
+                    id_str[id_len] = 0; /* id_len <= 255: stays in h_buf */
+
+                    h_parsed_len += id_len + 1;
+                }
+
+                if (h_parsed_len != h_len) {
+                    error_report("RP: received %s message len %d expected %d",
+                                 rp_cmd_args[MIG_RP_MSG_REQ_PAGES_ID].name,
+                                 h_len, h_parsed_len);
+                    res = -EINVAL;
+                    break;
+                }
+            }
+
+            res = queue_page_request(id_str, offset, size);
+            break;
+        }
+
+        default:
+            error_report("RP: received unexpected message type %d len %d",
+                         h_type, h_len);
+            res = -EINVAL;
+        }
+    }
+
+    if (res >= 0) {
+        res = qemu_file_get_error(f);
+    }
+    if (res) {
+        error_report("RP: listen thread exit, error %d (%" PRIu64
+                     " page requests received)", res, pages);
+    }
+    return NULL;
+}
+
 static MigThrError migration_detect_error(MigrationState *s)
 {
     int ret;
diff --git a/migration/migration.h b/migration/migration.h
index 5c43788a2b..fd6f8d3083 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -433,4 +433,5 @@ void migration_cancel(const Error *error);
 void populate_vfio_info(MigrationInfo *info);
 void postcopy_temp_page_reset(PostcopyTmpPage *tmp_page);
 
+void *qemu_snapshot_rp_listen_thread(void *opaque); /* opaque: QEMUFile * */
 #endif
diff --git a/migration/qemu-snapshot.c b/migration/qemu-snapshot.c
index 280f5be25c..8090c7032d 100644
--- a/migration/qemu-snapshot.c
+++ b/migration/qemu-snapshot.c
@@ -44,6 +44,16 @@ typedef struct RAMBlock {
     char idstr[256];            /* RAM block id string */
 } RAMBlock;
 
+/* Page request from destination in postcopy */
+typedef struct RAMPageRequest {
+    RAMBlock *block;            /* RAM block the request refers to */
+    int64_t offset;             /* Requested offset within the block */
+    unsigned size;              /* Length field of the RP request */
+
+    /* Link into ram_ctx.page_req */
+    QSIMPLEQ_ENTRY(RAMPageRequest) next;
+} RAMPageRequest;
+
 typedef struct RAMPage {
     RAMBlock *block;            /* RAM block containing the page */
     int64_t offset;             /* Page offset in RAM block */
@@ -57,6 +67,11 @@ typedef struct RAMCtx {
     QSIMPLEQ_HEAD(, RAMBlock) ram_block_list;
     RAMPage last_page;
     RAMBlock *last_sent_block;
+
+    /* Postcopy page requests, queued by the RP thread; guarded by the mutex */
+    QemuMutex page_req_mutex;
+    QSIMPLEQ_HEAD(, RAMPageRequest) page_req;
+    RAMBlock *last_req_block;   /* Block of the last request carrying an id */
 } RAMCtx;
 
 static RAMCtx ram_ctx;
@@ -627,6 +642,143 @@ static int ram_send_setup(QEMUFile *f, void *opaque)
     return res;
 }
 
+/* Send a discard covering each RAM block's range from discard_offset to end */
+static int send_each_ram_block_discard(QEMUFile *f)
+{
+    RAMBlock *block;
+    int res = 0;
+    uint64_t start, len;
+
+    QSIMPLEQ_FOREACH(block, &ram_ctx.ram_block_list, next) {
+        start = block->discard_offset;
+        len = block->length - block->discard_offset;
+        qemu_savevm_send_postcopy_ram_discard(f, block->idstr, 1, &start, &len);
+
+        res = qemu_file_get_error(f);
+        if (res) {
+            break;
+        }
+    }
+
+    return res;
+}
+
+/* Send ADVISE/OPEN_RETURN_PATH, then start the return-path listen thread */
+static int prepare_postcopy(StateLoadCtx *s)
+{
+    int ret = qemu_snapshot_postcopy_prepare(s->f_fd, page_size, page_size);
+
+    if (ret) {
+        return ret;
+    }
+
+    qemu_thread_create(&s->rp_listen_thread, "rp_thread",
+                       qemu_snapshot_rp_listen_thread, s->f_rp_fd,
+                       QEMU_THREAD_JOINABLE);
+    s->has_rp_listen_thread = true;
+    return 0;
+}
+
+/* Switch the destination to postcopy: discards, then a packaged LISTEN/RUN */
+static int start_postcopy(StateLoadCtx *s)
+{
+    QIOChannelBuffer *bioc;
+    QEMUFile *fb;
+    int eof_pos;
+    int res = 0;
+
+    /*
+     * Send RAM discards for each block's unsent part. Without discards,
+     * the userfault_fd code on destination will not trigger page requests
+     * as expected. Also, the UFFDIO_COPY ioctl that is used to place incoming
+     * page in postcopy would give an error if that page has not faulted
+     * with MISSING reason.
+     */
+    res = send_each_ram_block_discard(s->f_fd);
+    if (res) {
+        error_report("Failed to send RAM discards for postcopy");
+        return res;
+    }
+
+    /*
+     * To perform a switch to postcopy on destination, we need to send
+     * commands and the device state data in the following order:
+     *   * MIG_CMD_POSTCOPY_LISTEN
+     *   * Non-iterable device state sections
+     *   * MIG_CMD_POSTCOPY_RUN
+     *
+     * All this has to be packaged into a single blob using MIG_CMD_PACKAGED
+     * command. While loading the device state we may trigger page transfer
+     * requests and the fd must be free to process those, thus the destination
+     * must read the whole device state off the fd before it starts
+     * processing it. To wrap it up in a package, QEMU buffer channel is used.
+     */
+    bioc = qio_channel_buffer_new(512 * 1024);
+    qio_channel_set_name(QIO_CHANNEL(bioc), "migration-postcopy-buffer");
+    fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
+    object_unref(OBJECT(bioc));
+
+    /* MIG_CMD_POSTCOPY_LISTEN command */
+    qemu_savevm_send_postcopy_listen(fb);
+
+    /* The rest of non-iterable device state with an optional vmdesc section */
+    send_vmstate(s->f_vmstate, fb);
+    qemu_fflush(fb);
+
+    /*
+     * vmdesc section may optionally be present at the end of the stream
+     * so we'll try to locate it and truncate the trailer.
+     */
+    eof_pos = (int) bioc->usage - 1;
+
+    for (int offset = (int) bioc->usage - 11; offset >= 0; offset--) {
+        if (bioc->data[offset] == QEMU_VM_SECTION_FOOTER &&
+                bioc->data[offset + 5] == QEMU_VM_EOF &&
+                bioc->data[offset + 6] == QEMU_VM_VMDESCRIPTION) {
+            uint32_t expected_length = bioc->usage - (offset + 11);
+            uint32_t json_length;
+
+            json_length = ldl_be_p(&bioc->data[offset + 7]);
+            if (json_length != expected_length) {
+                error_report("Corrupted vmdesc trailer: length %" PRIu32
+                             " expected %" PRIu32,
+                             json_length, expected_length);
+                res = -EINVAL;
+                goto fail;
+            }
+
+            eof_pos = offset + 5;
+            break;
+        }
+    }
+
+    /*
+     * When switching to postcopy we need to skip QEMU_VM_EOF token which
+     * normally is placed after the last non-iterable device state section
+     * (but before the vmdesc section).
+     *
+     * Skipping QEMU_VM_EOF is required to allow migration process to
+     * continue in postcopy. Vmdesc section also has to be skipped here.
+     */
+    if (eof_pos >= 0 && bioc->data[eof_pos] == QEMU_VM_EOF) {
+        bioc->usage = eof_pos;
+        bioc->offset = eof_pos;
+    }
+
+    /* Finally is the MIG_CMD_POSTCOPY_RUN command */
+    qemu_savevm_send_postcopy_run(fb);
+
+    /* Now send that blob; a negative return means it was refused as too big */
+    res = qemu_savevm_send_packaged(s->f_fd, bioc->data, bioc->usage) < 0 ?
+          -EINVAL : 0;
+    qemu_fflush(s->f_fd);
+    s->in_postcopy = !res;
+fail:
+    qemu_fclose(fb);
+    load_state_check_errors(s, &res);
+    return res;
+}
+
 static bool find_next_page(RAMPage *page)
 {
     RAMCtx *ram = &ram_ctx;
@@ -709,6 +861,74 @@ void clear_page_range(RAMPage *page, int64_t length)
                  ((length - 1) >> slice_bits) + 1);
 }
 
+/* Queue a postcopy page request; a NULL @idstr reuses the previous block */
+int queue_page_request(const char *idstr, uint64_t offset,
+                       uint32_t size)
+{
+    RAMCtx *ram = &ram_ctx;
+    RAMBlock *block;
+    RAMPageRequest *new_entry;
+
+    if (!idstr) {
+        block = ram->last_req_block;
+        if (!block) {
+            error_report("RP-REQ_PAGES: no previous block");
+            return -EINVAL;
+        }
+    } else {
+        block = ram_block_by_idstr(idstr);
+        if (!block) {
+            error_report("RP-REQ_PAGES: cannot find block %s", idstr);
+            return -EINVAL;
+        }
+        ram->last_req_block = block;
+    }
+
+    /* Report block->idstr here: @idstr may be NULL */
+    if (!ram_offset_in_block(block, offset)) {
+        error_report("RP-REQ_PAGES: offset 0x%" PRIx64 " out of RAM block %s",
+                     offset, block->idstr);
+        return -EINVAL;
+    }
+
+    new_entry = g_new0(RAMPageRequest, 1);
+    new_entry->block = block;
+    new_entry->offset = offset;
+    new_entry->size = size;
+
+    qemu_mutex_lock(&ram->page_req_mutex);
+    QSIMPLEQ_INSERT_TAIL(&ram->page_req, new_entry, next);
+    qemu_mutex_unlock(&ram->page_req_mutex);
+    return 0;
+}
+
+/* Fill @page from the first queued request whose bitmap bit is still set */
+static bool get_queued_page(RAMPage *page)
+{
+    RAMCtx *ram = &ram_ctx;
+
+    /* Lockless check first; requests are queued by the RP listen thread */
+    if (QSIMPLEQ_EMPTY_ATOMIC(&ram->page_req)) {
+        return false;
+    }
+
+    QEMU_LOCK_GUARD(&ram->page_req_mutex);
+    while (!QSIMPLEQ_EMPTY(&ram->page_req)) {
+        RAMPageRequest *entry = QSIMPLEQ_FIRST(&ram->page_req);
+        RAMBlock *block = entry->block;
+        int64_t slice = entry->offset >> slice_bits;
+
+        QSIMPLEQ_REMOVE_HEAD(&ram->page_req, next);
+        g_free(entry);
+        if (test_bit(slice, block->bitmap)) {
+            page->block = block;
+            page->offset = slice << slice_bits;
+            return true;
+        }
+    }
+    return false;
+}
+
 static int coroutine_fn ram_load_pages(StateLoadCtx *s)
 {
     RAMCtx *ram = &ram_ctx;
@@ -718,8 +938,9 @@ static int coroutine_fn ram_load_pages(StateLoadCtx *s)
     int64_t blk_offset, slice_offset, bdrv_offset;
     ssize_t res;
     int64_t flags = RAM_SAVE_FLAG_CONTINUE;
+    bool urgent = get_queued_page(&page);  /* RP page requests take priority */
 
-    if (!find_next_page(&page)) {
+    if (!urgent && !find_next_page(&page)) {
         return 0;
     }
 
@@ -835,6 +1056,20 @@ int coroutine_fn load_state_main(StateLoadCtx *s)
         goto fail;
     }
 
+    if (s->postcopy) {
+        res = prepare_postcopy(s);
+        if (res) {
+            error_report("Prepare postcopy failed");
+            goto fail;
+        }
+        /* TODO: Add condition to start postcopy during the cycle below */
+        res = start_postcopy(s);
+        if (res) {
+            error_report("Postcopy start failed");
+            goto fail;
+        }
+    }
+
     do {
         res = qemu_savevm_state_iterate(s->f_fd, false);
         /* Check for file errors */
@@ -845,8 +1080,11 @@ int coroutine_fn load_state_main(StateLoadCtx *s)
         goto fail;
     }
 
+    /* In postcopy mode the vmstate has already been sent in the package */
+    if (!s->in_postcopy) {
+        send_vmstate(s->f_vmstate, s->f_fd);
+    }
 
-    send_vmstate(s->f_vmstate, s->f_fd);
     qemu_put_byte(s->f_fd, QEMU_VM_EOF);
     qemu_fflush(s->f_fd);
 fail:
@@ -865,6 +1103,10 @@ void ram_init_state(void)
 
     /* Initialize RAM block list head */
     QSIMPLEQ_INIT(&ram->ram_block_list);
+
+    /* Initialize postcopy page request queue and the mutex guarding it */
+    qemu_mutex_init(&ram->page_req_mutex);
+    QSIMPLEQ_INIT(&ram->page_req);
 }
 
 /* Destroy snapshot RAM state */
@@ -878,4 +1120,7 @@ void ram_destroy_state(void)
         g_free(block->bitmap);
         g_free(block);
     }
+
+    /* Destroy page request mutex. NOTE(review): leftover requests leak */
+    qemu_mutex_destroy(&ram_ctx.page_req_mutex);
 }
diff --git a/migration/savevm.c b/migration/savevm.c
index b722e51163..b1320bd813 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -3356,3 +3356,28 @@ void qmp_snapshot_delete(const char *job_id,
 
     job_start(&s->common);
 }
+
+/* Do preparation before the qemu-snapshot tool starts postcopy */
+int qemu_snapshot_postcopy_prepare(QEMUFile *f_dest,
+                                   uint64_t sps,
+                                   uint64_t tps)
+{
+    uint64_t tmp[2];
+
+    /* Send POSTCOPY_ADVISE carrying both page-size values big-endian */
+    tmp[0] = cpu_to_be64(sps);
+    tmp[1] = cpu_to_be64(tps);
+    qemu_savevm_command_send(f_dest, MIG_CMD_POSTCOPY_ADVISE,
+                             sizeof(tmp), (uint8_t *) tmp);
+
+    /* Open return path on destination */
+    qemu_savevm_command_send(f_dest, MIG_CMD_OPEN_RETURN_PATH, 0, NULL);
+
+    /*
+     * Check for file errors after sending the commands, since the
+     * destination may already have closed its input pipe in case
+     * postcopy had not been enabled on it in advance. Returns 0 on
+     * success or the error recorded on @f_dest.
+     */
+    return qemu_file_get_error(f_dest);
+}
diff --git a/migration/savevm.h b/migration/savevm.h
index 9abfcd88e5..94b6f60496 100644
--- a/migration/savevm.h
+++ b/migration/savevm.h
@@ -72,4 +72,8 @@ int qemu_loadvm_section_part_end(QEMUFile *f, MigrationIncomingState *mis);
 
 int qemu_replace_ram_handler(void *ram_ops, void *opaque);
 
+int qemu_snapshot_postcopy_prepare(QEMUFile *f_dest,
+                                   uint64_t sps,
+                                   uint64_t tps);
+
 #endif
diff --git a/qemu-snapshot.c b/qemu-snapshot.c
index 04bda74fb4..893086eb8a 100644
--- a/qemu-snapshot.c
+++ b/qemu-snapshot.c
@@ -239,6 +239,7 @@ static void coroutine_fn snapshot_load_co(void *opaque)
     StateLoadCtx *s = get_load_context();
     int res;
     QIOChannel *ioc_fd;
+    QIOChannel *ioc_rp_fd;
 
     init_load_context();
 
@@ -260,8 +261,17 @@ static void coroutine_fn snapshot_load_co(void *opaque)
     object_unref(OBJECT(ioc_fd));
     qemu_file_set_blocking(s->f_fd, false);
 
+    /* Input QEMUFile on the return-path fd, only needed for postcopy */
+    if (params.postcopy) {
+        ioc_rp_fd = qio_channel_new_fd(params.rp_fd, NULL);
+        qio_channel_set_name(ioc_rp_fd, "migration-channel-rp");
+        s->f_rp_fd = qemu_fopen_channel_input(ioc_rp_fd);
+        object_unref(OBJECT(ioc_rp_fd));
+    }
+
     s->state_parameters = *((StateInfo *) opaque);
 
+    s->postcopy = params.postcopy;
     res = load_state_main(s);
     if (res) {
         error_report("Failed to load snapshot: %s", strerror(-res));
-- 
2.31.1




reply via email to

[Prev in Thread] Current Thread [Next in Thread]