qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH v8 19/23] qmp: isolate responses into io thread


From: Marc-André Lureau
Subject: Re: [Qemu-devel] [PATCH v8 19/23] qmp: isolate responses into io thread
Date: Thu, 22 Mar 2018 13:00:19 +0100

Hi

On Fri, Mar 9, 2018 at 10:00 AM, Peter Xu <address@hidden> wrote:
> For those monitors who have enabled IO thread, we'll offload the
> responding procedure into IO thread.  The main reason is that chardev is
> not thread safe, and we need to do all the read/write IOs in the same
> thread.  For use_io_thr=true monitors, that thread is the IO thread.

Actually, the chr write path is supposed to be somewhat thread safe
(chr_write_lock).

Secondly, the function responsible for writing monitor data has some
thread safety: it's called monitor_flush_locked() because you need to
hold the mon->out_lock.

I think this patch is making things more complicated than they need to
be. You should be able to call monitor_json_emitter()/monitor_puts()
directly from any thread; it will queue the data, start writing, and
add a watch if necessary in the appropriate context.

Am I missing something?

> We do this isolation in similar pattern as what we have done to the
> request queue: we first create one response queue for each monitor, then
> instead of replying directly in the main thread, we queue the responses
> and kick the IO thread to do the rest of the job for us.
>
> A funny thing after doing this is that, when the QMP clients send "quit"
> to QEMU, it's possible that we close the IOThread even earlier than
> replying to that "quit".  So another thing we need to do before cleaning
> up the monitors is that we need to flush the response queue (we don't
> need to do that for command queue; after all we are quitting) to make
> sure replies for handled commands are always flushed back to clients.
>
> Reviewed-by: Fam Zheng <address@hidden>
> Reviewed-by: Stefan Hajnoczi <address@hidden>
> Signed-off-by: Peter Xu <address@hidden>
> ---
>  monitor.c | 96 
> ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 95 insertions(+), 1 deletion(-)
>
> diff --git a/monitor.c b/monitor.c
> index 5c8afe9f50..c6de5f123e 100644
> --- a/monitor.c
> +++ b/monitor.c
> @@ -179,6 +179,8 @@ typedef struct {
>      QemuMutex qmp_queue_lock;
>      /* Input queue that holds all the parsed QMP requests */
>      GQueue *qmp_requests;
> +    /* Output queue contains all the QMP responses in order */
> +    GQueue *qmp_responses;
>  } MonitorQMP;
>
>  /*
> @@ -205,6 +207,7 @@ struct Monitor {
>      bool skip_flush;
>      bool use_io_thr;
>
> +    /* We can't access guest memory when holding the lock */
>      QemuMutex out_lock;
>      QString *outbuf;
>      guint out_watch;
> @@ -227,6 +230,8 @@ static struct {
>      IOThread *mon_iothread;
>      /* Bottom half to dispatch the requests received from IO thread */
>      QEMUBH *qmp_dispatcher_bh;
> +    /* Bottom half to deliver the responses back to clients */
> +    QEMUBH *qmp_respond_bh;
>  } mon_global;
>
>  /* QMP checker flags */
> @@ -416,7 +421,8 @@ int monitor_fprintf(FILE *stream, const char *fmt, ...)
>      return 0;
>  }
>
> -static void monitor_json_emitter(Monitor *mon, const QObject *data)
> +static void monitor_json_emitter_raw(Monitor *mon,
> +                                     QObject *data)
>  {
>      QString *json;
>
> @@ -430,6 +436,71 @@ static void monitor_json_emitter(Monitor *mon, const 
> QObject *data)
>      QDECREF(json);
>  }
>
> +static void monitor_json_emitter(Monitor *mon, QObject *data)
> +{
> +    if (mon->use_io_thr) {
> +        /*
> +         * If using IO thread, we need to queue the item so that IO
> +         * thread will do the rest for us.  Take refcount so that
> +         * caller won't free the data (which will be finally freed in
> +         * responder thread).
> +         */
> +        qobject_incref(data);
> +        qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
> +        g_queue_push_tail(mon->qmp.qmp_responses, (void *)data);
> +        qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
> +        qemu_bh_schedule(mon_global.qmp_respond_bh);
> +    } else {
> +        /*
> +         * If not using monitor IO thread, then we are in main thread.
> +         * Do the emission right away.
> +         */
> +        monitor_json_emitter_raw(mon, data);
> +    }
> +}
> +
> +struct QMPResponse {
> +    Monitor *mon;
> +    QObject *data;
> +};
> +typedef struct QMPResponse QMPResponse;
> +
> +/*
> + * Return one QMPResponse.  The response is only valid if
> + * response.data is not NULL.
> + */
> +static QMPResponse monitor_qmp_response_pop_one(void)
> +{
> +    Monitor *mon;
> +    QObject *data = NULL;
> +
> +    qemu_mutex_lock(&monitor_lock);
> +    QTAILQ_FOREACH(mon, &mon_list, entry) {
> +        qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
> +        data = g_queue_pop_head(mon->qmp.qmp_responses);
> +        qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
> +        if (data) {
> +            break;
> +        }
> +    }
> +    qemu_mutex_unlock(&monitor_lock);
> +    return (QMPResponse) { .mon = mon, .data = data };
> +}
> +
> +static void monitor_qmp_bh_responder(void *opaque)
> +{
> +    QMPResponse response;
> +
> +    while (true) {
> +        response = monitor_qmp_response_pop_one();
> +        if (!response.data) {
> +            break;
> +        }
> +        monitor_json_emitter_raw(response.mon, response.data);
> +        qobject_decref(response.data);
> +    }
> +}
> +
>  static MonitorQAPIEventConf monitor_qapi_event_conf[QAPI_EVENT__MAX] = {
>      /* Limit guest-triggerable events to 1 per second */
>      [QAPI_EVENT_RTC_CHANGE]        = { 1000 * SCALE_MS },
> @@ -616,6 +687,7 @@ static void monitor_data_init(Monitor *mon, bool 
> skip_flush,
>      mon->skip_flush = skip_flush;
>      mon->use_io_thr = use_io_thr;
>      mon->qmp.qmp_requests = g_queue_new();
> +    mon->qmp.qmp_responses = g_queue_new();
>  }
>
>  static void monitor_data_destroy(Monitor *mon)
> @@ -630,6 +702,7 @@ static void monitor_data_destroy(Monitor *mon)
>      qemu_mutex_destroy(&mon->out_lock);
>      qemu_mutex_destroy(&mon->qmp.qmp_queue_lock);
>      g_queue_free(mon->qmp.qmp_requests);
> +    g_queue_free(mon->qmp.qmp_responses);
>  }
>
>  char *qmp_human_monitor_command(const char *command_line, bool has_cpu_index,
> @@ -4367,6 +4440,15 @@ static void monitor_iothread_init(void)
>      mon_global.qmp_dispatcher_bh = aio_bh_new(qemu_get_aio_context(),
>                                                monitor_qmp_bh_dispatcher,
>                                                NULL);
> +
> +    /*
> +     * Unlike the dispatcher BH, this must be run on the monitor IO
> +     * thread, so that monitors that are using IO thread will make
> +     * sure read/write operations are all done on the IO thread.
> +     */
> +    mon_global.qmp_respond_bh = aio_bh_new(monitor_get_aio_context(),
> +                                           monitor_qmp_bh_responder,
> +                                           NULL);
>  }
>
>  void monitor_init_globals(void)
> @@ -4505,9 +4587,19 @@ void monitor_cleanup(void)
>       */
>      iothread_stop(mon_global.mon_iothread);
>
> +    /*
> +     * After we have IOThread to send responses, it's possible that
> +     * when we stop the IOThread there are still replies queued in the
> +     * responder queue.  Flush all of them.  Note that even after this
> +     * flush it's still possible that out buffer is not flushed.
> +     * It'll be done in below monitor_flush() as the last resort.
> +     */
> +    monitor_qmp_bh_responder(NULL);
> +
>      qemu_mutex_lock(&monitor_lock);
>      QTAILQ_FOREACH_SAFE(mon, &mon_list, entry, next) {
>          QTAILQ_REMOVE(&mon_list, mon, entry);
> +        monitor_flush(mon);
>          monitor_data_destroy(mon);
>          g_free(mon);
>      }
> @@ -4516,6 +4608,8 @@ void monitor_cleanup(void)
>      /* QEMUBHs needs to be deleted before destroying the IOThread. */
>      qemu_bh_delete(mon_global.qmp_dispatcher_bh);
>      mon_global.qmp_dispatcher_bh = NULL;
> +    qemu_bh_delete(mon_global.qmp_respond_bh);
> +    mon_global.qmp_respond_bh = NULL;
>
>      iothread_destroy(mon_global.mon_iothread);
>      mon_global.mon_iothread = NULL;
> --
> 2.14.3
>
>



-- 
Marc-André Lureau



reply via email to

[Prev in Thread] Current Thread [Next in Thread]