[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [PATCH 1/4] linux-aio: fix submit aio as a batch
From: |
Ming Lei |
Subject: |
Re: [Qemu-devel] [PATCH 1/4] linux-aio: fix submit aio as a batch |
Date: |
Fri, 5 Sep 2014 00:16:18 +0800 |
On Thu, Sep 4, 2014 at 10:59 PM, Benoît Canet <address@hidden> wrote:
> The Thursday 14 Aug 2014 à 17:41:41 (+0800), Ming Lei wrote :
>> In the enqueue path, we can't complete request, otherwise
>> "Co-routine re-entered recursively" may be caused, so this
>> patch fixes the issue with below ideas:
>
> s/with below ideas/with the following ideas/g
>
>>
>> - for -EAGAIN or partial completion, retry the submision by
>
> s/submision/submission/
>
>> schedule an BH in following completion cb
>
> s/schedule an/sheduling a/
>
>> - for part of completion, also update the io queue
>> - for other failure, return the failure if in enqueue path,
>> otherwise, abort all queued I/O
>>
>> Signed-off-by: Ming Lei <address@hidden>
>> ---
>> block/linux-aio.c | 99
>> +++++++++++++++++++++++++++++++++++++++++------------
>> 1 file changed, 77 insertions(+), 22 deletions(-)
>>
>> diff --git a/block/linux-aio.c b/block/linux-aio.c
>> index 7ac7e8c..4cdf507 100644
>> --- a/block/linux-aio.c
>> +++ b/block/linux-aio.c
>> @@ -38,11 +38,19 @@ struct qemu_laiocb {
>> QLIST_ENTRY(qemu_laiocb) node;
>> };
>>
>> +/*
>> + * TODO: support to batch I/O from multiple bs in one same
>> + * AIO context, one important use case is multi-lun scsi,
>> + * so in future the IO queue should be per AIO context.
>> + */
>> typedef struct {
>
> In QEMU we typically write the name twice in these kind of declarations:
>
> typedef struct LaioQueue {
> ... stuff ...
> } LaioQueue;
>
>> struct iocb *iocbs[MAX_QUEUED_IO];
>> int plugged;
>
> Are plugged values either 0 and 1 ?
> If so it should be "bool plugged;"
It is actually a reference counter.
>
>> unsigned int size;
>> unsigned int idx;
>
> See:
>
> address@hidden:~/code/qemu$ git grep "unsigned int"|wc
> 2283 14038 154201
> address@hidden:~/code/qemu$ git grep "uint32"|wc
> 12535 63129 810822
>
> Maybe you could use the most popular type.
>
>> +
>> + /* handle -EAGAIN and partial completion */
>> + QEMUBH *retry;
>> } LaioQueue;
>>
>> struct qemu_laio_state {
>> @@ -86,6 +94,12 @@ static void qemu_laio_process_completion(struct
>> qemu_laio_state *s,
>> qemu_aio_release(laiocb);
>> }
>>
>> +static void qemu_laio_start_retry(struct qemu_laio_state *s)
>> +{
>
>
>> + if (s->io_q.idx)
>> + qemu_bh_schedule(s->io_q.retry);
>
> In QEMU this test is writen like this:
>
> if (s->io_q.idx) {
> qemu_bh_schedule(s->io_q.retry);
> }
>
> I suggest you ran ./scripts/checkpatch.pl on your series before submitting it.
I don't know why the blow pre-commit hook didn't complain that:
address@hidden .git/hooks/pre-commit
#!/bin/bash
exec git diff --cached | scripts/checkpatch.pl --no-signoff -q -
>
>
>> +}
>> +
>> static void qemu_laio_completion_cb(EventNotifier *e)
>> {
>> struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
>> @@ -108,6 +122,7 @@ static void qemu_laio_completion_cb(EventNotifier *e)
>> qemu_laio_process_completion(s, laiocb);
>> }
>> }
>> + qemu_laio_start_retry(s);
>> }
>>
>> static void laio_cancel(BlockDriverAIOCB *blockacb)
>> @@ -127,6 +142,7 @@ static void laio_cancel(BlockDriverAIOCB *blockacb)
>> ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
>> if (ret == 0) {
>> laiocb->ret = -ECANCELED;
>> + qemu_laio_start_retry(laiocb->ctx);
>> return;
>> }
>>
>> @@ -154,45 +170,80 @@ static void ioq_init(LaioQueue *io_q)
>> io_q->plugged = 0;
>> }
>>
>> -static int ioq_submit(struct qemu_laio_state *s)
>> +static void abort_queue(struct qemu_laio_state *s)
>> +{
>> + int i;
>> + for (i = 0; i < s->io_q.idx; i++) {
>> + struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
>> + struct qemu_laiocb,
>> + iocb);
>> + laiocb->ret = -EIO;
>> + qemu_laio_process_completion(s, laiocb);
>> + }
>> +}
>> +
>> +static int ioq_submit(struct qemu_laio_state *s, bool enqueue)
>> {
>> int ret, i = 0;
>> int len = s->io_q.idx;
>> + int j = 0;
>>
>> - do {
>> - ret = io_submit(s->ctx, len, s->io_q.iocbs);
>> - } while (i++ < 3 && ret == -EAGAIN);
>> + if (!len) {
>> + return 0;
>> + }
>> +
>> + ret = io_submit(s->ctx, len, s->io_q.iocbs);
>> + if (ret == -EAGAIN) { /* retry in following completion cb */
>> + return 0;
>> + } else if (ret < 0) {
>> + if (enqueue) {
>> + return ret;
>> + }
>>
>> - /* empty io queue */
>> - s->io_q.idx = 0;
>> + /* in non-queue path, all IOs have to be completed */
>> + abort_queue(s);
>> + ret = len;
>> + } else if (ret == 0) {
>> + goto out;
>> + }
>>
>> - if (ret < 0) {
>> - i = 0;
>> - } else {
>> - i = ret;
>> + for (i = ret; i < len; i++) {
>> + s->io_q.iocbs[j++] = s->io_q.iocbs[i];
>> }
>>
>> - for (; i < len; i++) {
>> - struct qemu_laiocb *laiocb =
>> - container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
>> + out:
>> + /*
>> + * update io queue, for partial completion, retry will be
>> + * started automatically in following completion cb.
>> + */
>> + s->io_q.idx -= ret;
>>
>> - laiocb->ret = (ret < 0) ? ret : -EIO;
>> - qemu_laio_process_completion(s, laiocb);
>> - }
>> return ret;
>> }
>>
>> -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>> +static void ioq_submit_retry(void *opaque)
>> +{
>> + struct qemu_laio_state *s = opaque;
>> + ioq_submit(s, false);
>> +}
>> +
>> +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>> {
>> unsigned int idx = s->io_q.idx;
>>
>> + if (unlikely(idx == s->io_q.size)) {
>> + return -1;
>> + }
>> +
>> s->io_q.iocbs[idx++] = iocb;
>> s->io_q.idx = idx;
>>
>> - /* submit immediately if queue is full */
>> - if (idx == s->io_q.size) {
>> - ioq_submit(s);
>> + /* submit immediately if queue depth is above 2/3 */
>> + if (idx > s->io_q.size * 2 / 3) {
>> + return ioq_submit(s, true);
>> }
>> +
>> + return 0;
>> }
>>
>> void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
>> @@ -214,7 +265,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx,
>> bool unplug)
>> }
>>
>> if (s->io_q.idx > 0) {
>> - ret = ioq_submit(s);
>> + ret = ioq_submit(s, false);
>> }
>>
>> return ret;
>> @@ -258,7 +309,9 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void
>> *aio_ctx, int fd,
>> goto out_free_aiocb;
>> }
>> } else {
>> - ioq_enqueue(s, iocbs);
>> + if (ioq_enqueue(s, iocbs) < 0) {
>> + goto out_free_aiocb;
>> + }
>> }
>> return &laiocb->common;
>>
>> @@ -272,6 +325,7 @@ void laio_detach_aio_context(void *s_, AioContext
>> *old_context)
>> struct qemu_laio_state *s = s_;
>>
>> aio_set_event_notifier(old_context, &s->e, NULL);
>> + qemu_bh_delete(s->io_q.retry);
>> }
>>
>> void laio_attach_aio_context(void *s_, AioContext *new_context)
>> @@ -279,6 +333,7 @@ void laio_attach_aio_context(void *s_, AioContext
>> *new_context)
>> struct qemu_laio_state *s = s_;
>>
>> aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
>> + s->io_q.retry = aio_bh_new(new_context, ioq_submit_retry, s);
>> }
>>
>> void *laio_init(void)
>> --
>> 1.7.9.5
>>
>>