[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [PATCH v1 1/4] linux-aio: fix submit aio as a batch
From: |
Ming Lei |
Subject: |
[Qemu-devel] [PATCH v1 1/4] linux-aio: fix submit aio as a batch |
Date: |
Thu, 4 Sep 2014 10:50:25 +0800 |
In the enqueue path, we can't complete request, otherwise
"Co-routine re-entered recursively" may be caused, so this
patch fixes the issue with below ideas:
- for -EAGAIN or partial completion, retry the submision by
schedule an BH in following completion cb
- for part of completion, also update the io queue
- for other failure, return the failure if in enqueue path,
otherwise, abort all queued I/O
Signed-off-by: Ming Lei <address@hidden>
---
block/linux-aio.c | 99 +++++++++++++++++++++++++++++++++++++++++------------
1 file changed, 77 insertions(+), 22 deletions(-)
diff --git a/block/linux-aio.c b/block/linux-aio.c
index 9aca758..ff66d1e 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -38,11 +38,19 @@ struct qemu_laiocb {
QLIST_ENTRY(qemu_laiocb) node;
};
+/*
+ * TODO: support to batch I/O from multiple bs in one same
+ * AIO context, one important use case is multi-lun scsi,
+ * so in future the IO queue should be per AIO context.
+ */
typedef struct {
struct iocb *iocbs[MAX_QUEUED_IO];
int plugged;
unsigned int size;
unsigned int idx;
+
+ /* handle -EAGAIN and partial completion */
+ QEMUBH *retry;
} LaioQueue;
struct qemu_laio_state {
@@ -138,6 +146,12 @@ static void qemu_laio_completion_bh(void *opaque)
}
}
+static void qemu_laio_start_retry(struct qemu_laio_state *s)
+{
+ if (s->io_q.idx)
+ qemu_bh_schedule(s->io_q.retry);
+}
+
static void qemu_laio_completion_cb(EventNotifier *e)
{
struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
@@ -145,6 +159,7 @@ static void qemu_laio_completion_cb(EventNotifier *e)
if (event_notifier_test_and_clear(&s->e)) {
qemu_bh_schedule(s->completion_bh);
}
+ qemu_laio_start_retry(s);
}
static void laio_cancel(BlockDriverAIOCB *blockacb)
@@ -164,6 +179,7 @@ static void laio_cancel(BlockDriverAIOCB *blockacb)
ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
if (ret == 0) {
laiocb->ret = -ECANCELED;
+ qemu_laio_start_retry(laiocb->ctx);
return;
}
@@ -191,45 +207,80 @@ static void ioq_init(LaioQueue *io_q)
io_q->plugged = 0;
}
-static int ioq_submit(struct qemu_laio_state *s)
+static void abort_queue(struct qemu_laio_state *s)
+{
+ int i;
+ for (i = 0; i < s->io_q.idx; i++) {
+ struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
+ struct qemu_laiocb,
+ iocb);
+ laiocb->ret = -EIO;
+ qemu_laio_process_completion(s, laiocb);
+ }
+}
+
+static int ioq_submit(struct qemu_laio_state *s, bool enqueue)
{
int ret, i = 0;
int len = s->io_q.idx;
+ int j = 0;
- do {
- ret = io_submit(s->ctx, len, s->io_q.iocbs);
- } while (i++ < 3 && ret == -EAGAIN);
+ if (!len) {
+ return 0;
+ }
- /* empty io queue */
- s->io_q.idx = 0;
+ ret = io_submit(s->ctx, len, s->io_q.iocbs);
+ if (ret == -EAGAIN) { /* retry in following completion cb */
+ return 0;
+ } else if (ret < 0) {
+ if (enqueue) {
+ return ret;
+ }
- if (ret < 0) {
- i = 0;
- } else {
- i = ret;
+ /* in non-queue path, all IOs have to be completed */
+ abort_queue(s);
+ ret = len;
+ } else if (ret == 0) {
+ goto out;
}
- for (; i < len; i++) {
- struct qemu_laiocb *laiocb =
- container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
-
- laiocb->ret = (ret < 0) ? ret : -EIO;
- qemu_laio_process_completion(s, laiocb);
+ for (i = ret; i < len; i++) {
+ s->io_q.iocbs[j++] = s->io_q.iocbs[i];
}
+
+ out:
+ /*
+ * update io queue, for partial completion, retry will be
+ * started automatically in following completion cb.
+ */
+ s->io_q.idx -= ret;
+
return ret;
}
-static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
+static void ioq_submit_retry(void *opaque)
+{
+ struct qemu_laio_state *s = opaque;
+ ioq_submit(s, false);
+}
+
+static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
{
unsigned int idx = s->io_q.idx;
+ if (unlikely(idx == s->io_q.size)) {
+ return -1;
+ }
+
s->io_q.iocbs[idx++] = iocb;
s->io_q.idx = idx;
- /* submit immediately if queue is full */
- if (idx == s->io_q.size) {
- ioq_submit(s);
+ /* submit immediately if queue depth is above 2/3 */
+ if (idx > s->io_q.size * 2 / 3) {
+ return ioq_submit(s, true);
}
+
+ return 0;
}
void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
@@ -251,7 +302,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx,
bool unplug)
}
if (s->io_q.idx > 0) {
- ret = ioq_submit(s);
+ ret = ioq_submit(s, false);
}
return ret;
@@ -295,7 +346,9 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void
*aio_ctx, int fd,
goto out_free_aiocb;
}
} else {
- ioq_enqueue(s, iocbs);
+ if (ioq_enqueue(s, iocbs) < 0) {
+ goto out_free_aiocb;
+ }
}
return &laiocb->common;
@@ -310,12 +363,14 @@ void laio_detach_aio_context(void *s_, AioContext
*old_context)
aio_set_event_notifier(old_context, &s->e, NULL);
qemu_bh_delete(s->completion_bh);
+ qemu_bh_delete(s->io_q.retry);
}
void laio_attach_aio_context(void *s_, AioContext *new_context)
{
struct qemu_laio_state *s = s_;
+ s->io_q.retry = aio_bh_new(new_context, ioq_submit_retry, s);
s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
}
--
1.7.9.5