Re: [Qemu-devel] [PATCH v5 04/12] block/io_uring: implements interfaces for io_uring


From: Maxim Levitsky
Subject: Re: [Qemu-devel] [PATCH v5 04/12] block/io_uring: implements interfaces for io_uring
Date: Mon, 17 Jun 2019 15:26:50 +0300

On Mon, 2019-06-10 at 19:18 +0530, Aarushi Mehta wrote:
> Aborts when sqe fails to be set as sqes cannot be returned to the ring.
> 
> Signed-off-by: Aarushi Mehta <address@hidden>
> ---
>  MAINTAINERS             |   7 +
>  block/Makefile.objs     |   3 +
>  block/io_uring.c        | 314 ++++++++++++++++++++++++++++++++++++++++
>  include/block/aio.h     |  16 +-
>  include/block/raw-aio.h |  12 ++
>  5 files changed, 351 insertions(+), 1 deletion(-)
>  create mode 100644 block/io_uring.c
> 
> diff --git a/MAINTAINERS b/MAINTAINERS
> index 7be1225415..49f896796e 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -2516,6 +2516,13 @@ F: block/file-posix.c
>  F: block/file-win32.c
>  F: block/win32-aio.c
>  
> +Linux io_uring
> +M: Aarushi Mehta <address@hidden>
> +R: Stefan Hajnoczi <address@hidden>
> +L: address@hidden
> +S: Maintained
> +F: block/io_uring.c
> +
>  qcow2
>  M: Kevin Wolf <address@hidden>
>  M: Max Reitz <address@hidden>
> diff --git a/block/Makefile.objs b/block/Makefile.objs
> index ae11605c9f..8fde7a23a5 100644
> --- a/block/Makefile.objs
> +++ b/block/Makefile.objs
> @@ -18,6 +18,7 @@ block-obj-y += block-backend.o snapshot.o qapi.o
>  block-obj-$(CONFIG_WIN32) += file-win32.o win32-aio.o
>  block-obj-$(CONFIG_POSIX) += file-posix.o
>  block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
> +block-obj-$(CONFIG_LINUX_IO_URING) += io_uring.o
>  block-obj-y += null.o mirror.o commit.o io.o create.o
>  block-obj-y += throttle-groups.o
>  block-obj-$(CONFIG_LINUX) += nvme.o
> @@ -61,5 +62,7 @@ block-obj-$(if $(CONFIG_LZFSE),m,n) += dmg-lzfse.o
>  dmg-lzfse.o-libs   := $(LZFSE_LIBS)
>  qcow.o-libs        := -lz
>  linux-aio.o-libs   := -laio
> +io_uring.o-cflags  := $(LINUX_IO_URING_CFLAGS)
> +io_uring.o-libs    := $(LINUX_IO_URING_LIBS)
>  parallels.o-cflags := $(LIBXML2_CFLAGS)
>  parallels.o-libs   := $(LIBXML2_LIBS)
> diff --git a/block/io_uring.c b/block/io_uring.c
> new file mode 100644
> index 0000000000..f327c7ef96
> --- /dev/null
> +++ b/block/io_uring.c
> @@ -0,0 +1,314 @@
> +/*
> + * Linux io_uring support.
> + *
> + * Copyright (C) 2009 IBM, Corp.
> + * Copyright (C) 2009 Red Hat, Inc.
> + * Copyright (C) 2019 Aarushi Mehta
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + */
> +#include "qemu/osdep.h"
> +#include <liburing.h>
> +#include "qemu-common.h"
> +#include "block/aio.h"
> +#include "qemu/queue.h"
> +#include "block/block.h"
> +#include "block/raw-aio.h"
> +#include "qemu/coroutine.h"
> +#include "qapi/error.h"
> +
> +#define MAX_EVENTS 128
> +
> +typedef struct LuringAIOCB {
> +    Coroutine *co;
> +    struct io_uring_sqe sqeq;
> +    ssize_t ret;
> +    QEMUIOVector *qiov;
> +    bool is_read;
> +    QSIMPLEQ_ENTRY(LuringAIOCB) next;
> +} LuringAIOCB;
> +
> +typedef struct LuringQueue {
> +    int plugged;
> +    unsigned int in_queue;
> +    unsigned int in_flight;
> +    bool blocked;
> +    QSIMPLEQ_HEAD(, LuringAIOCB) sq_overflow;
> +} LuringQueue;
> +
> +typedef struct LuringState {
> +    AioContext *aio_context;
> +
> +    struct io_uring ring;
> +
> +    /* io queue for submit at batch.  Protected by AioContext lock. */
> +    LuringQueue io_q;
> +
> +    /* I/O completion processing.  Only runs in I/O thread.  */
> +    QEMUBH *completion_bh;
> +} LuringState;
> +
> +/**
> + * ioq_submit:
> + * @s: AIO state
> + *
> + * Queues pending sqes and submits them
> + *
> + */
> +static int ioq_submit(LuringState *s);
> +
> +/**
> + * qemu_luring_process_completions:
> + * @s: AIO state
> + *
> + * Fetches completed I/O requests, consumes cqes and invokes their callbacks.
> + *
> + */
> +static void qemu_luring_process_completions(LuringState *s)
> +{
> +    struct io_uring_cqe *cqes;
> +    int ret;
> +
> +    /*
> +     * Request completion callbacks can run the nested event loop.
> +     * Schedule ourselves so the nested event loop will "see" remaining
> +     * completed requests and process them.  Without this, completion
> +     * callbacks that wait for other requests using a nested event loop
> +     * would hang forever.
> +     */

About that qemu_bh_schedule:
The code is copied from linux-aio.c, where it was added by the commit below.

Author: Stefan Hajnoczi <address@hidden>
Date:   Mon Aug 4 16:56:33 2014 +0100

    linux-aio: avoid deadlock in nested aio_poll() calls
    
    If two Linux AIO request completions are fetched in the same
    io_getevents() call, QEMU will deadlock if request A's callback waits
    for request B to complete using an aio_poll() loop.  This was reported
    to happen with the mirror blockjob.
    
    This patch moves completion processing into a BH and makes it resumable.
    Nested event loops can resume completion processing so that request B
    will complete and the deadlock will not occur.
    
    Cc: Kevin Wolf <address@hidden>
    Cc: Paolo Bonzini <address@hidden>
    Cc: Ming Lei <address@hidden>
    Cc: Marcin Gibuła <address@hidden>
    Reported-by: Marcin Gibuła <address@hidden>
    Signed-off-by: Stefan Hajnoczi <address@hidden>
    Tested-by: Marcin Gibuła <address@hidden>


I kind of opened a Pandora's box by researching that area, suspecting that the
same treatment is needed in other block drivers, but it turns out this is
correct behaviour, and here is why:

The bottom half workaround is needed in linux-aio because AIO uses an eventfd,
which notifies of the completions only once; thus if the coroutine that handles
the response calls aio_poll, the same fd won't be returned again, at least not
until more events are received, which is not guaranteed.


Here in io_uring, I think the same would happen. Looking at the kernel source,
I see that the poll implementation uses 'poll_wait', which is basically a wait
queue that is woken up when new completion events are added to the uring; thus
attempting to poll the same uring fd again will indeed block, even if there are
events not yet processed.

All the other leaf block drivers (drivers that access the data themselves,
rather than forwarding the requests to another block driver) are network based,
so they poll the communication socket. When the same situation occurs, the
nested aio_poll will notice that the socket still has data and run the
corresponding coroutine, preventing the deadlock.

I think the two observations above should be added to the source in some way
to document this, so that the next person after me won't need to spend time
working it out again.


BTW, the nvme userspace driver also solves this issue by not entering the
coroutine directly from the aio fd handler, but doing so from a bottom half
that the handler schedules. This works because a nested aio_poll will run the
bottom halves again, but I suspect that this adds overhead that could be
avoided. I'll look at that later.


> +    qemu_bh_schedule(s->completion_bh);
> +
> +    while (io_uring_peek_cqe(&s->ring, &cqes) == 0) {


Maybe consider using io_uring_for_each_cqe and then io_uring_cq_advance to
avoid acking one ring entry at a time? However, this would probably break the
nesting.
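
Roughly what I have in mind (an untested sketch; as said, it may not play well
with the nested completion processing):

    unsigned head, processed = 0;
    struct io_uring_cqe *cqe;

    io_uring_for_each_cqe(&s->ring, head, cqe) {
        LuringAIOCB *luringcb = io_uring_cqe_get_data(cqe);

        /* ...handle cqe->res for luringcb as the loop below does... */
        processed++;
    }
    /* Ack all consumed cqes in one call instead of one at a time */
    io_uring_cq_advance(&s->ring, processed);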

> +        if (!cqes) {
> +            break;
> +        }
> +        LuringAIOCB *luringcb = io_uring_cqe_get_data(cqes);
> +        ret = cqes->res;
> +
> +        if (ret == luringcb->qiov->size) {
> +            ret = 0;
> +        } else if (ret >= 0) {


You should check the allowed return values here very carefully.

It looks like you can get -EINTR here, which asks you to rerun the read
operation; otherwise you get the number of bytes read, which might be less than
what was asked for. That implies you need to retry the read operation with the
remainder of the buffer, rather than zero the end of the buffer, IMHO.

(0 is returned on EOF according to 'read' semantics, which I think apply here,
so a short read does not imply EOF.)


Looking at linux-aio.c, though, I see that it just passes the returned value
through with no special treatment, not even a check for -EINTR.

I assume that since Linux AIO is Linux specific and only supports direct I/O,
it happens to be able to assume no short reads and no -EINTR (but since libaio
has very sparse documentation I can't verify this).

On the other hand, the aio=threads implementation does everything as specified
in the 'write' manpage: it retries reads on -EINTR and issues additional reads
if fewer than the requested number of bytes were read.

Looking at the io_uring implementation in the kernel, I see that it does
support synchronous (non-O_DIRECT) mode, and in that case it goes through the
same ->read_iter path that a regular read() takes, so it might return short
reads and/or -EINTR.
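
Just to illustrate the idea, here is a rough sketch of requeueing the remainder
of a short read instead of zero-padding it. The helper and the two LuringAIOCB
fields it uses (total_read and resubmit_qiov) are hypothetical and not part of
this patch:

    /*
     * Hypothetical helper (not in the patch): requeue the remainder of a
     * short read.  Assumes resubmit_qiov was set up with qemu_iovec_init()
     * when the request was created.
     */
    static void luring_resubmit_short_read(LuringState *s,
                                           LuringAIOCB *luringcb, int nread)
    {
        size_t remaining;

        luringcb->total_read += nread;
        remaining = luringcb->qiov->size - luringcb->total_read;

        /* Build an iovec covering only the bytes that are still missing */
        qemu_iovec_reset(&luringcb->resubmit_qiov);
        qemu_iovec_concat(&luringcb->resubmit_qiov, luringcb->qiov,
                          luringcb->total_read, remaining);

        /* Point the saved sqe at the new iovec and queue it again */
        luringcb->sqeq.off += nread;
        luringcb->sqeq.addr = (uintptr_t)luringcb->resubmit_qiov.iov;
        luringcb->sqeq.len = luringcb->resubmit_qiov.niov;

        QSIMPLEQ_INSERT_TAIL(&s->io_q.sq_overflow, luringcb, next);
        s->io_q.in_queue++;
    }

The completion loop could then call it for the is_read case instead of
qemu_iovec_memset(), and skip waking the coroutine until the full size has
been read.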


> +            /* Short Read/Write */
> +            if (luringcb->is_read) {
> +                /* Read, pad with zeroes */
> +                qemu_iovec_memset(luringcb->qiov, ret, 0,
> +                luringcb->qiov->size - ret);
> +            } else {
> +                ret = -ENOSPC;;
> +            }
> +        }
> +        luringcb->ret = ret;
> +
> +        io_uring_cqe_seen(&s->ring, cqes);
> +        cqes = NULL;
> +        /* Change counters one-by-one because we can be nested. */
> +        s->io_q.in_flight--;
> +
> +        /*
> +         * If the coroutine is already entered it must be in ioq_submit()
> +         * and will notice luringcb->ret has been filled in when it
> +         * eventually runs later. Coroutines cannot be entered recursively
> +         * so avoid doing that!
> +         */
> +        if (!qemu_coroutine_entered(luringcb->co)) {
> +            aio_co_wake(luringcb->co);
> +        }
> +    }
> +    qemu_bh_cancel(s->completion_bh);
> +}
> +
> +static void qemu_luring_process_completions_and_submit(LuringState *s)
> +{
> +    aio_context_acquire(s->aio_context);
> +    qemu_luring_process_completions(s);
> +
> +    if (!s->io_q.plugged && s->io_q.in_queue > 0) {
> +        ioq_submit(s);
> +    }
> +    aio_context_release(s->aio_context);
> +}
> +
> +static void qemu_luring_completion_bh(void *opaque)
> +{
> +    LuringState *s = opaque;
> +    qemu_luring_process_completions_and_submit(s);
> +}
> +
> +static void qemu_luring_completion_cb(void *opaque)
> +{
> +    LuringState *s = opaque;
> +    qemu_luring_process_completions_and_submit(s);
> +}
> +
> +static void ioq_init(LuringQueue *io_q)
> +{
> +    QSIMPLEQ_INIT(&io_q->sq_overflow);
> +    io_q->plugged = 0;
> +    io_q->in_queue = 0;
> +    io_q->in_flight = 0;
> +    io_q->blocked = false;
> +}
> +
> +static int ioq_submit(LuringState *s)
> +{
> +    int ret = 0;
> +    LuringAIOCB *luringcb, *luringcb_next;
> +
> +    while (s->io_q.in_queue > 0) {
> +        QSIMPLEQ_FOREACH_SAFE(luringcb, &s->io_q.sq_overflow, next,
> +                              luringcb_next) {

I am torn about the 'sq_overflow' name. It seems to me that it's not
immediately clear that these are the requests that are waiting because the
io_uring got full, but I can't think of a better name right now.

Maybe add a comment here to explain what is going on?
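
For example (my wording, just a sketch), above the QSIMPLEQ_FOREACH_SAFE loop:

    /*
     * Requests that could not get an sqe because the SQ ring was full at
     * submission time sit on sq_overflow; move them into the ring as
     * slots become available before submitting.
     */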

Also, maybe we could somehow utilize the plug/unplug facility to avoid reaching
that state in the first place? Maybe the block layer has some kind of
'max outstanding requests' limit that could be used?

In my nvme-mdev I opted not to process the input queues when such a condition
is detected, but here you can't, as the block layer pretty much calls you to
process the requests.


> +            struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
> +            if (!sqes) {
> +                break;
> +            }
> +            /* Prep sqe for submission */
> +            *sqes = luringcb->sqeq;
> +            QSIMPLEQ_REMOVE_HEAD(&s->io_q.sq_overflow, next);
> +        }
> +        ret =  io_uring_submit(&s->ring);
> +        /* Prevent infinite loop if submission is refused */
> +        if (ret <= 0) {
> +            if (ret == -EAGAIN) {
> +                continue;
> +            }
> +            break;
> +        }
> +        s->io_q.in_flight += ret;
> +        s->io_q.in_queue  -= ret;
> +    }
> +    s->io_q.blocked = (s->io_q.in_queue > 0);
> +
> +    if (s->io_q.in_flight) {
> +        /*
> +         * We can try to complete something just right away if there are
> +         * still requests in-flight.
> +         */
> +        qemu_luring_process_completions(s);
> +    }
> +    return ret;
> +}
> +
> +void luring_io_plug(BlockDriverState *bs, LuringState *s)
> +{
> +    s->io_q.plugged++;
> +}
> +
> +void luring_io_unplug(BlockDriverState *bs, LuringState *s)
> +{
> +    assert(s->io_q.plugged);
> +    if (--s->io_q.plugged == 0 &&
> +        !s->io_q.blocked && s->io_q.in_queue > 0) {
> +        ioq_submit(s);
> +    }
> +}
> +
> +/**
> + * luring_do_submit:
> + * @fd: file descriptor for I/O
> + * @luringcb: AIO control block
> + * @s: AIO state
> + * @offset: offset for request
> + * @type: type of request
> + *
> + * Fetches sqes from ring, adds to pending queue and preps them
> + *
> + */
> +static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
> +                            uint64_t offset, int type)
> +{
> +    struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
> +    if (!sqes) {
> +        sqes = &luringcb->sqeq;
> +        QSIMPLEQ_INSERT_TAIL(&s->io_q.sq_overflow, luringcb, next);
> +    }
> +
> +    switch (type) {
> +    case QEMU_AIO_WRITE:
> +        io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
> +                             luringcb->qiov->niov, offset);
> +        break;
> +    case QEMU_AIO_READ:
> +        io_uring_prep_readv(sqes, fd, luringcb->qiov->iov,
> +                            luringcb->qiov->niov, offset);
> +        break;
> +    case QEMU_AIO_FLUSH:
> +        io_uring_prep_fsync(sqes, fd, 0);
> +        break;
> +    default:
> +        fprintf(stderr, "%s: invalid AIO request type, aborting 0x%x.\n",
> +                        __func__, type);

Nitpick: don't we use some kind of error printing function, like 'error_setg',
rather than fprintf?
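
Something like this, perhaps (just a sketch; error_setg() needs an Error **
which this code path doesn't have, so error_report() from
"qemu/error-report.h" looks like the closer fit):

    default:
        /* Sketch: QEMU's error reporting helper instead of raw fprintf */
        error_report("%s: invalid AIO request type 0x%x, aborting",
                     __func__, type);
        abort();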


> +        abort();
> +    }
> +    io_uring_sqe_set_data(sqes, luringcb);
> +    s->io_q.in_queue++;
> +
> +    if (!s->io_q.blocked &&
> +        (!s->io_q.plugged ||
> +         s->io_q.in_flight + s->io_q.in_queue >= MAX_EVENTS)) {
> +        return ioq_submit(s);
> +    }
> +    return 0;
> +}
> +
> +int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
> +                                uint64_t offset, QEMUIOVector *qiov, int type)
> +{
> +    int ret;
> +    LuringAIOCB luringcb = {
> +        .co         = qemu_coroutine_self(),
> +        .ret        = -EINPROGRESS,
> +        .qiov       = qiov,
> +        .is_read    = (type == QEMU_AIO_READ),
> +    };
> +
> +    ret = luring_do_submit(fd, &luringcb, s, offset, type);
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    if (luringcb.ret == -EINPROGRESS) {
> +        qemu_coroutine_yield();
> +    }
> +    return luringcb.ret;
> +}
> +
> +void luring_detach_aio_context(LuringState *s, AioContext *old_context)
> +{
> +    aio_set_fd_handler(old_context, s->ring.ring_fd, false, NULL, NULL, NULL,
> +                       s);
> +    qemu_bh_delete(s->completion_bh);
> +    s->aio_context = NULL;
> +}
> +
> +void luring_attach_aio_context(LuringState *s, AioContext *new_context)
> +{
> +    s->aio_context = new_context;
> +    s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s);
> +    aio_set_fd_handler(s->aio_context, s->ring.ring_fd, false,
> +                       qemu_luring_completion_cb, NULL, NULL, s);
> +}
> +
> +LuringState *luring_init(Error **errp)
> +{
> +    int rc;
> +    LuringState *s;
> +    s = g_malloc0(sizeof(*s));
> +    struct io_uring *ring = &s->ring;
> +    rc =  io_uring_queue_init(MAX_EVENTS, ring, 0);
> +    if (rc < 0) {
> +        error_setg_errno(errp, errno, "failed to init linux io_uring ring");
> +        g_free(s);
> +        return NULL;
> +    }
> +



> +    ioq_init(&s->io_q);

Another nitpick: maybe inline that function, since it is used only here?
(That would save the static declaration up front as well.) Feel free to leave
it as is if you think it is clearer this way.
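
I.e. roughly (just the mechanical move, sketched from the patch itself):

    rc = io_uring_queue_init(MAX_EVENTS, ring, 0);
    if (rc < 0) {
        error_setg_errno(errp, errno, "failed to init linux io_uring ring");
        g_free(s);
        return NULL;
    }

    QSIMPLEQ_INIT(&s->io_q.sq_overflow);
    s->io_q.plugged = 0;
    s->io_q.in_queue = 0;
    s->io_q.in_flight = 0;
    s->io_q.blocked = false;

    return s;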

> +    return s;
> +
> +}
> +
> +void luring_cleanup(LuringState *s)
> +{
> +    io_uring_queue_exit(&s->ring);
> +    g_free(s);
> +}
> diff --git a/include/block/aio.h b/include/block/aio.h
> index 0ca25dfec6..9da3fd9793 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -50,6 +50,7 @@ typedef void IOHandler(void *opaque);
>  struct Coroutine;
>  struct ThreadPool;
>  struct LinuxAioState;
> +struct LuringState;
>  
>  struct AioContext {
>      GSource source;
> @@ -118,11 +119,19 @@ struct AioContext {
>      struct ThreadPool *thread_pool;
>  
>  #ifdef CONFIG_LINUX_AIO
> -    /* State for native Linux AIO.  Uses aio_context_acquire/release for
> +    /*
> +     * State for native Linux AIO.  Uses aio_context_acquire/release for
>       * locking.
>       */
>      struct LinuxAioState *linux_aio;
>  #endif
> +#ifdef CONFIG_LINUX_IO_URING
> +    /*
> +     * State for Linux io_uring.  Uses aio_context_acquire/release for
> +     * locking.
> +     */
> +    struct LuringState *linux_io_uring;
> +#endif
>  
>      /* TimerLists for calling timers - one per clock type.  Has its own
>       * locking.
> @@ -387,6 +396,11 @@ struct LinuxAioState *aio_setup_linux_aio(AioContext *ctx, Error **errp);
>  /* Return the LinuxAioState bound to this AioContext */
>  struct LinuxAioState *aio_get_linux_aio(AioContext *ctx);
>  
> +/* Setup the LuringState bound to this AioContext */
> +struct LuringState *aio_setup_linux_io_uring(AioContext *ctx, Error **errp);
> +
> +/* Return the LuringState bound to this AioContext */
> +struct LuringState *aio_get_linux_io_uring(AioContext *ctx);
>  /**
>   * aio_timer_new_with_attrs:
>   * @ctx: the aio context
> diff --git a/include/block/raw-aio.h b/include/block/raw-aio.h
> index 0cb7cc74a2..71d7d1395f 100644
> --- a/include/block/raw-aio.h
> +++ b/include/block/raw-aio.h
> @@ -55,6 +55,18 @@ void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context);
>  void laio_io_plug(BlockDriverState *bs, LinuxAioState *s);
>  void laio_io_unplug(BlockDriverState *bs, LinuxAioState *s);
>  #endif
> +/* io_uring.c - Linux io_uring implementation */
> +#ifdef CONFIG_LINUX_IO_URING
> +typedef struct LuringState LuringState;
> +LuringState *luring_init(Error **errp);
> +void luring_cleanup(LuringState *s);
> +int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
> +                                uint64_t offset, QEMUIOVector *qiov, int type);
> +void luring_detach_aio_context(LuringState *s, AioContext *old_context);
> +void luring_attach_aio_context(LuringState *s, AioContext *new_context);
> +void luring_io_plug(BlockDriverState *bs, LuringState *s);
> +void luring_io_unplug(BlockDriverState *bs, LuringState *s);
> +#endif
>  
>  #ifdef _WIN32
>  typedef struct QEMUWin32AIOState QEMUWin32AIOState;


I plan to run some benchmarks of the code this week or next, and I will share
the results as soon as I have them.

Please pardon me if I made some mistakes in the review; most of QEMU is still
new to me, so I don't yet know much of the code here well.

Best regards,
        Maxim Levitsky





