Re: [Qemu-devel] [PATCH V3 2/3] qemu: Generic asynchronous threading framework to offload tasks


From: Corentin Chary
Subject: Re: [Qemu-devel] [PATCH V3 2/3] qemu: Generic asynchronous threading framework to offload tasks
Date: Sat, 5 Jun 2010 09:03:42 +0200

On Fri, Jun 4, 2010 at 3:16 PM, Anthony Liguori <address@hidden> wrote:
> On 06/03/2010 03:56 AM, Gautham R Shenoy wrote:
>>
>> From: Aneesh Kumar K.V<address@hidden>
>>
>> This patch creates a generic asynchronous-task-offloading infrastructure.
>> It's extracted out of the threading framework that is being used by paio.
>>
>> The reason for extracting this generic infrastructure out of
>> posix-aio-compat.c is so that other subsystems, such as virtio-9p, could
>> make use of it for offloading tasks that could block.
>>
>> address@hidden: work_item_pool, async_work_init, async_work_release,
>> async_cancel_work]
>>
>> Signed-off-by: Aneesh Kumar K.V<address@hidden>
>> Signed-off-by: Gautham R Shenoy<address@hidden>
>> ---
>>  Makefile.objs |    3 +
>>  async-work.c  |  136 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>>  async-work.h  |   85 ++++++++++++++++++++++++++++++++++++
>>  3 files changed, 223 insertions(+), 1 deletions(-)
>>  create mode 100644 async-work.c
>>  create mode 100644 async-work.h
>>
>> diff --git a/Makefile.objs b/Makefile.objs
>> index ecdd53e..fd5ea4d 100644
>> --- a/Makefile.objs
>> +++ b/Makefile.objs
>> @@ -9,6 +9,8 @@ qobject-obj-y += qerror.o
>>
>>  block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
>>  block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
>> +block-obj-y += qemu-thread.o
>> +block-obj-y += async-work.o
>>  block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
>>  block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>>
>> @@ -108,7 +110,6 @@ common-obj-y += iov.o
>>  common-obj-$(CONFIG_VNC_TLS) += vnc-tls.o vnc-auth-vencrypt.o
>>  common-obj-$(CONFIG_VNC_SASL) += vnc-auth-sasl.o
>>  common-obj-$(CONFIG_COCOA) += cocoa.o
>> -common-obj-$(CONFIG_IOTHREAD) += qemu-thread.o
>>  common-obj-y += notify.o event_notifier.o
>>  common-obj-y += qemu-timer.o
>>
>> diff --git a/async-work.c b/async-work.c
>> new file mode 100644
>> index 0000000..0675732
>> --- /dev/null
>> +++ b/async-work.c
>> @@ -0,0 +1,136 @@
>> +/*
>> + * Async work support
>> + *
>> + * Copyright IBM, Corp. 2010
>> + *
>> + * Authors:
>> + *  Aneesh Kumar K.V<address@hidden>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2.  See
>> + * the COPYING file in the top-level directory.
>>
>
> Please preserve the original copyright of the copied code.
>
>> + */
>> +#include <stdio.h>
>> +#include <errno.h>
>> +#include <string.h>
>> +#include <stdlib.h>
>> +#include <signal.h>
>>
>
> qemu-common.h should have all of these.  Generally, you should avoid
> including system headers because qemu headers take care of portability.
>
>> +#include "async-work.h"
>> +#include "osdep.h"
>> +
>> +static void async_abort(int err, const char *what)
>> +{
>> +    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
>> +    abort();
>> +}
>> +
>> +static void *async_worker_thread(void *data)
>> +{
>> +    struct async_queue *queue = data;
>> +
>> +    while (1) {
>> +        struct work_item *work;
>> +        int ret = 0;
>> +        qemu_mutex_lock(&(queue->lock));
>> +
>> +        while (QTAILQ_EMPTY(&(queue->request_list)) &&
>> +               (ret != ETIMEDOUT)) {
>> +            ret = qemu_cond_timedwait(&(queue->cond),
>> +                                       &(queue->lock), 10*100000);
>> +        }
>> +
>> +        if (QTAILQ_EMPTY(&(queue->request_list)))
>> +            goto check_exit;
>> +
>> +        work = QTAILQ_FIRST(&(queue->request_list));
>> +        QTAILQ_REMOVE(&(queue->request_list), work, node);
>> +        queue->idle_threads--;
>> +        qemu_mutex_unlock(&(queue->lock));
>> +
>> +        /* execute the work function */
>> +        work->func(work);
>> +        async_work_release(queue, work);
>> +
>> +        qemu_mutex_lock(&(queue->lock));
>> +        queue->idle_threads++;
>> +
>> +check_exit:
>> +        if ((queue->idle_threads > 0) &&
>> +            (queue->cur_threads > queue->min_threads)) {
>> +            /* we retain minimum number of threads */
>> +            break;
>> +        }
>> +        qemu_mutex_unlock(&(queue->lock));
>> +    }
>> +
>> +    queue->idle_threads--;
>> +    queue->cur_threads--;
>> +    qemu_mutex_unlock(&(queue->lock));
>> +
>> +    return NULL;
>> +}
>> +
>> +static void spawn_async_thread(struct async_queue *queue)
>> +{
>> +    QemuThreadAttr attr;
>> +    QemuThread thread;
>> +    sigset_t set, oldset;
>> +
>> +    queue->cur_threads++;
>> +    queue->idle_threads++;
>> +
>> +    qemu_thread_attr_init(&attr);
>> +
>> +    /* create a detached thread so that we don't need to wait on it */
>> +    qemu_thread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
>> +
>> +    /* block all signals */
>> +    if (sigfillset(&set)) {
>> +        async_abort(errno, "sigfillset");
>> +    }
>> +
>> +    if (sigprocmask(SIG_SETMASK, &set, &oldset)) {
>> +        async_abort(errno, "sigprocmask");
>> +    }
>> +
>> +    qemu_thread_create_attr(&thread, &attr, async_worker_thread, queue);
>> +
>> +    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) {
>> +        async_abort(errno, "sigprocmask restore");
>> +    }
>> +}
>> +
>> +void qemu_async_submit(struct async_queue *queue, struct work_item *work)
>> +{
>> +    qemu_mutex_lock(&(queue->lock));
>> +    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
>> +        spawn_async_thread(queue);
>> +    }
>> +    QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
>> +    qemu_mutex_unlock(&(queue->lock));
>> +    qemu_cond_signal(&(queue->cond));
>> +}
>> +
>> +int qemu_async_cancel_work(struct async_queue *queue, struct work_item *work)
>> +{
>> +    struct work_item *ret_work;
>> +    int found = 0;
>> +
>> +    qemu_mutex_lock(&(queue->lock));
>> +    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
>> +        if (ret_work == work) {
>> +            QTAILQ_REMOVE(&(queue->request_list), ret_work, node);
>> +            found = 1;
>> +            break;
>> +        }
>> +    }
>> +    qemu_mutex_unlock(&(queue->lock));
>> +
>> +    if (found) {
>> +        async_work_release(queue, work);
>> +        return 0;
>> +    }
>> +
>> +    return 1;
>> +}
>> +
>> diff --git a/async-work.h b/async-work.h
>> new file mode 100644
>> index 0000000..8389f56
>> --- /dev/null
>> +++ b/async-work.h
>> @@ -0,0 +1,85 @@
>> +/*
>> + * Async work support
>> + *
>> + * Copyright IBM, Corp. 2010
>> + *
>> + * Authors:
>> + *  Aneesh Kumar K.V<address@hidden>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2.  See
>> + * the COPYING file in the top-level directory.
>> + *
>> + */
>> +#ifndef QEMU_ASYNC_WORK_H
>> +#define QEMU_ASYNC_WORK_H
>> +
>> +#include "qemu-queue.h"
>> +#include "qemu-common.h"
>> +#include "qemu-thread.h"
>> +
>> +struct async_queue
>> +{
>> +    QemuMutex lock;
>> +    QemuCond cond;
>> +    int max_threads;
>> +    int min_threads;
>> +    int cur_threads;
>> +    int idle_threads;
>> +    QTAILQ_HEAD(, work_item) request_list;
>> +    QTAILQ_HEAD(, work_item) work_item_pool;
>> +};
>> +
>> +struct work_item
>> +{
>> +    QTAILQ_ENTRY(work_item) node;
>> +    void (*func)(struct work_item *work);
>> +    void *private;
>> +};
>>
>
> Structs are not named in accordance with CODING_STYLE.
>
>> +static inline void async_queue_init(struct async_queue *queue,
>> +                                   int max_threads, int min_threads)
>> +{
>> +    queue->cur_threads  = 0;
>> +    queue->idle_threads = 0;
>> +    queue->max_threads  = max_threads;
>> +    queue->min_threads  = min_threads;
>> +    QTAILQ_INIT(&(queue->request_list));
>> +    QTAILQ_INIT(&(queue->work_item_pool));
>> +    qemu_mutex_init(&(queue->lock));
>> +    qemu_cond_init(&(queue->cond));
>> +}
>>
>
> I'd prefer there be a single queue that everything used versus multiple
> queues.  Otherwise, we'll end up having per device queues and my concern is
> that we'll end up with thousands and thousands of threads with no central
> place to tune the maximum thread number.

If there is a single queue, we'll need something to control how jobs are
processed. For example, in the VNC server, they must be processed in order
(in fact, in order per VNC client, but I don't see how we could do that).
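
For the sake of discussion, one possible shape for "in order per client" on
top of a single shared queue might be: each client keeps its own FIFO, and a
client is only handed to a worker while none of its jobs is in flight. This
is a rough sketch of the dispatch bookkeeping only. All names here (struct
job, struct client, struct sched, sched_submit, sched_next, sched_done) are
made up and are not part of the patch, and locking is left out on the
assumption that callers would hold the queue lock:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical types; nothing here comes from async-work.h. */
struct job {
    struct job *next;
    void (*func)(struct job *job);
};

struct client {
    struct job *head, *tail;    /* this client's pending jobs, FIFO */
    int busy;                   /* 1 while a worker runs one of its jobs */
    int queued;                 /* 1 while the client sits on the ready list */
    struct client *next_ready;
};

struct sched {
    struct client *ready_head, *ready_tail; /* clients with runnable work */
};

static void ready_push(struct sched *s, struct client *c)
{
    c->next_ready = NULL;
    c->queued = 1;
    if (s->ready_tail) {
        s->ready_tail->next_ready = c;
    } else {
        s->ready_head = c;
    }
    s->ready_tail = c;
}

/* Append a job to its client's FIFO. The client becomes runnable only if
 * none of its jobs is in flight and it is not already on the ready list. */
static void sched_submit(struct sched *s, struct client *c, struct job *j)
{
    j->next = NULL;
    if (c->tail) {
        c->tail->next = j;
    } else {
        c->head = j;
    }
    c->tail = j;
    if (!c->busy && !c->queued) {
        ready_push(s, c);
    }
}

/* Return the next job any worker may run, or NULL if nothing is runnable.
 * The owning client is marked busy so its later jobs are held back. */
static struct job *sched_next(struct sched *s, struct client **owner)
{
    struct client *c = s->ready_head;
    struct job *j;

    if (!c) {
        return NULL;
    }
    s->ready_head = c->next_ready;
    if (!s->ready_head) {
        s->ready_tail = NULL;
    }
    c->queued = 0;

    j = c->head;
    assert(j != NULL);          /* ready clients always have pending work */
    c->head = j->next;
    if (!c->head) {
        c->tail = NULL;
    }
    c->busy = 1;
    *owner = c;
    return j;
}

/* Called by a worker once the job returned by sched_next() has finished. */
static void sched_done(struct sched *s, struct client *c)
{
    c->busy = 0;
    if (c->head) {
        ready_push(s, c);
    }
}
```

With something like this, jobs from different clients could still run in
parallel on the shared pool, while two jobs from the same client would never
overlap or be reordered. Whether the extra bookkeeping is acceptable in the
generic code is a separate question.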

>> +static inline struct work_item *async_work_init(struct async_queue
>> *queue,
>> +                                  void (*func)(struct work_item *),
>> +                                  void *data)
>>
>
> I'd suggest actually using a Notifier as the worker or at least something
> that looks exactly like it.  There's no need to pass a void * because more
> often than not, a caller just wants to pass a state structure anyway and
> they can embed the Notifier within the structure.  IOW:
>
> async_work_submit(queue, &s->worker);
>
> Then in the callback:
>
> DeviceState *s = container_of(worker, DeviceState, worker);
>
> I don't think the name makes the most sense either.  I think something like:
>
> threadlet_submit()
>
> Would work best.  It would be good for there to be a big comment warning
> that the routine does not run with the qemu_mutex and therefore cannot make
> use of any qemu functions without very special consideration.
>
>
> There shouldn't need to be an explicit init vs. submit function either.
>
> Regards,
>
> Anthony Liguori
>

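To check that I read the embedded-worker suggestion correctly, here is a
minimal standalone sketch of the pattern. ThreadletWork, MyDeviceState and
threadlet_submit_sync() are made-up names for illustration only; the last one
simply calls the callback inline instead of handing it to a worker thread,
and container_of is defined locally in case no header provides it:

```c
#include <assert.h>
#include <stddef.h>

#ifndef container_of
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))
#endif

/* Hypothetical worker type shaped like a Notifier: only a callback,
 * no void * payload. */
typedef struct ThreadletWork ThreadletWork;
struct ThreadletWork {
    void (*func)(ThreadletWork *work);
};

/* Hypothetical device state that embeds its worker. */
typedef struct MyDeviceState {
    int requests_done;
    ThreadletWork worker;
} MyDeviceState;

/* The callback recovers the enclosing state from the embedded member. */
static void my_device_work(ThreadletWork *work)
{
    MyDeviceState *s = container_of(work, MyDeviceState, worker);

    s->requests_done++;
}

/* Stand-in for a real submit function: runs the work synchronously so the
 * pattern can be exercised without any threads. */
static void threadlet_submit_sync(ThreadletWork *work)
{
    work->func(work);
}
```

A caller would set s->worker.func once and then pass &s->worker to the submit
function, so there is no separate init step and no private pointer to carry
around.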


-- 
Corentin Chary
http://xf.iksaif.net


