[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression thre
From: |
Dr. David Alan Gilbert |
Subject: |
Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads |
Date: |
Thu, 6 Nov 2014 15:41:24 +0000 |
User-agent: |
Mutt/1.5.23 (2014-03-12) |
* Li Liang (address@hidden) wrote:
> Instead of sending the guest memory directly, this solution compress
> the ram page before sending, after receiving, the data will be
> decompressed.
> This feature can help to reduce the data transferred about
> 60%, this is very useful when the network bandwidth is limited,
> and the migration time can also be reduced about 70%. The
> feature is off by default, following the document
> docs/multiple-compression-threads.txt for information to use it.
More technical comments below; but could you split the patch up a bit
more please - it's a bit daunting; probably with the commands in a separate
patch, and maybe split the compress stuff into one patch and the decompress
into another.
Another thing; I've not figured out how all of this gets cleaned up in
a migration_cancel or if the migration fails.
> Reviewed-by: Eric Blake <address@hidden>
> Signed-off-by: Li Liang <address@hidden>
> ---
> arch_init.c | 435
> ++++++++++++++++++++++++++++++++++++++++--
> hmp-commands.hx | 56 ++++++
> hmp.c | 57 ++++++
> hmp.h | 6 +
> include/migration/migration.h | 12 +-
> include/migration/qemu-file.h | 1 +
> migration.c | 99 ++++++++++
> monitor.c | 21 ++
> qapi-schema.json | 88 ++++++++-
> qmp-commands.hx | 131 +++++++++++++
> 10 files changed, 890 insertions(+), 16 deletions(-)
>
> diff --git a/arch_init.c b/arch_init.c
> index 88a5ba0..a27d87b 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -24,6 +24,7 @@
> #include <stdint.h>
> #include <stdarg.h>
> #include <stdlib.h>
> +#include <zlib.h>
> #ifndef _WIN32
> #include <sys/types.h>
> #include <sys/mman.h>
> @@ -126,6 +127,7 @@ static uint64_t bitmap_sync_count;
> #define RAM_SAVE_FLAG_CONTINUE 0x20
> #define RAM_SAVE_FLAG_XBZRLE 0x40
> /* 0x80 is reserved in migration.h start with 0x100 next */
> +#define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100
>
> static struct defconfig_file {
> const char *filename;
> @@ -332,6 +334,177 @@ static uint64_t migration_dirty_pages;
> static uint32_t last_version;
> static bool ram_bulk_stage;
>
Magic 16 constants - why?
> +#define COMPRESS_BUF_SIZE (TARGET_PAGE_SIZE + 16)
> +#define MIG_BUF_SIZE (COMPRESS_BUF_SIZE + 256 + 16)
> +struct MigBuf {
> + int buf_index;
> + uint8_t buf[MIG_BUF_SIZE];
> +};
> +
> +typedef struct MigBuf MigBuf;
> +
These functions look like they're recreating stuff in QEMUFile - is there
no way to share anything?
> +static void migrate_put_byte(MigBuf *f, int v)
> +{
> + f->buf[f->buf_index] = v;
> + f->buf_index++;
> +}
> +
> +static void migrate_put_be16(MigBuf *f, unsigned int v)
> +{
> + migrate_put_byte(f, v >> 8);
> + migrate_put_byte(f, v);
> +}
> +
> +static void migrate_put_be32(MigBuf *f, unsigned int v)
> +{
> + migrate_put_byte(f, v >> 24);
> + migrate_put_byte(f, v >> 16);
> + migrate_put_byte(f, v >> 8);
> + migrate_put_byte(f, v);
> +}
> +
> +static void migrate_put_be64(MigBuf *f, uint64_t v)
> +{
> + migrate_put_be32(f, v >> 32);
> + migrate_put_be32(f, v);
> +}
> +
This feels like you're doing something very similar to
the buffered file code that recently went in; could you
reuse qemu_bufopen or the QEMUSizedBuffer?
I think if you could use the qemu_buf somehow (maybe with
modifications?) then you could avoid a lot of the 'if'd
code below, because you'd always be working with a QEMUFile,
it would just be a different QEMUFile.
> +static void migrate_put_buffer(MigBuf *f, const uint8_t *buf, int size)
> +{
> + int l;
> +
> + while (size > 0) {
> + l = MIG_BUF_SIZE - f->buf_index;
> + if (l > size) {
> + l = size;
> + }
> + memcpy(f->buf + f->buf_index, buf, l);
> + f->buf_index += l;
> + buf += l;
> + size -= l;
> + }
> +}
> +
> +static size_t migrate_save_block_hdr(MigBuf *f, RAMBlock *block,
> + ram_addr_t offset, int cont, int flag)
> +{
> + size_t size;
> +
> + migrate_put_be64(f, offset | cont | flag);
> + size = 8;
> +
> + if (!cont) {
> + migrate_put_byte(f, strlen(block->idstr));
> + migrate_put_buffer(f, (uint8_t *)block->idstr,
> + strlen(block->idstr));
> + size += 1 + strlen(block->idstr);
> + }
> + return size;
> +}
> +
> +static int migrate_qemu_add_compress(MigBuf *f, const uint8_t *p,
> + int size, int level)
> +{
> + uLong blen = COMPRESS_BUF_SIZE;
> + if (compress2(f->buf + f->buf_index + sizeof(int), &blen, (Bytef *)p,
> + size, level) != Z_OK) {
> + error_report("Compress Failed!\n");
> + return 0;
> + }
> + migrate_put_be32(f, blen);
> + f->buf_index += blen;
> + return blen + sizeof(int);
> +}
Please add a comment about what this is doing, and use size_t or
unsigned int for sizes.
Also error_report doesn't need the \n
> +enum {
> + COM_DONE = 0,
> + COM_START,
> +};
> +
> +static int compress_thread_count;
> +static int decompress_thread_count;
> +
> +struct compress_param {
> + int state;
> + MigBuf migbuf;
> + RAMBlock *block;
> + ram_addr_t offset;
> + bool last_stage;
> + int ret;
> + int bytes_sent;
> + uint8_t *p;
> + int cont;
> + bool bulk_stage;
> +};
> +
> +typedef struct compress_param compress_param;
> +compress_param *comp_param;
> +
> +struct decompress_param {
> + int state;
> + void *des;
> + uint8 compbuf[COMPRESS_BUF_SIZE];
> + int len;
> +};
> +typedef struct decompress_param decompress_param;
> +
> +static decompress_param *decomp_param;
> +bool incomming_migration_done;
> +static bool quit_thread;
> +
> +static int save_compress_ram_page(compress_param *param);
> +
> +
> +static void *do_data_compress(void *opaque)
> +{
> + compress_param *param = opaque;
> + while (!quit_thread) {
> + if (param->state == COM_START) {
> + save_compress_ram_page(param);
> + param->state = COM_DONE;
> + } else {
> + g_usleep(1);
There has to be a better way than having your thread spin
with sleeps; qemu_event or semaphore or something?
> + }
> + }
> +
> + return NULL;
> +}
> +
> +
> +void migrate_compress_threads_join(MigrationState *s)
> +{
> + int i;
> + if (!migrate_use_compress()) {
> + return;
> + }
> + quit_thread = true;
> + for (i = 0; i < compress_thread_count; i++) {
> + qemu_thread_join(s->compress_thread + i);
> + }
> + g_free(s->compress_thread);
> + g_free(comp_param);
> + s->compress_thread = NULL;
> + comp_param = NULL;
> +}
> +
> +void migrate_compress_threads_create(MigrationState *s)
> +{
> + int i;
> + if (!migrate_use_compress()) {
> + return;
> + }
> + quit_thread = false;
> + compress_thread_count = s->compress_thread_count;
> + s->compress_thread = g_malloc0(sizeof(QemuThread)
> + * s->compress_thread_count);
> + comp_param = g_malloc0(sizeof(compress_param) *
> s->compress_thread_count);
You might need to be careful about how quit_thread and comp_param
are accessed by the migration thread and your individual compression threads,
especially on those architectures that don't guarantee memory ordering.
> + for (i = 0; i < s->compress_thread_count; i++) {
> + qemu_thread_create(s->compress_thread + i, "compress",
> + do_data_compress, comp_param + i, QEMU_THREAD_JOINABLE);
> +
> + }
> +}
> +
> /* Update the xbzrle cache to reflect a page that's been sent as all 0.
> * The important thing is that a stale (not-yet-0'd) page be replaced
> * by the new data.
> @@ -351,9 +524,10 @@ static void xbzrle_cache_zero_page(ram_addr_t
> current_addr)
>
> #define ENCODING_FLAG_XBZRLE 0x1
>
> -static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
> +static int save_xbzrle_page(void *f, uint8_t **current_data,
> ram_addr_t current_addr, RAMBlock *block,
> - ram_addr_t offset, int cont, bool last_stage)
> + ram_addr_t offset, int cont, bool last_stage,
> + bool save_to_buf)
> {
> int encoded_len = 0, bytes_sent = -1;
> uint8_t *prev_cached_page;
> @@ -401,10 +575,19 @@ static int save_xbzrle_page(QEMUFile *f, uint8_t
> **current_data,
> }
>
> /* Send XBZRLE based compressed page */
> - bytes_sent = save_block_hdr(f, block, offset, cont,
> RAM_SAVE_FLAG_XBZRLE);
> - qemu_put_byte(f, ENCODING_FLAG_XBZRLE);
> - qemu_put_be16(f, encoded_len);
> - qemu_put_buffer(f, XBZRLE.encoded_buf, encoded_len);
> + if (save_to_buf) {
> + bytes_sent = migrate_save_block_hdr((MigBuf *)f, block, offset,
> + cont, RAM_SAVE_FLAG_XBZRLE);
> + migrate_put_byte((MigBuf *)f, ENCODING_FLAG_XBZRLE);
> + migrate_put_be16((MigBuf *)f, encoded_len);
> + migrate_put_buffer((MigBuf *)f, XBZRLE.encoded_buf, encoded_len);
> + } else {
> + bytes_sent = save_block_hdr((QEMUFile *)f, block, offset,
> + cont, RAM_SAVE_FLAG_XBZRLE);
> + qemu_put_byte((QEMUFile *)f, ENCODING_FLAG_XBZRLE);
> + qemu_put_be16((QEMUFile *)f, encoded_len);
> + qemu_put_buffer((QEMUFile *)f, XBZRLE.encoded_buf, encoded_len);
> + }
So this in particular is the thing where I think using a qemu_buf file/qsb
would help; all that if would disappear.
> bytes_sent += encoded_len + 1 + 2;
> acct_info.xbzrle_pages++;
> acct_info.xbzrle_bytes += bytes_sent;
> @@ -609,7 +792,7 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block,
> ram_addr_t offset,
> xbzrle_cache_zero_page(current_addr);
> } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
> bytes_sent = save_xbzrle_page(f, &p, current_addr, block,
> - offset, cont, last_stage);
> + offset, cont, last_stage, false);
> if (!last_stage) {
> /* Can't send this cached data async, since the cache page
> * might get updated before it gets to the wire
> @@ -635,6 +818,90 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block,
> ram_addr_t offset,
> return bytes_sent;
> }
>
> +static int save_compress_ram_page(compress_param *param)
> +{
> + int bytes_sent = param->bytes_sent;
> + int blen = COMPRESS_BUF_SIZE;
> + int cont = param->cont;
> + uint8_t *p = param->p;
> + int ret = param->ret;
> + RAMBlock *block = param->block;
> + ram_addr_t offset = param->offset;
> + bool last_stage = param->last_stage;
> + /* In doubt sent page as normal */
> + XBZRLE_cache_lock();
> + ram_addr_t current_addr = block->offset + offset;
> + if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> + if (ret != RAM_SAVE_CONTROL_DELAYED) {
> + if (bytes_sent > 0) {
> + atomic_inc(&acct_info.norm_pages);
> + } else if (bytes_sent == 0) {
> + atomic_inc(&acct_info.dup_pages);
> + }
> + }
> + } else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
> + atomic_inc(&acct_info.dup_pages);
> + bytes_sent = migrate_save_block_hdr(&param->migbuf, block, offset,
> cont,
> + RAM_SAVE_FLAG_COMPRESS);
> + migrate_put_byte(&param->migbuf, 0);
> + bytes_sent++;
> + /* Must let xbzrle know, otherwise a previous (now 0'd) cached
> + * page would be stale
> + */
> + xbzrle_cache_zero_page(current_addr);
> + } else if (!param->bulk_stage && migrate_use_xbzrle()) {
> + bytes_sent = save_xbzrle_page(&param->migbuf, &p, current_addr,
> block,
> + offset, cont, last_stage, true);
> + }
> + XBZRLE_cache_unlock();
> + /* XBZRLE overflow or normal page */
I wonder if it's worth the complexity of doing the zero check
and the xbzrle if you're already doing compression? I assume
zlib is going to handle a zero page reasonably well anyway?
> + if (bytes_sent == -1) {
> + bytes_sent = migrate_save_block_hdr(&param->migbuf, block,
> + offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
> + blen = migrate_qemu_add_compress(&param->migbuf, p,
> + TARGET_PAGE_SIZE, migrate_compress_level());
> + bytes_sent += blen;
> + atomic_inc(&acct_info.norm_pages);
> + }
> + return bytes_sent;
> +}
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> + int idx;
> + if (!migrate_use_compress()) {
> + return;
> + }
> +
> + for (idx = 0; idx < compress_thread_count; idx++) {
> + while (comp_param[idx].state != COM_DONE) {
> + g_usleep(0);
> + }
Again, some type of event/semaphore rather than busy sleeping;
and also I don't understand how the different threads keep everything
in order - can you add some comments (or maybe notes in the docs)
that explain how it all works?
> + if (comp_param[idx].migbuf.buf_index > 0) {
> + qemu_put_buffer(f, comp_param[idx].migbuf.buf,
> + comp_param[idx].migbuf.buf_index);
> + bytes_transferred += comp_param[idx].migbuf.buf_index;
> + comp_param[idx].migbuf.buf_index = 0;
> + }
> + }
> +}
> +static inline void set_common_compress_params(compress_param *param,
> + int ret, int bytes_sent, RAMBlock *block, ram_addr_t offset,
> + bool last_stage, int cont, uint8_t *p, bool bulk_stage)
> +{
> + param->ret = ret;
> + param->bytes_sent = bytes_sent;
> + param->block = block;
> + param->offset = offset;
> + param->last_stage = last_stage;
> + param->cont = cont;
> + param->p = p;
> + param->bulk_stage = bulk_stage;
> +}
> +
> /*
> * ram_find_and_save_block: Finds a page to send and sends it to f
> *
> @@ -649,6 +916,8 @@ static int ram_find_and_save_block(QEMUFile *f, bool
> last_stage)
> bool complete_round = false;
> int bytes_sent = 0;
> MemoryRegion *mr;
> + int cont, idx, ret, len = -1;
> + uint8_t *p;
>
> if (!block)
> block = QTAILQ_FIRST(&ram_list.blocks);
> @@ -667,14 +936,73 @@ static int ram_find_and_save_block(QEMUFile *f, bool
> last_stage)
> block = QTAILQ_FIRST(&ram_list.blocks);
> complete_round = true;
> ram_bulk_stage = false;
> + if (migrate_use_xbzrle()) {
> + /* terminate the used thread at this point*/
> + flush_compressed_data(f);
> + quit_thread = true;
> + }
> }
> } else {
> - bytes_sent = ram_save_page(f, block, offset, last_stage);
> -
> - /* if page is unmodified, continue to the next */
> - if (bytes_sent > 0) {
> - last_sent_block = block;
> - break;
> + if (!migrate_use_compress()) {
> + bytes_sent = ram_save_page(f, block, offset, last_stage);
> + /* if page is unmodified, continue to the next */
> + if (bytes_sent > 0) {
> + last_sent_block = block;
> + break;
> + }
> + } else {
> + cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE :
> 0;
> + p = memory_region_get_ram_ptr(block->mr) + offset;
> + ret = ram_control_save_page(f, block->offset,
> + offset, TARGET_PAGE_SIZE, &len);
> + if ((!ram_bulk_stage && migrate_use_xbzrle()) || cont == 0) {
> + if (cont == 0) {
> + flush_compressed_data(f);
> + }
> + set_common_compress_params(&comp_param[0],
> + ret, len, block, offset, last_stage, cont,
> + p, ram_bulk_stage);
> + bytes_sent = save_compress_ram_page(&comp_param[0]);
> + if (bytes_sent > 0) {
> + qemu_put_buffer(f, comp_param[0].migbuf.buf,
> + comp_param[0].migbuf.buf_index);
> + comp_param[0].migbuf.buf_index = 0;
> + last_sent_block = block;
> + break;
> + }
Is there no way to move this down into your save_compress_ram_page?
When I split the code into ram_find_and_save_block and ram_save_page
a few months ago, it meant that 'ram_find_and_save_block' only really
did the work of finding what to send, and 'ram_save_page' figured out
everything to do with sending it; it would be nice to keep all
the details of sending it separate still.
Since ram_bulk_stage is a static global in this file, why bother passing
it into the 'compress_params'? I think you could probably avoid a lot
of things like that.
> + } else {
> +retry:
> + for (idx = 0; idx < compress_thread_count; idx++) {
> + if (comp_param[idx].state == COM_DONE) {
> + bytes_sent = comp_param[idx].migbuf.buf_index;
> + if (bytes_sent == 0) {
> + set_common_compress_params(&comp_param[idx],
> + ret, len, block, offset, last_stage,
> + cont, p, ram_bulk_stage);
> + comp_param[idx].state = COM_START;
> + bytes_sent = 1;
> + bytes_transferred -= 1;
> + break;
> + } else if (bytes_sent > 0) {
> + qemu_put_buffer(f,
> comp_param[idx].migbuf.buf,
> + comp_param[idx].migbuf.buf_index);
> + comp_param[idx].migbuf.buf_index = 0;
> + set_common_compress_params(&comp_param[idx],
> + ret, len, block, offset, last_stage,
> + cont, p, ram_bulk_stage);
> + comp_param[idx].state = COM_START;
> + break;
> + }
> + }
> + }
> + if (idx < compress_thread_count) {
> + last_sent_block = block;
> + break;
> + } else {
> + g_usleep(0);
> + goto retry;
> + }
No; again this shouldn't be using usleep to do stuff between threads; do
stuff using proper safe thread ops, and probably a queue or something
that holds things to the different threads.
> + }
> }
> }
> }
> @@ -684,7 +1012,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool
> last_stage)
> return bytes_sent;
> }
>
> -static uint64_t bytes_transferred;
>
> void acct_update_position(QEMUFile *f, size_t size, bool zero)
> {
> @@ -892,6 +1219,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
> i++;
> }
>
> + flush_compressed_data(f);
> qemu_mutex_unlock_ramlist();
>
> /*
> @@ -938,6 +1266,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
> bytes_transferred += bytes_sent;
> }
>
> + flush_compressed_data(f);
> ram_control_after_iterate(f, RAM_CONTROL_FINISH);
> migration_end();
>
> @@ -1038,10 +1367,61 @@ void ram_handle_compressed(void *host, uint8_t ch,
> uint64_t size)
> }
> }
>
> +QemuThread *decompress_threads;
> +
> +static void *do_data_decompress(void *opaque)
> +{
> + decompress_param *param = opaque;
> + while (incomming_migration_done == false) {
> + if (param->state == COM_START) {
> + uLong pagesize = TARGET_PAGE_SIZE;
> + if (uncompress((Bytef *)param->des, &pagesize,
> + (const Bytef *)param->compbuf, param->len) != Z_OK) {
> + error_report("Uncompress Failed!\n");
Again \n on error_report.
> + break;
> + }
> + param->state = COM_DONE;
> + } else {
> + if (quit_thread) {
> + break;
> + }
> + g_usleep(1);
and the usleep.
> + }
> + }
> + return NULL;
> +}
> +
> +void migrate_decompress_threads_create(int count)
> +{
> + int i;
> + decompress_thread_count = count;
> + decompress_threads = g_malloc0(sizeof(QemuThread) * count);
> + decomp_param = g_malloc0(sizeof(decompress_param) * count);
> + quit_thread = false;
> + for (i = 0; i < count; i++) {
> + qemu_thread_create(decompress_threads + i, "decompress",
> + do_data_decompress, decomp_param + i, QEMU_THREAD_JOINABLE);
> + }
> +}
> +
> +void migrate_decompress_threads_join(void)
> +{
> + int i;
> + for (i = 0; i < decompress_thread_count; i++) {
> + qemu_thread_join(decompress_threads + i);
> + }
> + g_free(decompress_threads);
> + g_free(decomp_param);
> + decompress_threads = NULL;
> + decomp_param = NULL;
> +}
> +
> static int ram_load(QEMUFile *f, void *opaque, int version_id)
> {
> int flags = 0, ret = 0;
> static uint64_t seq_iter;
> + int len = 0;
> + uint8_t compbuf[COMPRESS_BUF_SIZE];
>
> seq_iter++;
>
> @@ -1106,6 +1486,7 @@ static int ram_load(QEMUFile *f, void *opaque, int
> version_id)
> ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
> break;
> case RAM_SAVE_FLAG_PAGE:
> + quit_thread = true;
> host = host_from_stream_offset(f, addr, flags);
> if (!host) {
> error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
> @@ -1115,6 +1496,32 @@ static int ram_load(QEMUFile *f, void *opaque, int
> version_id)
>
> qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
> break;
> + case RAM_SAVE_FLAG_COMPRESS_PAGE:
> + host = host_from_stream_offset(f, addr, flags);
> + if (!host) {
> + error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
> + ret = -EINVAL;
> + break;
> + }
> +
> + len = qemu_get_be32(f);
> + qemu_get_buffer(f, compbuf, len);
> + int idx;
> +retry:
> + for (idx = 0; idx < decompress_thread_count; idx++) {
> + if (decomp_param[idx].state == COM_DONE) {
> + memcpy(decomp_param[idx].compbuf, compbuf, len);
> + decomp_param[idx].des = host;
> + decomp_param[idx].len = len;
> + decomp_param[idx].state = COM_START;
> + break;
> + }
> + }
> + if (idx == decompress_thread_count) {
> + g_usleep(0);
> + goto retry;
> + }
> + break;
Same comments as above.
> case RAM_SAVE_FLAG_XBZRLE:
> host = host_from_stream_offset(f, addr, flags);
> if (!host) {
> diff --git a/hmp-commands.hx b/hmp-commands.hx
> index e37bc8b..8b93bed 100644
> --- a/hmp-commands.hx
> +++ b/hmp-commands.hx
> @@ -941,6 +941,56 @@ Set cache size to @var{value} (in bytes) for xbzrle
> migrations.
> ETEXI
>
> {
> + .name = "migrate_set_compress_level",
> + .args_type = "value:i",
> + .params = "value",
> + .help = "set compress level for compress migrations,"
> + "the level is a number between 0 and 9, 0 stands for "
> + "no compression.\n"
> + "1 stands for the fast compress speed while 9 stands
> for"
> + "the highest compress ratio.",
> + .mhandler.cmd = hmp_migrate_set_compress_level,
> + },
> +
> +STEXI
> +@item migrate_set_compress_level @var{value}
> +@findex migrate_set_compress_level
> +Set compress level to @var{value} for compress migrations.
> +ETEXI
> +
> + {
> + .name = "migrate_set_compress_threads",
> + .args_type = "value:i",
> + .params = "value",
> + .help = "set compress thread count for migrations. "
> + "a proper thread count will accelerate the migration
> speed,"
> + "the threads should be between 1 and the CPUS of your
> system",
> + .mhandler.cmd = hmp_migrate_set_compress_threads,
> + },
> +
> +STEXI
> +@item migrate_set_compress_threads @var{value}
> +@findex migrate_set_compress_threads
> +Set compress threads to @var{value} for compress migrations.
> +ETEXI
> +
> + {
> + .name = "migrate_set_decompress_threads",
> + .args_type = "value:i",
> + .params = "value",
> + .help = "set decompress thread count for migrations. "
> + "a proper thread count will accelerate the migration
> speed,"
> + "the threads should be between 1 and the CPUS of your
> system",
> + .mhandler.cmd = hmp_migrate_set_decompress_threads,
> + },
> +
> +STEXI
> +@item migrate_set_decompress_threads @var{value}
> +@findex migrate_set_decompress_threads
> +Set decompress threads to @var{value} for compress migrations.
> +ETEXI
> +
> + {
> .name = "migrate_set_speed",
> .args_type = "value:o",
> .params = "value",
> @@ -1766,6 +1816,12 @@ show migration status
> show current migration capabilities
> @item info migrate_cache_size
> show current migration XBZRLE cache size
> +@item info migrate_compress_level
> +show current migration compress level
> +@item info migrate_compress_threads
> +show current migration compress threads
> +@item info migrate_decompress_threads
> +show current migration decompress threads
> @item info balloon
> show balloon information
> @item info qtree
> diff --git a/hmp.c b/hmp.c
> index 63d7686..b1936a3 100644
> --- a/hmp.c
> +++ b/hmp.c
> @@ -252,6 +252,24 @@ void hmp_info_migrate_cache_size(Monitor *mon, const
> QDict *qdict)
> qmp_query_migrate_cache_size(NULL) >> 10);
> }
>
> +void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict)
> +{
> + monitor_printf(mon, "compress level: %" PRId64 "\n",
> + qmp_query_migrate_compress_level(NULL));
> +}
> +
> +void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict)
> +{
> + monitor_printf(mon, "compress threads: %" PRId64 "\n",
> + qmp_query_migrate_compress_threads(NULL));
> +}
> +
> +void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict)
> +{
> + monitor_printf(mon, "decompress threads: %" PRId64 "\n",
> + qmp_query_migrate_decompress_threads(NULL));
> +}
> +
> void hmp_info_cpus(Monitor *mon, const QDict *qdict)
> {
> CpuInfoList *cpu_list, *cpu;
> @@ -1041,6 +1059,45 @@ void hmp_migrate_set_cache_size(Monitor *mon, const
> QDict *qdict)
> }
> }
>
> +void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict)
> +{
> + int64_t value = qdict_get_int(qdict, "value");
> + Error *err = NULL;
> +
> + qmp_migrate_set_compress_level(value, &err);
> + if (err) {
> + monitor_printf(mon, "%s\n", error_get_pretty(err));
> + error_free(err);
> + return;
> + }
> +}
> +
> +void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict)
> +{
> + int64_t value = qdict_get_int(qdict, "value");
> + Error *err = NULL;
> +
> + qmp_migrate_set_compress_threads(value, &err);
> + if (err) {
> + monitor_printf(mon, "%s\n", error_get_pretty(err));
> + error_free(err);
> + return;
> + }
> +}
> +
> +void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict)
> +{
> + int64_t value = qdict_get_int(qdict, "value");
> + Error *err = NULL;
> +
> + qmp_migrate_set_decompress_threads(value, &err);
> + if (err) {
> + monitor_printf(mon, "%s\n", error_get_pretty(err));
> + error_free(err);
> + return;
> + }
> +}
> +
> void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict)
> {
> int64_t value = qdict_get_int(qdict, "value");
> diff --git a/hmp.h b/hmp.h
> index 4bb5dca..b348806 100644
> --- a/hmp.h
> +++ b/hmp.h
> @@ -29,6 +29,9 @@ void hmp_info_mice(Monitor *mon, const QDict *qdict);
> void hmp_info_migrate(Monitor *mon, const QDict *qdict);
> void hmp_info_migrate_capabilities(Monitor *mon, const QDict *qdict);
> void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict);
> +void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict);
> +void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict);
> +void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict);
> void hmp_info_cpus(Monitor *mon, const QDict *qdict);
> void hmp_info_block(Monitor *mon, const QDict *qdict);
> void hmp_info_blockstats(Monitor *mon, const QDict *qdict);
> @@ -64,6 +67,9 @@ void hmp_migrate_set_downtime(Monitor *mon, const QDict
> *qdict);
> void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict);
> void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict);
> void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict);
> void hmp_set_password(Monitor *mon, const QDict *qdict);
> void hmp_expire_password(Monitor *mon, const QDict *qdict);
> void hmp_eject(Monitor *mon, const QDict *qdict);
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 3cb5ba8..03c8e0d 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -49,6 +49,9 @@ struct MigrationState
> QemuThread thread;
> QEMUBH *cleanup_bh;
> QEMUFile *file;
> + QemuThread *compress_thread;
> + int compress_thread_count;
> + int compress_level;
>
> int state;
> MigrationParams params;
> @@ -64,6 +67,7 @@ struct MigrationState
> int64_t dirty_sync_count;
> };
>
> +extern bool incomming_migration_done;
> void process_incoming_migration(QEMUFile *f);
>
> void qemu_start_incoming_migration(const char *uri, Error **errp);
> @@ -107,6 +111,10 @@ bool migration_has_finished(MigrationState *);
> bool migration_has_failed(MigrationState *);
> MigrationState *migrate_get_current(void);
>
> +void migrate_compress_threads_create(MigrationState *s);
> +void migrate_compress_threads_join(MigrationState *s);
> +void migrate_decompress_threads_create(int count);
> +void migrate_decompress_threads_join(void);
> uint64_t ram_bytes_remaining(void);
> uint64_t ram_bytes_transferred(void);
> uint64_t ram_bytes_total(void);
> @@ -144,7 +152,7 @@ void migrate_del_blocker(Error *reason);
>
> bool migrate_rdma_pin_all(void);
> bool migrate_zero_blocks(void);
> -
> +bool migrate_use_compress(void);
> bool migrate_auto_converge(void);
>
> int xbzrle_encode_buffer(uint8_t *old_buf, uint8_t *new_buf, int slen,
> @@ -153,6 +161,8 @@ int xbzrle_decode_buffer(uint8_t *src, int slen, uint8_t
> *dst, int dlen);
>
> int migrate_use_xbzrle(void);
> int64_t migrate_xbzrle_cache_size(void);
> +int migrate_compress_level(void);
> +int migrate_compress_threads(void);
>
> int64_t xbzrle_cache_resize(int64_t new_size);
>
> diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
> index 401676b..431e6cc 100644
> --- a/include/migration/qemu-file.h
> +++ b/include/migration/qemu-file.h
> @@ -112,6 +112,7 @@ QEMUFile *qemu_bufopen(const char *mode, QEMUSizedBuffer
> *input);
> int qemu_get_fd(QEMUFile *f);
> int qemu_fclose(QEMUFile *f);
> int64_t qemu_ftell(QEMUFile *f);
> +uint64_t qemu_add_compress(QEMUFile *f, const uint8_t *p, int size);
Huh? I don't see the code for this anywhere?
> void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size);
> void qemu_put_byte(QEMUFile *f, int v);
> /*
> diff --git a/migration.c b/migration.c
> index c49a05a..716de97 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -46,6 +46,12 @@ enum {
> /* Migration XBZRLE default cache size */
> #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024)
>
> +/* Migration compress default thread count */
> +#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
> +#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
> +/*0: means nocompress, 1: best speed, ... 9: best compress ratio */
> +#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
> +
> static NotifierList migration_state_notifiers =
> NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
>
> @@ -60,6 +66,8 @@ MigrationState *migrate_get_current(void)
> .bandwidth_limit = MAX_THROTTLE,
> .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
> .mbps = -1,
> + .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
> + .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
> };
>
> return &current_migration;
> @@ -101,6 +109,7 @@ static void process_incoming_migration_co(void *opaque)
> error_report("load of migration failed: %s", strerror(-ret));
> exit(EXIT_FAILURE);
> }
> + incomming_migration_done = true;
> qemu_announce_self();
>
> /* Make sure all file formats flush their mutable metadata */
> @@ -116,10 +125,14 @@ static void process_incoming_migration_co(void *opaque)
> } else {
> runstate_set(RUN_STATE_PAUSED);
> }
> + migrate_decompress_threads_join();
> }
>
> +static int uncompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT;
> void process_incoming_migration(QEMUFile *f)
> {
> + incomming_migration_done = false;
> + migrate_decompress_threads_create(uncompress_thread_count);
> Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);
> int fd = qemu_get_fd(f);
>
> @@ -302,6 +315,7 @@ static void migrate_fd_cleanup(void *opaque)
> qemu_thread_join(&s->thread);
> qemu_mutex_lock_iothread();
>
> + migrate_compress_threads_join(s);
> qemu_fclose(s->file);
> s->file = NULL;
> }
> @@ -373,6 +387,8 @@ static MigrationState *migrate_init(const MigrationParams
> *params)
> int64_t bandwidth_limit = s->bandwidth_limit;
> bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
> int64_t xbzrle_cache_size = s->xbzrle_cache_size;
> + int compress_level = s->compress_level;
> + int compress_thread_count = s->compress_thread_count;
>
> memcpy(enabled_capabilities, s->enabled_capabilities,
> sizeof(enabled_capabilities));
> @@ -383,6 +399,8 @@ static MigrationState *migrate_init(const MigrationParams
> *params)
> sizeof(enabled_capabilities));
> s->xbzrle_cache_size = xbzrle_cache_size;
>
> + s->compress_level = compress_level;
> + s->compress_thread_count = compress_thread_count;
> s->bandwidth_limit = bandwidth_limit;
> s->state = MIG_STATE_SETUP;
> trace_migrate_set_state(MIG_STATE_SETUP);
> @@ -503,6 +521,59 @@ int64_t qmp_query_migrate_cache_size(Error **errp)
> return migrate_xbzrle_cache_size();
> }
>
> +void qmp_migrate_set_compress_level(int64_t value, Error **errp)
> +{
> + MigrationState *s = migrate_get_current();
> +
> + if (value > 9 || value < 0) {
> + error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress level",
> + "is invalid, please input a integer between 0 and 9. ");
> + return;
> + }
> +
> + s->compress_level = value;
> +}
> +
> +int64_t qmp_query_migrate_compress_level(Error **errp)
> +{
> + return migrate_compress_level();
> +}
> +
> +void qmp_migrate_set_compress_threads(int64_t value, Error **errp)
> +{
> + MigrationState *s = migrate_get_current();
> +
> + if (value > 255 || value < 1) {
> + error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress thread
> count",
> + "is invalid, please input a integer between 1 and 255. ");
> + return;
> + }
> +
> + s->compress_thread_count = value;
> +}
> +
> +void qmp_migrate_set_decompress_threads(int64_t value, Error **errp)
> +{
> +
> + if (value > 255 || value < 1) {
> + error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress thread
> count",
> + "is invalid, please input a integer between 1 and 255. ");
> + return;
> + }
> +
> + uncompress_thread_count = value;
> +}
> +
> +int64_t qmp_query_migrate_compress_threads(Error **errp)
> +{
> + return migrate_compress_threads();
> +}
> +
> +int64_t qmp_query_migrate_decompress_threads(Error **errp)
> +{
> + return uncompress_thread_count;
> +}
> +
> void qmp_migrate_set_speed(int64_t value, Error **errp)
> {
> MigrationState *s;
> @@ -555,6 +626,33 @@ bool migrate_zero_blocks(void)
> return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS];
> }
>
> +bool migrate_use_compress(void)
> +{
> + MigrationState *s;
> +
> + s = migrate_get_current();
> +
> + return s->enabled_capabilities[MIGRATION_CAPABILITY_COMPRESS];
> +}
> +
> +int migrate_compress_level(void)
> +{
> + MigrationState *s;
> +
> + s = migrate_get_current();
> +
> + return s->compress_level;
> +}
> +
> +int migrate_compress_threads(void)
> +{
> + MigrationState *s;
> +
> + s = migrate_get_current();
> +
> + return s->compress_thread_count;
> +}
> +
> int migrate_use_xbzrle(void)
> {
> MigrationState *s;
> @@ -697,4 +795,5 @@ void migrate_fd_connect(MigrationState *s)
>
> qemu_thread_create(&s->thread, "migration", migration_thread, s,
> QEMU_THREAD_JOINABLE);
> + migrate_compress_threads_create(s);
> }
> diff --git a/monitor.c b/monitor.c
> index 905d8cf..365547e 100644
> --- a/monitor.c
> +++ b/monitor.c
> @@ -2865,6 +2865,27 @@ static mon_cmd_t info_cmds[] = {
> .mhandler.cmd = hmp_info_migrate_cache_size,
> },
> {
> + .name = "migrate_compress_level",
> + .args_type = "",
> + .params = "",
> + .help = "show current migration compress level",
> + .mhandler.cmd = hmp_info_migrate_compress_level,
> + },
> + {
> + .name = "migrate_compress_threads",
> + .args_type = "",
> + .params = "",
> + .help = "show current migration compress thread count",
> + .mhandler.cmd = hmp_info_migrate_compress_threads,
> + },
> + {
> + .name = "migrate_decompress_threads",
> + .args_type = "",
> + .params = "",
> + .help = "show current migration decompress thread count",
> + .mhandler.cmd = hmp_info_migrate_decompress_threads,
> + },
> + {
> .name = "balloon",
> .args_type = "",
> .params = "",
> diff --git a/qapi-schema.json b/qapi-schema.json
> index 24379ab..71a9e0f 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -491,13 +491,17 @@
> # to enable the capability on the source VM. The feature is
> disabled by
> # default. (since 1.6)
> #
> +# @compress: Use multiple compression threads to accelerate live
> migration.
> +# This feature can help to reduce the migration traffic, by sending
> +# compressed pages. The feature is disabled by default. (since 2.3)
> +#
> # @auto-converge: If enabled, QEMU will automatically throttle down the guest
> # to speed up convergence of RAM migration. (since 1.6)
> #
> # Since: 1.2
> ##
> { 'enum': 'MigrationCapability',
> - 'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
> + 'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks',
> 'compress'] }
>
> ##
> # @MigrationCapabilityStatus
> @@ -1382,6 +1386,88 @@
> { 'command': 'query-migrate-cache-size', 'returns': 'int' }
>
> ##
> +# @migrate-set-compress-level
> +#
> +# Set compress level
> +#
> +# @value: compress level int
> +#
> +# The compress level will be an integer between 0 and 9.
> +# The compress level can be modified before and during ongoing migration
> +#
> +# Returns: nothing on success
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-compress-level', 'data': {'value': 'int'} }
> +
> +##
> +# @query-migrate-compress-level
> +#
> +# query compress level
> +#
> +# Returns: compress level int
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'query-migrate-compress-level', 'returns': 'int' }
> +
> +##
> +# @migrate-set-compress-threads
> +#
> +# Set compress threads
> +#
> +# @value: compress threads int
> +#
> +# The compress thread count is an integer between 1 and 255.
> +# The compress thread count can be modified only before migration
> +#
> +# Returns: nothing on success
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-compress-threads', 'data': {'value': 'int'} }
> +
> +##
> +# @query-migrate-compress-threads
> +#
> +# query compress threads
> +#
> +# Returns: compress threads int
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'query-migrate-compress-threads', 'returns': 'int' }
> +
> +##
> +# @migrate-set-decompress-threads
> +#
> +# Set decompress threads
> +#
> +# @value: decompress threads int
> +#
> +# The decompress thread count is an integer between 1 and 255.
> +# The decompress thread count can be modified only before migration
> +#
> +# Returns: nothing on success
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-decompress-threads', 'data': {'value': 'int'} }
> +
> +##
> +# @query-migrate-decompress-threads
> +#
> +# query decompress threads
> +#
> +# Returns: decompress threads int
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'query-migrate-decompress-threads', 'returns': 'int' }
> +
> +##
> # @ObjectPropertyInfo:
> #
> # @name: the name of the property
> diff --git a/qmp-commands.hx b/qmp-commands.hx
> index 1abd619..b60fdab 100644
> --- a/qmp-commands.hx
> +++ b/qmp-commands.hx
> @@ -705,7 +705,138 @@ Example:
> <- { "return": 67108864 }
>
> EQMP
> +{
> + .name = "migrate-set-compress-level",
> + .args_type = "value:i",
> + .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_level,
> + },
> +
> +SQMP
> +migrate-set-compress-level
> +----------------------
> +
> +Set compress level to be used by compress migration, the compress level is
> an integer
> +between 0 and 9
> +
> +Arguments:
> +
> +- "value": compress level (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-compress-level", "arguments": { "value":
> 1 } }
> +<- { "return": {} }
> +
> +EQMP
> + {
> + .name = "query-migrate-compress-level",
> + .args_type = "",
> + .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_level,
> + },
> +
> +SQMP
> +query-migrate-compress-level
> +------------------------
> +
> +Show compress level to be used by compress migration
> +
> +returns the current compress level as a json-int (between 0 and 9)
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-level" }
> +<- { "return": 1 }
> +
> +EQMP
> +{
> + .name = "migrate-set-compress-threads",
> + .args_type = "value:i",
> + .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_threads,
> + },
> +
> +SQMP
> +migrate-set-compress-threads
> +----------------------
> +
> +Set compress thread count to be used by compress migration, the compress
> thread count is an integer
> +between 1 and 255
> +
> +Arguments:
> +
> +- "value": compress threads (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-compress-threads", "arguments": { "value":
> 8 } }
> +<- { "return": {} }
> +
> +EQMP
> + {
> + .name = "query-migrate-compress-threads",
> + .args_type = "",
> + .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_threads,
> + },
> +
> +SQMP
> +query-migrate-compress-threads
> +------------------------
> +
> +Show compress thread count to be used by compress migration
> +
> +returns the current compress thread count as a json-int (between 1 and 255)
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-threads" }
> +<- { "return": 8 }
> +
> +EQMP
> +{
> + .name = "migrate-set-decompress-threads",
> + .args_type = "value:i",
> + .mhandler.cmd_new = qmp_marshal_input_migrate_set_decompress_threads,
> + },
> +
> +SQMP
> +migrate-set-decompress-threads
> +----------------------
> +
> +Set decompress thread count to be used by compress migration, the decompress
> thread count is an integer
> +between 1 and 255
> +
> +Arguments:
> +
> +- "value": decompress threads (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-decompress-threads", "arguments": { "value":
> 2 } }
> +<- { "return": {} }
>
> +EQMP
> + {
> + .name = "query-migrate-decompress-threads",
> + .args_type = "",
> + .mhandler.cmd_new =
> qmp_marshal_input_query_migrate_decompress_threads,
> + },
> +
> +SQMP
> +query-migrate-decompress-threads
> +------------------------
> +
> +Show decompress thread count to be used by compress migration
> +
> +returns the current decompress thread count as a json-int (between 1 and 255)
> +
> +Example:
> +
> +-> { "execute": "query-migrate-decompress-threads" }
> +<- { "return": 2 }
> +
> +EQMP
> {
> .name = "migrate_set_speed",
> .args_type = "value:o",
> --
> 1.9.1
>
>
--
Dr. David Alan Gilbert / address@hidden / Manchester, UK
- [Qemu-devel] [v2 1/2] docs: Add a doc about multiple compression threads, (continued)
- [Qemu-devel] [v2 1/2] docs: Add a doc about multiple compression threads, Li Liang, 2014/11/06
- [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads, Li Liang, 2014/11/06
- Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads,
Dr. David Alan Gilbert <=
- Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads, ChenLiang, 2014/11/21
- Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads, Li, Liang Z, 2014/11/21
- Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads, ChenLiang, 2014/11/21
- Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads, Li, Liang Z, 2014/11/21
- Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads, ChenLiang, 2014/11/21
- Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads, ChenLiang, 2014/11/21