qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH v5 12/45] Return path: Source handling of return


From: Dr. David Alan Gilbert
Subject: Re: [Qemu-devel] [PATCH v5 12/45] Return path: Source handling of return path
Date: Fri, 20 Mar 2015 18:17:31 +0000
User-agent: Mutt/1.5.23 (2014-03-12)

* David Gibson (address@hidden) wrote:
> On Wed, Feb 25, 2015 at 04:51:35PM +0000, Dr. David Alan Gilbert (git) wrote:
> > From: "Dr. David Alan Gilbert" <address@hidden>
> > 
> > Open a return path, and handle messages that are received upon it.
> > 
> > Signed-off-by: Dr. David Alan Gilbert <address@hidden>
> > ---
> >  include/migration/migration.h |   8 ++
> >  migration/migration.c         | 178 
> > +++++++++++++++++++++++++++++++++++++++++-
> >  trace-events                  |  13 +++
> >  3 files changed, 198 insertions(+), 1 deletion(-)
> > 
> > diff --git a/include/migration/migration.h b/include/migration/migration.h
> > index 6775747..5242ead 100644
> > --- a/include/migration/migration.h
> > +++ b/include/migration/migration.h
> > @@ -73,6 +73,14 @@ struct MigrationState
> >  
> >      int state;
> >      MigrationParams params;
> > +
> > +    /* State related to return path */
> > +    struct {
> > +        QEMUFile     *file;
> > +        QemuThread    rp_thread;
> > +        bool          error;
> > +    } rp_state;
> > +
> >      double mbps;
> >      int64_t total_time;
> >      int64_t downtime;
> > diff --git a/migration/migration.c b/migration/migration.c
> > index 80d234c..34cd4fe 100644
> > --- a/migration/migration.c
> > +++ b/migration/migration.c
> > @@ -237,6 +237,23 @@ MigrationCapabilityStatusList 
> > *qmp_query_migrate_capabilities(Error **errp)
> >      return head;
> >  }
> >  
> > +/*
> > + * Return true if we're already in the middle of a migration
> > + * (i.e. any of the active or setup states)
> > + */
> > +static bool migration_already_active(MigrationState *ms)
> > +{
> > +    switch (ms->state) {
> > +    case MIG_STATE_ACTIVE:
> > +    case MIG_STATE_SETUP:
> > +        return true;
> > +
> > +    default:
> > +        return false;
> > +
> > +    }
> > +}
> > +
> >  static void get_xbzrle_cache_stats(MigrationInfo *info)
> >  {
> >      if (migrate_use_xbzrle()) {
> > @@ -362,6 +379,21 @@ static void migrate_set_state(MigrationState *s, int 
> > old_state, int new_state)
> >      }
> >  }
> >  
> > +static void migrate_fd_cleanup_src_rp(MigrationState *ms)
> > +{
> > +    QEMUFile *rp = ms->rp_state.file;
> > +
> > +    /*
> > +     * When stuff goes wrong (e.g. failing destination) on the rp, it can 
> > get
> > +     * cleaned up from a few threads; make sure not to do it twice in 
> > parallel
> > +     */
> > +    rp = atomic_cmpxchg(&ms->rp_state.file, rp, NULL);
> 
> A cmpxchg seems dangerously subtle for such a basic and infrequent
> operation, but ok.

I'm open to other suggestions, but I'm just trying to do
'if the QEMUFile still exists, close it', and it didn't seem
worth introducing another state variable to update atomically
when we've already got the file pointer itself.

> > +    if (rp) {
> > +        trace_migrate_fd_cleanup_src_rp();
> > +        qemu_fclose(rp);
> > +    }
> > +}
> > +
> >  static void migrate_fd_cleanup(void *opaque)
> >  {
> >      MigrationState *s = opaque;
> > @@ -369,6 +401,8 @@ static void migrate_fd_cleanup(void *opaque)
> >      qemu_bh_delete(s->cleanup_bh);
> >      s->cleanup_bh = NULL;
> >  
> > +    migrate_fd_cleanup_src_rp(s);
> > +
> >      if (s->file) {
> >          trace_migrate_fd_cleanup();
> >          qemu_mutex_unlock_iothread();
> > @@ -406,6 +440,11 @@ static void migrate_fd_cancel(MigrationState *s)
> >      QEMUFile *f = migrate_get_current()->file;
> >      trace_migrate_fd_cancel();
> >  
> > +    if (s->rp_state.file) {
> > +        /* shutdown the rp socket, so causing the rp thread to shutdown */
> > +        qemu_file_shutdown(s->rp_state.file);
> 
> I missed where qemu_file_shutdown() was implemented.  Does this
> introduce a leftover socket dependency?

No, it shouldn't.  The shutdown() causes a shutdown(2) syscall to
be issued on the socket, which unblocks anything blocked on it; the
file then gets closed at the end, after the rp thread has exited.

> > +    }
> > +
> >      do {
> >          old_state = s->state;
> >          if (old_state != MIG_STATE_SETUP && old_state != MIG_STATE_ACTIVE) 
> > {
> > @@ -658,8 +697,145 @@ int64_t migrate_xbzrle_cache_size(void)
> >      return s->xbzrle_cache_size;
> >  }
> >  
> > -/* migration thread support */
> > +/*
> > + * Something bad happened to the RP stream, mark an error
> > + * The caller shall print something to indicate why
> > + */
> > +static void source_return_path_bad(MigrationState *s)
> > +{
> > +    s->rp_state.error = true;
> > +    migrate_fd_cleanup_src_rp(s);
> > +}
> > +
> > +/*
> > + * Handles messages sent on the return path towards the source VM
> > + *
> > + */
> > +static void *source_return_path_thread(void *opaque)
> > +{
> > +    MigrationState *ms = opaque;
> > +    QEMUFile *rp = ms->rp_state.file;
> > +    uint16_t expected_len, header_len, header_com;
> > +    const int max_len = 512;
> > +    uint8_t buf[max_len];
> > +    uint32_t tmp32;
> > +    int res;
> > +
> > +    trace_source_return_path_thread_entry();
> > +    while (rp && !qemu_file_get_error(rp) &&
> > +        migration_already_active(ms)) {
> > +        trace_source_return_path_thread_loop_top();
> > +        header_com = qemu_get_be16(rp);
> > +        header_len = qemu_get_be16(rp);
> > +
> > +        switch (header_com) {
> > +        case MIG_RP_CMD_SHUT:
> > +        case MIG_RP_CMD_PONG:
> > +            expected_len = 4;
> 
> Could the knowledge of expected lengths be folded into the switch
> below?  Switching twice on the same thing is a bit icky.

No, because the length at this point is used to validate the
length field in the header prior to reading the body.
The other switch processes the contents of the body that
have been read.

> > +            break;
> > +
> > +        default:
> > +            error_report("RP: Received invalid cmd 0x%04x length 0x%04x",
> > +                    header_com, header_len);
> > +            source_return_path_bad(ms);
> > +            goto out;
> > +        }
> >  
> > +        if (header_len > expected_len) {
> > +            error_report("RP: Received command 0x%04x with"
> > +                    "incorrect length %d expecting %d",
> > +                    header_com, header_len,
> > +                    expected_len);
> > +            source_return_path_bad(ms);
> > +            goto out;
> > +        }
> > +
> > +        /* We know we've got a valid header by this point */
> > +        res = qemu_get_buffer(rp, buf, header_len);
> > +        if (res != header_len) {
> > +            trace_source_return_path_thread_failed_read_cmd_data();
> > +            source_return_path_bad(ms);
> > +            goto out;
> > +        }
> > +
> > +        /* OK, we have the command and the data */
> > +        switch (header_com) {
> > +        case MIG_RP_CMD_SHUT:
> > +            tmp32 = be32_to_cpup((uint32_t *)buf);
> > +            trace_source_return_path_thread_shut(tmp32);
> > +            if (tmp32) {
> > +                error_report("RP: Sibling indicated error %d", tmp32);
> > +                source_return_path_bad(ms);
> > +            }
> > +            /*
> > +             * We'll let the main thread deal with closing the RP
> > +             * we could do a shutdown(2) on it, but we're the only user
> > +             * anyway, so there's nothing gained.
> > +             */
> > +            goto out;
> > +
> > +        case MIG_RP_CMD_PONG:
> > +            tmp32 = be32_to_cpup((uint32_t *)buf);
> > +            trace_source_return_path_thread_pong(tmp32);
> > +            break;
> > +
> > +        default:
> > +            /* This shouldn't happen because we should catch this above */
> > +            trace_source_return_path_bad_header_com();
> > +        }
> > +        /* Latest command processed, now leave a gap for the next one */
> > +        header_com = MIG_RP_CMD_INVALID;
> 
> This assignment will always get overwritten.

Thanks; gone - it's a leftover from an old version.

> > +    }
> > +    if (rp && qemu_file_get_error(rp)) {
> > +        trace_source_return_path_thread_bad_end();
> > +        source_return_path_bad(ms);
> > +    }
> > +
> > +    trace_source_return_path_thread_end();
> > +out:
> > +    return NULL;
> > +}
> > +
> > +__attribute__ (( unused )) /* Until later in patch series */
> > +static int open_outgoing_return_path(MigrationState *ms)
> 
> Uh.. surely this should be open_incoming_return_path(); it's designed
> to be used on the source side, AFAICT.
> 
> > +{
> > +
> > +    ms->rp_state.file = qemu_file_get_return_path(ms->file);
> > +    if (!ms->rp_state.file) {
> > +        return -1;
> > +    }
> > +
> > +    trace_open_outgoing_return_path();
> > +    qemu_thread_create(&ms->rp_state.rp_thread, "return path",
> > +                       source_return_path_thread, ms, 
> > QEMU_THREAD_JOINABLE);
> > +
> > +    trace_open_outgoing_return_path_continue();
> > +
> > +    return 0;
> > +}
> > +
> > +__attribute__ (( unused )) /* Until later in patch series */
> > +static void await_outgoing_return_path_close(MigrationState *ms)
> 
> Likewise "incoming" here, surely.

I've renamed those two (e.g. to open_source_return_path()), which seems less
ambiguous;
is that OK?

Dave

> 
> > +{
> > +    /*
> > +     * If this is a normal exit then the destination will send a SHUT and 
> > the
> > +     * rp_thread will exit, however if there's an error we need to cause
> > +     * it to exit, which we can do by a shutdown.
> > +     * (canceling must also shutdown to stop us getting stuck here if
> > +     * the destination died at just the wrong place)
> > +     */
> > +    if (qemu_file_get_error(ms->file) && ms->rp_state.file) {
> > +        qemu_file_shutdown(ms->rp_state.file);
> > +    }
> > +    trace_await_outgoing_return_path_joining();
> > +    qemu_thread_join(&ms->rp_state.rp_thread);
> > +    trace_await_outgoing_return_path_close();
> > +}
> > +
> > +/*
> > + * Master migration thread on the source VM.
> > + * It drives the migration and pumps the data down the outgoing channel.
> > + */
> >  static void *migration_thread(void *opaque)
> >  {
> >      MigrationState *s = opaque;
> > diff --git a/trace-events b/trace-events
> > index 4f3eff8..1951b25 100644
> > --- a/trace-events
> > +++ b/trace-events
> > @@ -1374,12 +1374,25 @@ flic_no_device_api(int err) "flic: no Device 
> > Contral API support %d"
> >  flic_reset_failed(int err) "flic: reset failed %d"
> >  
> >  # migration.c
> > +await_outgoing_return_path_close(void) ""
> > +await_outgoing_return_path_joining(void) ""
> >  migrate_set_state(int new_state) "new state %d"
> >  migrate_fd_cleanup(void) ""
> > +migrate_fd_cleanup_src_rp(void) ""
> >  migrate_fd_error(void) ""
> >  migrate_fd_cancel(void) ""
> >  migrate_pending(uint64_t size, uint64_t max) "pending size %" PRIu64 " max 
> > %" PRIu64
> >  migrate_send_rp_message(int cmd, uint16_t len) "cmd=%d, len=%d"
> > +open_outgoing_return_path(void) ""
> > +open_outgoing_return_path_continue(void) ""
> > +source_return_path_thread_bad_end(void) ""
> > +source_return_path_bad_header_com(void) ""
> > +source_return_path_thread_end(void) ""
> > +source_return_path_thread_entry(void) ""
> > +source_return_path_thread_failed_read_cmd_data(void) ""
> > +source_return_path_thread_loop_top(void) ""
> > +source_return_path_thread_pong(uint32_t val) "%x"
> > +source_return_path_thread_shut(uint32_t val) "%x"
> >  migrate_transferred(uint64_t tranferred, uint64_t time_spent, double 
> > bandwidth, uint64_t size) "transferred %" PRIu64 " time_spent %" PRIu64 " 
> > bandwidth %g max_size %" PRId64
> >  
> >  # migration/rdma.c
> 
> -- 
> David Gibson                  | I'll have my music baroque, and my code
> david AT gibson.dropbear.id.au        | minimalist, thank you.  NOT _the_ 
> _other_
>                               | _way_ _around_!
> http://www.ozlabs.org/~dgibson


--
Dr. David Alan Gilbert / address@hidden / Manchester, UK



reply via email to

[Prev in Thread] Current Thread [Next in Thread]