qemu-devel

Re: [Qemu-devel] [v6 08/14] migration: Add the core code of multi-thread compression


From: Juan Quintela
Subject: Re: [Qemu-devel] [v6 08/14] migration: Add the core code of multi-thread compression
Date: Fri, 27 Mar 2015 11:47:25 +0100
User-agent: Gnus/5.13 (Gnus v5.13) Emacs/24.4 (gnu/linux)

Liang Li <address@hidden> wrote:
> Implement the core logic of the multiple thread compression. At this
> point, multiple thread compression can't co-work with xbzrle yet.
>
> Signed-off-by: Liang Li <address@hidden>
> Signed-off-by: Yang Zhang <address@hidden>


Coming back to here, as we have the full code.

> ---
>  arch_init.c | 184 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
>  1 file changed, 177 insertions(+), 7 deletions(-)
>
> diff --git a/arch_init.c b/arch_init.c
> index 48cae22..9f63c0f 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -355,12 +355,33 @@ static DecompressParam *decomp_param;
>  static QemuThread *decompress_threads;
>  static uint8_t *compressed_data_buf;
>  
> +static int do_compress_ram_page(CompressParam *param);
> +
>  static void *do_data_compress(void *opaque)
>  {
> -    while (!quit_comp_thread) {
> +    CompressParam *param = opaque;
>  
> -    /* To be done */
Change this line to

> +    while (!quit_comp_thread) {

  while (true) {

> +        qemu_mutex_lock(&param->mutex);
> +        /* Re-check the quit_comp_thread in case of
> +         * terminate_compression_threads is called just before
> +         * qemu_mutex_lock(&param->mutex) and after
> +         * while(!quit_comp_thread), re-check it here can make
> +         * sure the compression thread terminate as expected.
> +         */
Change this

> +        while (!param->start && !quit_comp_thread) {

to

while (!param->start && !param->quit) {

> +            qemu_cond_wait(&param->cond, &param->mutex);
> +        }

And this

> +        if (!quit_comp_thread) {

to

      if (!param->quit) {
> +            do_compress_ram_page(param);
> +        }

Take care here to exit the loop correctly.
Notice that the only case where we are not going to do the compression is the
last iteration, so I think the optimization doesn't buy us anything (in
this place), no?  (A full sketch of the loop with these changes is below,
after the end of this function.)

> +        param->start = false;
> +        qemu_mutex_unlock(&param->mutex);
>  
> +        qemu_mutex_lock(comp_done_lock);
> +        param->done = true;
> +        qemu_cond_signal(comp_done_cond);
> +        qemu_mutex_unlock(comp_done_lock);
>      }


>  
>      return NULL;
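Putting the above together, the whole loop would then look something like
this (just a sketch of the suggested shape, assuming CompressParam grows a
bool quit member next to start/done; the final patch may well differ):

    static void *do_data_compress(void *opaque)
    {
        CompressParam *param = opaque;

        while (true) {
            qemu_mutex_lock(&param->mutex);
            /* wait until there is work to do or we are asked to quit */
            while (!param->start && !param->quit) {
                qemu_cond_wait(&param->cond, &param->mutex);
            }
            if (param->quit) {
                qemu_mutex_unlock(&param->mutex);
                break;
            }
            do_compress_ram_page(param);
            param->start = false;
            qemu_mutex_unlock(&param->mutex);

            qemu_mutex_lock(comp_done_lock);
            param->done = true;
            qemu_cond_signal(comp_done_cond);
            qemu_mutex_unlock(comp_done_lock);
        }

        return NULL;
    }

That way each thread only ever looks at its own quit flag, and
quit_comp_thread never needs to be read from the compression threads.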
> @@ -368,9 +389,15 @@ static void *do_data_compress(void *opaque)
>  
>  static inline void terminate_compression_threads(void)
>  {
> -    quit_comp_thread = true;
> +    int idx, thread_count;
>  
> -    /* To be done */
> +    thread_count = migrate_compress_threads();
> +    quit_comp_thread = true;


> +    for (idx = 0; idx < thread_count; idx++) {
> +        qemu_mutex_lock(&comp_param[idx].mutex);
Add this
           comp_param[idx].quit = true;


And from now on, quit_comp_thread is only used from the migration_thread, so it
should be safe to use, no?  (A sketch of the resulting
terminate_compression_threads() follows the end of this hunk below.)

flush_compressed_data() is only ever called from the migration_thread, so
no lock is needed there either.

> +        qemu_cond_signal(&comp_param[idx].cond);
> +        qemu_mutex_unlock(&comp_param[idx].mutex);
> +    }
>  }
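In other words, something along these lines (again only a sketch of the
suggested shape, not the final code):

    static inline void terminate_compression_threads(void)
    {
        int idx, thread_count;

        thread_count = migrate_compress_threads();
        /* from here on only the migration thread looks at this */
        quit_comp_thread = true;
        for (idx = 0; idx < thread_count; idx++) {
            qemu_mutex_lock(&comp_param[idx].mutex);
            /* per-thread flag suggested above */
            comp_param[idx].quit = true;
            qemu_cond_signal(&comp_param[idx].cond);
            qemu_mutex_unlock(&comp_param[idx].mutex);
        }
    }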
>  
>  void migrate_compress_threads_join(void)
> @@ -420,6 +447,7 @@ void migrate_compress_threads_create(void)
>           * it's ops to empty.
>           */
>          comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
> +        comp_param[i].done = true;
>          qemu_mutex_init(&comp_param[i].mutex);
>          qemu_cond_init(&comp_param[i].cond);
>          qemu_thread_create(compress_threads + i, "compress",
> @@ -829,6 +857,97 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
>      return pages;
>  }
>  
> +static int do_compress_ram_page(CompressParam *param)
> +{
> +    int bytes_sent, blen;
> +    uint8_t *p;
> +    RAMBlock *block = param->block;
> +    ram_addr_t offset = param->offset;
> +
> +    p = memory_region_get_ram_ptr(block->mr) + (offset & TARGET_PAGE_MASK);
> +
> +    bytes_sent = save_page_header(param->file, block, offset |
> +                                  RAM_SAVE_FLAG_COMPRESS_PAGE);
> +    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
> +                                     migrate_compress_level());
> +    bytes_sent += blen;
> +    atomic_inc(&acct_info.norm_pages);
> +
> +    return bytes_sent;
> +}
> +
> +static inline void start_compression(CompressParam *param)
> +{
> +    param->done = false;
> +    qemu_mutex_lock(&param->mutex);
> +    param->start = true;
> +    qemu_cond_signal(&param->cond);
> +    qemu_mutex_unlock(&param->mutex);
> +}
> +
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> +    int idx, len, thread_count;
> +
> +    if (!migrate_use_compression()) {
> +        return;
> +    }
> +    thread_count = migrate_compress_threads();
> +    for (idx = 0; idx < thread_count; idx++) {
> +        if (!comp_param[idx].done) {
> +            qemu_mutex_lock(comp_done_lock);
> +            while (!comp_param[idx].done && !quit_comp_thread) {
> +                qemu_cond_wait(comp_done_cond, comp_done_lock);
> +            }
> +            qemu_mutex_unlock(comp_done_lock);
> +        }
> +        if (!quit_comp_thread) {
> +            len = qemu_put_qemu_file(f, comp_param[idx].file);
> +            bytes_transferred += len;
> +        }
> +    }
> +}
> +
> +static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> +                                       ram_addr_t offset)
> +{
> +    param->block = block;
> +    param->offset = offset;
> +}
> +
> +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
> +                                           ram_addr_t offset,
> +                                           uint64_t *bytes_transferred)
> +{
> +    int idx, thread_count, bytes_xmit = -1, pages = -1;
> +
> +    thread_count = migrate_compress_threads();
> +    qemu_mutex_lock(comp_done_lock);
> +    while (true) {
> +        for (idx = 0; idx < thread_count; idx++) {
> +            if (comp_param[idx].done) {
> +                bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file);
> +                set_compress_params(&comp_param[idx], block, offset);
> +                start_compression(&comp_param[idx]);
> +                pages = 1;
> +                *bytes_transferred += bytes_xmit;
> +                break;
> +            }
> +        }
> +        if (pages > 0) {
> +            break;
> +        } else {
> +            qemu_cond_wait(comp_done_cond, comp_done_lock);
> +        }
> +    }
> +    qemu_mutex_unlock(comp_done_lock);
> +
> +    return pages;
> +}
> +
>  /**
>   * ram_save_compressed_page: compress the given page and send it to the stream
>   *
> @@ -845,8 +964,59 @@ static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
>                                      uint64_t *bytes_transferred)
>  {
>      int pages = -1;
> +    uint64_t bytes_xmit;
> +    MemoryRegion *mr = block->mr;
> +    uint8_t *p;
> +    int ret;
> +
> +    p = memory_region_get_ram_ptr(mr) + offset;
>  
> -    /* To be done*/
> +    bytes_xmit = 0;
> +    ret = ram_control_save_page(f, block->offset,
> +                                offset, TARGET_PAGE_SIZE, &bytes_xmit);
> +    if (bytes_xmit) {
> +        *bytes_transferred += bytes_xmit;
> +        pages = 1;
> +    }
> +    if (block == last_sent_block) {
> +        offset |= RAM_SAVE_FLAG_CONTINUE;
> +    }
> +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> +            if (bytes_xmit > 0) {
> +                acct_info.norm_pages++;
> +            } else if (bytes_xmit == 0) {
> +                acct_info.dup_pages++;
> +            }
> +        }
> +    } else {
> +        /* When starting the process of a new block, the first page of
> +         * the block should be sent out before other pages in the same
> +         * block, and all the pages in last block should have been sent
> +         * out, keeping this order is important, because the 'cont' flag
> +         * is used to avoid resending the block name.
> +         */
> +        if (block != last_sent_block) {
> +            flush_compressed_data(f);
> +            pages = save_zero_page(f, block, offset, p, bytes_transferred);
> +            if (pages == -1) {
> +                set_compress_params(&comp_param[0], block, offset);
> +                /* Use the qemu thread to compress the data to make sure the
> +                 * first page is sent out before other pages
> +                 */
> +                bytes_xmit = do_compress_ram_page(&comp_param[0]);
> +                qemu_put_qemu_file(f, comp_param[0].file);
> +                *bytes_transferred += bytes_xmit;
> +                pages = 1;
> +            }
> +        } else {
> +            pages = save_zero_page(f, block, offset, p, bytes_transferred);
> +            if (pages == -1) {
> +                pages = compress_page_with_multi_thread(f, block, offset,
> +                                                        bytes_transferred);
> +            }
> +        }
> +    }
>  
>      return pages;
>  }
> @@ -914,8 +1084,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage,
>      return pages;
>  }
>  
> -static uint64_t bytes_transferred;
> -
>  void acct_update_position(QEMUFile *f, size_t size, bool zero)
>  {
>      uint64_t pages = size / TARGET_PAGE_SIZE;
> @@ -1129,6 +1297,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>          }
>          i++;
>      }
> +    flush_compressed_data(f);
>      rcu_read_unlock();
>  
>      /*
> @@ -1170,6 +1339,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>          }
>      }
>  
> +    flush_compressed_data(f);
>      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
>      migration_end();


