[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [v3 08/13] migration: Add the core code of multi-thread
From: |
Dr. David Alan Gilbert |
Subject: |
Re: [Qemu-devel] [v3 08/13] migration: Add the core code of multi-thread compresion |
Date: |
Fri, 23 Jan 2015 13:39:38 +0000 |
User-agent: |
Mutt/1.5.23 (2014-03-12) |
* Liang Li (address@hidden) wrote:
> At this point, multiple thread compression can't co-work with xbzrle.
>
> Signed-off-by: Liang Li <address@hidden>
> Signed-off-by: Yang Zhang <address@hidden>
> ---
> arch_init.c | 164
> +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
> 1 file changed, 157 insertions(+), 7 deletions(-)
>
> diff --git a/arch_init.c b/arch_init.c
> index 0a575ed..4109ad7 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -369,23 +369,43 @@ static QemuMutex *mutex;
> static QemuCond *cond;
> static QEMUFileOps *empty_ops;
> static bool quit_thread;
> +static int one_byte_count;
> static decompress_param *decomp_param;
> static QemuThread *decompress_threads;
>
> +static int do_compress_ram_page(compress_param *param);
> +
> static void *do_data_compress(void *opaque)
> {
> + compress_param *param = opaque;
> while (!quit_thread) {
> -
> - /* To be done */
> -
> + qemu_mutex_lock(¶m->mutex);
> + while (param->state != START) {
> + qemu_cond_wait(¶m->cond, ¶m->mutex);
> + if (quit_thread) {
> + break;
> + }
> + do_compress_ram_page(param);
> + qemu_mutex_lock(mutex);
> + param->state = DONE;
> + qemu_cond_signal(cond);
> + qemu_mutex_unlock(mutex);
> + }
> + qemu_mutex_unlock(¶m->mutex);
> }
> +
> return NULL;
> }
>
> static inline void terminate_compression_threads(void)
> {
> + int idx, thread_count;
> +
> + thread_count = migrate_compress_threads();
> quit_thread = true;
> - /* To be done */
> + for (idx = 0; idx < thread_count; idx++) {
> + qemu_cond_signal(&comp_param[idx].cond);
> + }
> }
>
> void migrate_compress_threads_join(MigrationState *s)
> @@ -770,13 +790,142 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block,
> ram_addr_t offset,
> return bytes_sent;
> }
>
> +static int do_compress_ram_page(compress_param *param)
> +{
> + int bytes_sent;
> + int blen = COMPRESS_BUF_SIZE;
> + int cont;
> + uint8_t *p;
> + RAMBlock *block = param->block;
> + ram_addr_t offset = param->offset;
> +
> + cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> + p = memory_region_get_ram_ptr(block->mr) + offset;
> +
> + bytes_sent = save_block_hdr(param->file, block,
> + offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
> + blen = migrate_qemu_add_compression_data(param->file, p,
> + TARGET_PAGE_SIZE, migrate_compress_level());
> + bytes_sent += blen;
> + atomic_inc(&acct_info.norm_pages);
> +
> + return bytes_sent;
> +}
> +
> +static inline void start_compression(compress_param *param)
> +{
> + qemu_mutex_lock(¶m->mutex);
> + param->state = START;
> + qemu_cond_signal(¶m->cond);
> + qemu_mutex_unlock(¶m->mutex);
> +}
> +
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> + int idx, len, thread_count;
> +
> + if (!migrate_use_compression()) {
> + return;
> + }
> + thread_count = migrate_compress_threads();
> + for (idx = 0; idx < thread_count; idx++) {
> + if (comp_param[idx].state != DONE) {
> + qemu_mutex_lock(mutex);
> + while (comp_param[idx].state != DONE) {
> + qemu_cond_wait(cond, mutex);
> + }
> + qemu_mutex_unlock(mutex);
> + }
> + len = migrate_qemu_flush(f, comp_param[idx].file);
> + bytes_transferred += len;
> + }
> + if ((one_byte_count > 0) && (bytes_transferred > one_byte_count)) {
> + bytes_transferred -= one_byte_count;
> + one_byte_count = 0;
> + }
> +}
> +
> +static inline void set_compress_params(compress_param *param,
> + RAMBlock *block, ram_addr_t offset)
> +{
> + param->block = block;
> + param->offset = offset;
> +}
> +
> +
> +static int compress_page_with_multi_thread(QEMUFile *f,
> + RAMBlock *block, ram_addr_t offset)
> +{
> + int idx, thread_count, bytes_sent = 0;
> +
> + thread_count = migrate_compress_threads();
> + qemu_mutex_lock(mutex);
> + while (true) {
> + for (idx = 0; idx < thread_count; idx++) {
> + if (comp_param[idx].state == DONE) {
> + bytes_sent = migrate_qemu_flush(f, comp_param[idx].file);
> + set_compress_params(&comp_param[idx],
> + block, offset);
> + start_compression(&comp_param[idx]);
> + if (bytes_sent == 0) {
> + /* set bytes_sent to 1 in this case to prevent migration
> + * from terminating, this 1 byte whill be added to
> + * bytes_transferred later, minus 1 to keep the
> + * bytes_transferred accurate */
> + bytes_sent = 1;
> + if (bytes_transferred <= 0) {
> + one_byte_count++;
> + } else {
> + bytes_transferred -= 1;
> + }
> + }
> + break;
> + }
> + }
> + if (bytes_sent > 0) {
> + break;
> + } else {
> + qemu_cond_wait(cond, mutex);
> + }
> + }
> + qemu_mutex_unlock(mutex);
> + return bytes_sent;
> +}
> +
> static int ram_save_compressed_page(QEMUFile *f, RAMBlock* block,
> ram_addr_t offset, bool last_stage)
> {
> int bytes_sent = 0;
>
> - /* To be done*/
> -
> + /* When starting the process of a new block, the first page of
> + * the block should be sent out before other pages in the same
> + * block, and all the pages in last block should have been sent
> + * out, keeping this order is important.
Why? Is this just because of the 'cont' flag used to avoid sending the
block names again?
Dave
> + */
> + if (block != last_sent_block) {
> + flush_compressed_data(f);
> + bytes_sent = save_zero_and_xbzrle_page(f, block, offset,
> + last_stage, NULL);
> + if (bytes_sent == -1) {
> + set_compress_params(&comp_param[0], block, offset);
> + /* Use the qemu thread to compress the data to make sure the
> + * first page is sent out before other pages
> + */
> + bytes_sent = do_compress_ram_page(&comp_param[0]);
> + if (bytes_sent > 0) {
> + migrate_qemu_flush(f, comp_param[0].file);
> + }
> + }
> + } else {
> + bytes_sent = save_zero_and_xbzrle_page(f, block, offset,
> + last_stage, NULL);
> + if (bytes_sent == -1) {
> + bytes_sent = compress_page_with_multi_thread(f, block, offset);
> + }
> + }
> return bytes_sent;
> }
>
> @@ -834,7 +983,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool
> last_stage)
> return bytes_sent;
> }
>
> -static uint64_t bytes_transferred;
>
> void acct_update_position(QEMUFile *f, size_t size, bool zero)
> {
> @@ -1043,6 +1191,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
> i++;
> }
>
> + flush_compressed_data(f);
> qemu_mutex_unlock_ramlist();
>
> /*
> @@ -1089,6 +1238,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
> bytes_transferred += bytes_sent;
> }
>
> + flush_compressed_data(f);
> ram_control_after_iterate(f, RAM_CONTROL_FINISH);
> migration_end();
>
> --
> 1.8.3.1
>
--
Dr. David Alan Gilbert / address@hidden / Manchester, UK
- Re: [Qemu-devel] [v3 08/13] migration: Add the core code of multi-thread compresion,
Dr. David Alan Gilbert <=