qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH v1 2/3] linux-aio: implement io plug and unplug


From: Paolo Bonzini
Subject: Re: [Qemu-devel] [PATCH v1 2/3] linux-aio: implement io plug and unplug
Date: Mon, 30 Jun 2014 19:31:00 +0200
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20100101 Thunderbird/24.6.0

Il 30/06/2014 17:47, Ming Lei ha scritto:
From: Ming Lei <address@hidden>

This patch implements the .bdrv_io_plug and .bdrv_io_unplug
callbacks for the linux-aio block driver, so that submitting
I/O in batches can be supported on linux-aio.

Signed-off-by: Ming Lei <address@hidden>
---
 block/linux-aio.c |   85 +++++++++++++++++++++++++++++++++++++++++++++++++++--
 block/raw-aio.h   |    2 ++
 block/raw-posix.c |   31 +++++++++++++++++++
 3 files changed, 116 insertions(+), 2 deletions(-)

diff --git a/block/linux-aio.c b/block/linux-aio.c
index f0a2c08..7794162 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -25,6 +25,8 @@
  */
 #define MAX_EVENTS 128

+#define MAX_QUEUED_IO  128
+
 struct qemu_laiocb {
     BlockDriverAIOCB common;
     struct qemu_laio_state *ctx;
@@ -36,11 +38,84 @@ struct qemu_laiocb {
     QLIST_ENTRY(qemu_laiocb) node;
 };

+struct laio_queue {
+    struct iocb *iocbs[MAX_QUEUED_IO];
+    bool plugged;
+    unsigned int size;
+    unsigned int idx;
+};
+
 struct qemu_laio_state {
     io_context_t ctx;
     EventNotifier e;
+
+    /* io queue for submit at batch */
+    struct laio_queue io_q;
 };

+static void ioq_init(struct laio_queue *io_q)
+{
+    io_q->size = MAX_QUEUED_IO;
+    io_q->idx = 0;
+    io_q->plugged = false;
+}
+
+static int ioq_submit(struct qemu_laio_state *s)
+{
+    int ret, i, len = s->io_q.idx;
+
+    while (1) {
+        ret = io_submit(s->ctx, len, s->io_q.iocbs);
+        if (ret != -EAGAIN)
+            break;

Busy waiting is not acceptable here (it can be unbounded if, for example, an NFS server is on the other side of a network partition). You have to add a bottom half to qemu_laio_state that calls ioq_submit, and schedule it after calling io_getevents.

If the bottom half is already scheduled and the queue is full, I guess there is no choice but to return -EAGAIN from ioq_enqueue, and ultimately to the guest.

Paolo

+    }
+
+    /* empty io queue */
+    s->io_q.idx = 0;
+
+    if (ret >= 0)
+      return 0;
+
+    for (i = 0; i < len; i++) {
+        struct qemu_laiocb *laiocb =
+            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
+        qemu_aio_release(laiocb);
+    }
+    return ret;
+}
+
+static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
+{
+    unsigned int idx = s->io_q.idx;
+
+    s->io_q.iocbs[idx++] = iocb;
+    s->io_q.idx = idx;
+
+    /* submit immediately if queue is full */
+    if (idx == s->io_q.size)
+        ioq_submit(s);
+}
+
+void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
+{
+    struct qemu_laio_state *s = aio_ctx;
+
+    s->io_q.plugged = true;
+}
+
+int laio_io_unplug(BlockDriverState *bs, void *aio_ctx)
+{
+    struct qemu_laio_state *s = aio_ctx;
+    int ret = 0;
+
+    if (s->io_q.idx > 0) {
+        ret = ioq_submit(s);
+    }
+    s->io_q.plugged = false;
+
+    return ret;
+}
+
 static inline ssize_t io_event_ret(struct io_event *ev)
 {
     return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
@@ -168,8 +243,12 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void 
*aio_ctx, int fd,
     }
     io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));

-    if (io_submit(s->ctx, 1, &iocbs) < 0)
-        goto out_free_aiocb;
+    if (!s->io_q.plugged) {
+        if (io_submit(s->ctx, 1, &iocbs) < 0)
+            goto out_free_aiocb;
+    } else {
+        ioq_enqueue(s, iocbs);
+    }
     return &laiocb->common;

 out_free_aiocb:
@@ -204,6 +283,8 @@ void *laio_init(void)
         goto out_close_efd;
     }

+    ioq_init(&s->io_q);
+
     return s;

 out_close_efd:
diff --git a/block/raw-aio.h b/block/raw-aio.h
index 8cf084e..ed47c3d 100644
--- a/block/raw-aio.h
+++ b/block/raw-aio.h
@@ -40,6 +40,8 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void 
*aio_ctx, int fd,
         BlockDriverCompletionFunc *cb, void *opaque, int type);
 void laio_detach_aio_context(void *s, AioContext *old_context);
 void laio_attach_aio_context(void *s, AioContext *new_context);
+void laio_io_plug(BlockDriverState *bs, void *aio_ctx);
+int laio_io_unplug(BlockDriverState *bs, void *aio_ctx);
 #endif

 #ifdef _WIN32
diff --git a/block/raw-posix.c b/block/raw-posix.c
index dacf4fb..ef64a6d 100644
--- a/block/raw-posix.c
+++ b/block/raw-posix.c
@@ -1054,6 +1054,27 @@ static BlockDriverAIOCB *raw_aio_submit(BlockDriverState 
*bs,
                        cb, opaque, type);
 }

+static void raw_aio_plug(BlockDriverState *bs)
+{
+#ifdef CONFIG_LINUX_AIO
+    BDRVRawState *s = bs->opaque;
+    if (s->use_aio) {
+        laio_io_plug(bs, s->aio_ctx);
+    }
+#endif
+}
+
+static int raw_aio_unplug(BlockDriverState *bs)
+{
+#ifdef CONFIG_LINUX_AIO
+    BDRVRawState *s = bs->opaque;
+    if (s->use_aio) {
+        return laio_io_unplug(bs, s->aio_ctx);
+    }
+#endif
+    return 0;
+}
+
 static BlockDriverAIOCB *raw_aio_readv(BlockDriverState *bs,
         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
         BlockDriverCompletionFunc *cb, void *opaque)
@@ -1503,6 +1524,8 @@ static BlockDriver bdrv_file = {
     .bdrv_aio_flush = raw_aio_flush,
     .bdrv_aio_discard = raw_aio_discard,
     .bdrv_refresh_limits = raw_refresh_limits,
+    .bdrv_io_plug = raw_aio_plug,
+    .bdrv_io_unplug = raw_aio_unplug,

     .bdrv_truncate = raw_truncate,
     .bdrv_getlength = raw_getlength,
@@ -1902,6 +1925,8 @@ static BlockDriver bdrv_host_device = {
     .bdrv_aio_flush    = raw_aio_flush,
     .bdrv_aio_discard   = hdev_aio_discard,
     .bdrv_refresh_limits = raw_refresh_limits,
+    .bdrv_io_plug = raw_aio_plug,
+    .bdrv_io_unplug = raw_aio_unplug,

     .bdrv_truncate      = raw_truncate,
     .bdrv_getlength    = raw_getlength,
@@ -2047,6 +2072,8 @@ static BlockDriver bdrv_host_floppy = {
     .bdrv_aio_writev    = raw_aio_writev,
     .bdrv_aio_flush    = raw_aio_flush,
     .bdrv_refresh_limits = raw_refresh_limits,
+    .bdrv_io_plug = raw_aio_plug,
+    .bdrv_io_unplug = raw_aio_unplug,

     .bdrv_truncate      = raw_truncate,
     .bdrv_getlength      = raw_getlength,
@@ -2175,6 +2202,8 @@ static BlockDriver bdrv_host_cdrom = {
     .bdrv_aio_writev    = raw_aio_writev,
     .bdrv_aio_flush    = raw_aio_flush,
     .bdrv_refresh_limits = raw_refresh_limits,
+    .bdrv_io_plug = raw_aio_plug,
+    .bdrv_io_unplug = raw_aio_unplug,

     .bdrv_truncate      = raw_truncate,
     .bdrv_getlength      = raw_getlength,
@@ -2309,6 +2338,8 @@ static BlockDriver bdrv_host_cdrom = {
     .bdrv_aio_writev    = raw_aio_writev,
     .bdrv_aio_flush    = raw_aio_flush,
     .bdrv_refresh_limits = raw_refresh_limits,
+    .bdrv_io_plug = raw_aio_plug,
+    .bdrv_io_unplug = raw_aio_unplug,

     .bdrv_truncate      = raw_truncate,
     .bdrv_getlength      = raw_getlength,





reply via email to

[Prev in Thread] Current Thread [Next in Thread]