Re: [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastr


From: Stefan Hajnoczi
Subject: Re: [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure
Date: Tue, 9 Nov 2010 13:14:31 +0000

On Tue, Nov 9, 2010 at 12:06 PM, Arun R Bharadwaj
<address@hidden> wrote:
> * Stefan Hajnoczi <address@hidden> [2010-11-08 21:29:12]:
>
>> On Mon, Nov 8, 2010 at 2:33 PM, Arun R Bharadwaj
>> <address@hidden> wrote:
>> > diff --git a/Makefile.objs b/Makefile.objs
>> > index cd5a24b..3b7ec27 100644
>> > --- a/Makefile.objs
>> > +++ b/Makefile.objs
>> > @@ -9,6 +9,7 @@ qobject-obj-y += qerror.o
>> >
>> >  block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
>> >  block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
>> > +block-obj-$(CONFIG_POSIX) += qemu-thread.o
>> >  block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
>> >  block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>> >
>> > @@ -124,7 +125,6 @@ endif
>> >  common-obj-y += $(addprefix ui/, $(ui-obj-y))
>> >
>> >  common-obj-y += iov.o acl.o
>> > -common-obj-$(CONFIG_THREAD) += qemu-thread.o
>> >  common-obj-y += notify.o event_notifier.o
>> >  common-obj-y += qemu-timer.o
>>
>
> Hi Stefan,
>
> Thanks for the quick review.
>
>> This change makes CONFIG_THREAD unused.  The ./configure code that
>> sets CONFIG_THREAD=y should be removed.
>>
>
> I'll remove this.
>
>> > diff --git a/posix-aio-compat.c b/posix-aio-compat.c
>> > index 7b862b5..00b2a4e 100644
>> > --- a/posix-aio-compat.c
>> > +++ b/posix-aio-compat.c
>> > @@ -29,7 +29,32 @@
>> >  #include "block_int.h"
>> >
>> >  #include "block/raw-posix-aio.h"
>> > +#include "qemu-thread.h"
>> >
>> > +#define MAX_GLOBAL_THREADS  64
>> > +#define MIN_GLOBAL_THREADS   8
>> > +
>> > +QemuMutex aiocb_mutex;
>>
>> This variable should be static since it isn't used externally.
>>
>> > +
>> > +static void aio_thread(ThreadletWork *work)
>> >  {
>> >     pid_t pid;
>> > +    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
>> > +    ssize_t ret = 0;
>> >
>> >     pid = getpid();
>> > +    aiocb->active = 1;
>>
>> aiocb->active needs to be assigned with aiocb_mutex held and then released in
>> order for this memory write to be visible to other threads after this
>> line of code.
>>
>
> Yes. That makes sense. We definitely need to hold the mutex here.
>
>> >
>> >  static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
>> >  {
>> >     ssize_t ret;
>> >
>> > -    mutex_lock(&lock);
>> > +    qemu_mutex_lock(&aiocb_mutex);
>> >     ret = aiocb->ret;
>> > -    mutex_unlock(&lock);
>> > -
>> > +    qemu_mutex_unlock(&aiocb_mutex);
>> >     return ret;
>> >  }
>> >
>> > @@ -536,20 +619,20 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
>> >     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>> >     int active = 0;
>> >
>> > -    mutex_lock(&lock);
>> >     if (!acb->active) {
>> > -        QTAILQ_REMOVE(&request_list, acb, node);
>> > -        acb->ret = -ECANCELED;
>> > +        if (!dequeue_work(&acb->work)) {
>> > +            acb->ret = -ECANCELED;
>> > +         } else {
>> > +            active = 1;
>> > +         }
>> >     } else if (acb->ret == -EINPROGRESS) {
>> >         active = 1;
>> >     }
>> > -    mutex_unlock(&lock);
>> >
>> >     if (active) {
>> >         /* fail safe: if the aio could not be canceled, we wait for
>> >            it */
>> > -        while (qemu_paio_error(acb) == EINPROGRESS)
>> > -            ;
>> > +        active = qemu_paio_error(acb);
>> >     }
>> >
>> >     paio_remove(acb);
>>
>> acb->ret is not being consistently accessed with aiocb_mutex held.
>>
>> We don't wait for the work item to complete if it is active.  This changes the
>> semantics of paio_cancel() and will break callers who expect the request to be
>> cancelled/completed when paio_cancel() returns.  Also, we go ahead and free the
>> acb for a running request which is dangerous because it may be reused and
>> corrupted.
>>
>> I think both the active variable and field in qemu_paiocb are unnecessary
>> because dequeue_work() already deals with inactive work items.  If
>> dequeue_work() was unsuccessful you need to wait until ret != -EINPROGRESS.
>>
>
> So this would mean that we can use the earlier infinite while loop
> right?
>
> while (qemu_paio_error(acb) == EINPROGRESS)
>    ;
>
> We can just take this outside the if (active) condition check.

I would go for something like this as paio_cancel():

if (dequeue_work(&acb->work) != 0) {
        /* Wait for running work item to complete */
        mutex_lock(&aiocb_mutex);
        while (acb->ret == -EINPROGRESS) {
                cond_wait(&aiocb_completion, &aiocb_mutex);
        }
        mutex_unlock(&aiocb_mutex);
}

paio_remove(acb);

If the work item has not yet been run, then dequeue_work() returns 0
and we can just remove the acb.

If the work item is currently running, we sleep on the completion
condition variable until there is a return value.

If the work item completes just as we're doing the check, then the
aiocb_mutex will synchronize things and we will not go to sleep
without detecting that the request has completed.

This approach requires introducing a condition variable.  We only need
a global aiocb_completion condition variable since there can only be
one cancellation happening at a time.
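
For the wait above to terminate, the worker has to store the result and
signal the condition variable while holding the same mutex.  Here is a
minimal standalone sketch of both sides using plain pthreads rather than
the QEMU wrappers.  The demo_* names and the trimmed-down struct are
made up for illustration and are not part of the patch; only
aiocb_mutex and aiocb_completion follow the names used above.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stddef.h>
#include <sys/types.h>

/* Hypothetical stand-in for struct qemu_paiocb: only the result field. */
struct demo_aiocb {
    ssize_t ret;    /* -EINPROGRESS until the worker stores a result */
};

static pthread_mutex_t aiocb_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t aiocb_completion = PTHREAD_COND_INITIALIZER;

/* Worker side: publish the result and wake the waiter under the mutex. */
static void *demo_worker(void *opaque)
{
    struct demo_aiocb *acb = opaque;
    ssize_t result = 42;    /* pretend the I/O transferred 42 bytes */

    pthread_mutex_lock(&aiocb_mutex);
    acb->ret = result;
    /* One waiter at most, so signal is enough (assumption from the text). */
    pthread_cond_signal(&aiocb_completion);
    pthread_mutex_unlock(&aiocb_mutex);
    return NULL;
}

/* Cancel side: sleep until the running request has a return value. */
static ssize_t demo_wait_for_completion(struct demo_aiocb *acb)
{
    ssize_t ret;

    pthread_mutex_lock(&aiocb_mutex);
    /* Re-check in a loop: condition variables may wake up spuriously. */
    while (acb->ret == -EINPROGRESS) {
        pthread_cond_wait(&aiocb_completion, &aiocb_mutex);
    }
    ret = acb->ret;
    pthread_mutex_unlock(&aiocb_mutex);
    return ret;
}

/* Drive one request through a worker thread and wait for it to finish. */
static ssize_t demo_cancel_running_request(void)
{
    struct demo_aiocb acb = { .ret = -EINPROGRESS };
    pthread_t tid;
    ssize_t ret;

    if (pthread_create(&tid, NULL, demo_worker, &acb) != 0) {
        return -1;
    }
    ret = demo_wait_for_completion(&acb);
    pthread_join(tid, NULL);

    assert(ret != -EINPROGRESS);
    return ret;
}
```

Because the predicate is checked with the mutex held, it does not matter
whether the worker finishes before or after the waiter takes the lock:
either the loop is skipped entirely or the signal arrives while the
waiter is blocked in pthread_cond_wait().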

Stefan


