From: Stefan Hajnoczi
Subject: [Qemu-devel] [PATCH] linux-aio: avoid deadlock in nested aio_poll() calls
Date: Mon, 4 Aug 2014 16:56:33 +0100

If two Linux AIO request completions are fetched in the same
io_getevents() call, QEMU will deadlock if request A's callback waits
for request B to complete using an aio_poll() loop.  This was reported
to happen with the mirror blockjob.
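
For illustration, here is a minimal sketch of the problematic pattern.  The
CompletionState struct and the two callbacks are hypothetical stand-ins, not
code from the mirror blockjob:

    #include "block/aio.h"

    /* Hypothetical helper tracking whether request B has completed */
    typedef struct CompletionState {
        AioContext *ctx;
        bool b_done;
    } CompletionState;

    static void request_b_cb(void *opaque, int ret)
    {
        CompletionState *state = opaque;
        state->b_done = true;
    }

    static void request_a_cb(void *opaque, int ret)
    {
        CompletionState *state = opaque;

        /* Nested event loop: wait until request B completes.  Before this
         * patch, if B's completion was fetched by the same io_getevents()
         * call as A's, it sat in a stack-local events[] array that only the
         * interrupted completion loop could drain, so this loop blocked
         * forever.
         */
        while (!state->b_done) {
            aio_poll(state->ctx, true);
        }
    }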

This patch moves completion processing into a bottom half (BH) and makes it
resumable.
Nested event loops can resume completion processing so that request B
will complete and the deadlock will not occur.

Cc: Kevin Wolf <address@hidden>
Cc: Paolo Bonzini <address@hidden>
Cc: Ming Lei <address@hidden>
Cc: Marcin Gibuła <address@hidden>
Reported-by: Marcin Gibuła <address@hidden>
Signed-off-by: Stefan Hajnoczi <address@hidden>
---
 block/linux-aio.c | 71 ++++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 55 insertions(+), 16 deletions(-)

diff --git a/block/linux-aio.c b/block/linux-aio.c
index 7ac7e8c..9aca758 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -51,6 +51,12 @@ struct qemu_laio_state {
 
     /* io queue for submit at batch */
     LaioQueue io_q;
+
+    /* I/O completion processing */
+    QEMUBH *completion_bh;
+    struct io_event events[MAX_EVENTS];
+    int event_idx;
+    int event_max;
 };
 
 static inline ssize_t io_event_ret(struct io_event *ev)
@@ -86,27 +92,58 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
     qemu_aio_release(laiocb);
 }
 
-static void qemu_laio_completion_cb(EventNotifier *e)
+/* The completion BH fetches completed I/O requests and invokes their
+ * callbacks.
+ *
+ * The function is somewhat tricky because it supports nested event loops, for
+ * example when a request callback invokes aio_poll().  In order to do this,
+ * the completion events array and index are kept in qemu_laio_state.  The BH
+ * reschedules itself as long as there are completions pending so it will
+ * either be called again in a nested event loop or will be called after all
+ * events have been completed.  When there are no events left to complete, the
+ * BH returns without rescheduling.
+ */
+static void qemu_laio_completion_bh(void *opaque)
 {
-    struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
-
-    while (event_notifier_test_and_clear(&s->e)) {
-        struct io_event events[MAX_EVENTS];
-        struct timespec ts = { 0 };
-        int nevents, i;
+    struct qemu_laio_state *s = opaque;
 
+    /* Fetch more completion events when empty */
+    if (s->event_idx == s->event_max) {
         do {
-            nevents = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS, events, &ts);
-        } while (nevents == -EINTR);
+            struct timespec ts = { 0 };
+            s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
+                                        s->events, &ts);
+        } while (s->event_max == -EINTR);
+
+        s->event_idx = 0;
+        if (s->event_max <= 0) {
+            s->event_max = 0;
+            return; /* no more events */
+        }
+    }
 
-        for (i = 0; i < nevents; i++) {
-            struct iocb *iocb = events[i].obj;
-            struct qemu_laiocb *laiocb =
-                    container_of(iocb, struct qemu_laiocb, iocb);
+    /* Reschedule so nested event loops see currently pending completions */
+    qemu_bh_schedule(s->completion_bh);
 
-            laiocb->ret = io_event_ret(&events[i]);
-            qemu_laio_process_completion(s, laiocb);
-        }
+    /* Process completion events */
+    while (s->event_idx < s->event_max) {
+        struct iocb *iocb = s->events[s->event_idx].obj;
+        struct qemu_laiocb *laiocb =
+                container_of(iocb, struct qemu_laiocb, iocb);
+
+        laiocb->ret = io_event_ret(&s->events[s->event_idx]);
+        s->event_idx++;
+
+        qemu_laio_process_completion(s, laiocb);
+    }
+}
+
+static void qemu_laio_completion_cb(EventNotifier *e)
+{
+    struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
+
+    if (event_notifier_test_and_clear(&s->e)) {
+        qemu_bh_schedule(s->completion_bh);
     }
 }
 
@@ -272,12 +309,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
     struct qemu_laio_state *s = s_;
 
     aio_set_event_notifier(old_context, &s->e, NULL);
+    qemu_bh_delete(s->completion_bh);
 }
 
 void laio_attach_aio_context(void *s_, AioContext *new_context)
 {
     struct qemu_laio_state *s = s_;
 
+    s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
     aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
 }
 
-- 
1.9.3



