
Re: [Qemu-devel] [PATCH v5 17/17] migration: Flush receive queue


From: Dr. David Alan Gilbert
Subject: Re: [Qemu-devel] [PATCH v5 17/17] migration: Flush receive queue
Date: Thu, 20 Jul 2017 12:45:08 +0100
User-agent: Mutt/1.8.3 (2017-05-23)

* Juan Quintela (address@hidden) wrote:
> Each time we sync the bitmap, there is a possibility that we receive
> a page that is still being processed by a different thread.  We fix
> this problem by making sure that we wait for all receiving threads to
> finish their work before we proceed to the next stage.
> 
> We are low on page flags, so to emit that message we use a combination
> that is otherwise not valid: MULTIFD_PAGE and COMPRESSED.
> 
> I tried to make a migration command for it, but that doesn't work
> because we sometimes sync the bitmap when we have already sent the
> beginning of the section, so I just added a new page flag.
> 
> Signed-off-by: Juan Quintela <address@hidden>
> ---
>  migration/ram.c | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 56 insertions(+), 1 deletion(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index c78b286..bffe204 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -71,6 +71,12 @@
>  #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
>  #define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
>  
> +/* We are getting low on page flags, so we start using combinations.
> +   When we need to flush a page, we send it as
> +   RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_COMPRESS_PAGE,
> +   a combination that is not otherwise allowed.
> +*/
> +
>  static inline bool is_zero_range(uint8_t *p, uint64_t size)
>  {
>      return buffer_is_zero(p, size);
> @@ -193,6 +199,9 @@ struct RAMState {
>      uint64_t iterations_prev;
>      /* Iterations since start */
>      uint64_t iterations;
> +    /* Indicates that we have synced the bitmap and need to ensure
> +       that the target has processed all previous pages */
> +    bool multifd_needs_flush;
>      /* protects modification of the bitmap */
>      uint64_t migration_dirty_pages;
>      /* number of dirty bits in the bitmap */
> @@ -363,7 +372,6 @@ static void compress_threads_save_setup(void)
>  
>  /* Multiple fd's */
>  
> -
>  typedef struct {
>      int num;
>      int size;
> @@ -595,9 +603,11 @@ struct MultiFDRecvParams {
>      QIOChannel *c;
>      QemuSemaphore ready;
>      QemuSemaphore sem;
> +    QemuCond cond_sync;
>      QemuMutex mutex;
>      /* protected by param mutex */
>      bool quit;
> +    bool sync;
>      multifd_pages_t pages;
>      bool done;
>  };
> @@ -637,6 +647,7 @@ void multifd_load_cleanup(void)
>          qemu_thread_join(&p->thread);
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        qemu_cond_destroy(&p->cond_sync);
>          socket_recv_channel_destroy(p->c);
>          g_free(p);
>          multifd_recv_state->params[i] = NULL;
> @@ -675,6 +686,10 @@ static void *multifd_recv_thread(void *opaque)
>                  return NULL;
>              }
>              p->done = true;
> +            if (p->sync) {
> +                qemu_cond_signal(&p->cond_sync);
> +                p->sync = false;
> +            }
>              qemu_mutex_unlock(&p->mutex);
>              qemu_sem_post(&p->ready);
>              continue;
> @@ -724,9 +739,11 @@ gboolean multifd_new_channel(QIOChannel *ioc)
>      qemu_mutex_init(&p->mutex);
>      qemu_sem_init(&p->sem, 0);
>      qemu_sem_init(&p->ready, 0);
> +    qemu_cond_init(&p->cond_sync);
>      p->quit = false;
>      p->id = id;
>      p->done = false;
> +    p->sync = false;
>      multifd_init_group(&p->pages);
>      p->c = ioc;
>      atomic_set(&multifd_recv_state->params[id], p);
> @@ -792,6 +809,27 @@ static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
>      qemu_sem_post(&p->sem);
>  }
>  
> +static int multifd_flush(void)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_use_multifd()) {
> +        return 0;
> +    }
> +    thread_count = migrate_multifd_threads();
> +    for (i = 0; i < thread_count; i++) {
> +        MultiFDRecvParams *p = multifd_recv_state->params[i];
> +
> +        qemu_mutex_lock(&p->mutex);
> +        while (!p->done) {
> +            p->sync = true;
> +            qemu_cond_wait(&p->cond_sync, &p->mutex);
> +        }

I don't think I understand how that works in the case where the
recv_thread has already set 'done' by the time you set sync=true; how
does it get back to the check and do the signal?
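
For what it's worth, the shape I'd half expect here is for the recv
thread to signal cond_sync unconditionally whenever it sets 'done', and
for the flush side to wait on 'done' alone; then there's no separate
'sync' flag to race with.  Rough sketch only, reusing the fields this
patch adds, not tested:

  /* Sketch: wait until one recv thread reports itself idle.  Assumes
   * the recv thread does qemu_cond_signal(&p->cond_sync) every time it
   * sets p->done = true, rather than only when p->sync was requested.
   */
  static void multifd_recv_wait_idle(MultiFDRecvParams *p)
  {
      qemu_mutex_lock(&p->mutex);
      while (!p->done) {
          /* qemu_cond_wait() drops the mutex atomically and 'done' is
           * re-checked on each wakeup, so the signal can't be missed.
           */
          qemu_cond_wait(&p->cond_sync, &p->mutex);
      }
      qemu_mutex_unlock(&p->mutex);
  }

multifd_flush() would then just call that for each thread.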

> +        qemu_mutex_unlock(&p->mutex);
> +    }
> +    return 0;
> +}
> +
>  /**
>   * save_page_header: write page header to wire
>   *
> @@ -809,6 +847,12 @@ static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
>  {
>      size_t size, len;
>  
> +    if (rs->multifd_needs_flush &&
> +        (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) {
> +        offset |= RAM_SAVE_FLAG_ZERO;

In the comment near the top you say RAM_SAVE_FLAG_COMPRESS_PAGE;  it's
probably best to add an alias at the top to make it clear, e.g.
  #define RAM_SAVE_FLAG_MULTIFD_SYNC RAM_SAVE_FLAG_ZERO

  or maybe (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)
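
With the combined form, the two places that use it would then read
roughly like this (untested, just to show where the alias lands):

  #define RAM_SAVE_FLAG_MULTIFD_SYNC \
      (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)

  /* save side, in save_page_header() */
  if (rs->multifd_needs_flush &&
      (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) {
      offset |= RAM_SAVE_FLAG_ZERO;        /* now a MULTIFD_SYNC header */
      rs->multifd_needs_flush = false;
  }

  /* load side, in ram_load() */
  if ((flags & RAM_SAVE_FLAG_MULTIFD_SYNC) == RAM_SAVE_FLAG_MULTIFD_SYNC) {
      multifd_flush();
      flags &= ~RAM_SAVE_FLAG_ZERO;
  }

which at least makes it obvious that the ZERO bit is doing double duty.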

> +        rs->multifd_needs_flush = false;
> +    }
> +
>      if (block == rs->last_sent_block) {
>          offset |= RAM_SAVE_FLAG_CONTINUE;
>      }
> @@ -2496,6 +2540,9 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>  
>      if (!migration_in_postcopy()) {
>          migration_bitmap_sync(rs);
> +        if (migrate_use_multifd()) {
> +            rs->multifd_needs_flush = true;
> +        }
>      }
>  
>      ram_control_before_iterate(f, RAM_CONTROL_FINISH);
> @@ -2538,6 +2585,9 @@ static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
>          qemu_mutex_lock_iothread();
>          rcu_read_lock();
>          migration_bitmap_sync(rs);
> +        if (migrate_use_multifd()) {
> +            rs->multifd_needs_flush = true;
> +        }
>          rcu_read_unlock();
>          qemu_mutex_unlock_iothread();
>          remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
> @@ -3012,6 +3062,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              break;
>          }
>  
> +        if ((flags & (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO))
> +                  == (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)) {
> +            multifd_flush();
> +            flags = flags & ~RAM_SAVE_FLAG_ZERO;
> +        }
>          if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
>                       RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
>                       RAM_SAVE_FLAG_MULTIFD_PAGE)) {

Dave

> -- 
> 2.9.4
> 
--
Dr. David Alan Gilbert / address@hidden / Manchester, UK


