qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH 12/18] block: explicitly acquire aiocontext in callb


From: Paolo Bonzini
Subject: [Qemu-devel] [PATCH 12/18] block: explicitly acquire aiocontext in callbacks that need it
Date: Thu, 6 Aug 2015 15:36:10 +0200

Signed-off-by: Paolo Bonzini <address@hidden>
---
 aio-posix.c                     |  4 ----
 aio-win32.c                     |  6 ------
 block/curl.c                    | 16 ++++++++++++---
 block/iscsi.c                   |  4 ++++
 block/nbd-client.c              | 14 +++++++++++--
 block/nfs.c                     |  6 ++++++
 block/sheepdog.c                | 29 ++++++++++++++++----------
 block/ssh.c                     | 45 ++++++++++++++++++++---------------------
 block/win32-aio.c               | 10 +++++----
 hw/block/dataplane/virtio-blk.c |  2 ++
 hw/scsi/virtio-scsi-dataplane.c |  6 ++++++
 nbd.c                           |  4 ++++
 12 files changed, 93 insertions(+), 53 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index 4944595..58f0937 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -163,9 +163,7 @@ bool aio_dispatch(AioContext *ctx)
         if (!node->deleted &&
             (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
             node->io_read) {
-            aio_context_acquire(ctx);
             node->io_read(node->opaque);
-            aio_context_release(ctx);
 
             /* aio_notify() does not count as progress */
             if (node->opaque != &ctx->notifier) {
@@ -175,9 +173,7 @@ bool aio_dispatch(AioContext *ctx)
         if (!node->deleted &&
             (revents & (G_IO_OUT | G_IO_ERR)) &&
             node->io_write) {
-            aio_context_acquire(ctx);
             node->io_write(node->opaque);
-            aio_context_release(ctx);
             progress = true;
         }
 
diff --git a/aio-win32.c b/aio-win32.c
index 929ed49..f6608b3 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -239,9 +239,7 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE 
event)
         if (!node->deleted &&
             (revents || event_notifier_get_handle(node->e) == event) &&
             node->io_notify) {
-            aio_context_acquire(ctx);
             node->io_notify(node->e);
-            aio_context_release(ctx);
 
             /* aio_notify() does not count as progress */
             if (node->e != &ctx->notifier) {
@@ -252,15 +250,11 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE 
event)
         if (!node->deleted &&
             (node->io_read || node->io_write)) {
             if ((revents & G_IO_IN) && node->io_read) {
-                aio_context_acquire(ctx);
                 node->io_read(node->opaque);
-                aio_context_release(ctx);
                 progress = true;
             }
             if ((revents & G_IO_OUT) && node->io_write) {
-                aio_context_acquire(ctx);
                 node->io_write(node->opaque);
-                aio_context_release(ctx);
                 progress = true;
             }
 
diff --git a/block/curl.c b/block/curl.c
index 032cc8a..b572828 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -330,9 +330,8 @@ static void curl_multi_check_completion(BDRVCURLState *s)
     }
 }
 
-static void curl_multi_do(void *arg)
+static void curl_multi_do_locked(CURLState *s)
 {
-    CURLState *s = (CURLState *)arg;
     int running;
     int r;
 
@@ -346,12 +345,23 @@ static void curl_multi_do(void *arg)
 
 }
 
+static void curl_multi_do(void *arg)
+{
+    CURLState *s = (CURLState *)arg;
+
+    aio_context_acquire(s->s->aio_context);
+    curl_multi_do_locked(s);
+    aio_context_release(s->s->aio_context);
+}
+
 static void curl_multi_read(void *arg)
 {
     CURLState *s = (CURLState *)arg;
 
-    curl_multi_do(arg);
+    aio_context_acquire(s->s->aio_context);
+    curl_multi_do_locked(s);
     curl_multi_check_completion(s->s);
+    aio_context_release(s->s->aio_context);
 }
 
 static void curl_multi_timeout_do(void *arg)
diff --git a/block/iscsi.c b/block/iscsi.c
index 5002916..1c3f99b 100644
--- a/block/iscsi.c
+++ b/block/iscsi.c
@@ -326,8 +326,10 @@ iscsi_process_read(void *arg)
     IscsiLun *iscsilun = arg;
     struct iscsi_context *iscsi = iscsilun->iscsi;
 
+    aio_context_acquire(iscsilun->aio_context);
     iscsi_service(iscsi, POLLIN);
     iscsi_set_events(iscsilun);
+    aio_context_release(iscsilun->aio_context);
 }
 
 static void
@@ -336,8 +338,10 @@ iscsi_process_write(void *arg)
     IscsiLun *iscsilun = arg;
     struct iscsi_context *iscsi = iscsilun->iscsi;
 
+    aio_context_acquire(iscsilun->aio_context);
     iscsi_service(iscsi, POLLOUT);
     iscsi_set_events(iscsilun);
+    aio_context_release(iscsilun->aio_context);
 }
 
 static int64_t sector_lun2qemu(int64_t sector, IscsiLun *iscsilun)
diff --git a/block/nbd-client.c b/block/nbd-client.c
index e1bb919..020399e 100644
--- a/block/nbd-client.c
+++ b/block/nbd-client.c
@@ -56,9 +56,8 @@ static void nbd_teardown_connection(BlockDriverState *bs)
     client->sock = -1;
 }
 
-static void nbd_reply_ready(void *opaque)
+static void nbd_reply_ready_locked(BlockDriverState *bs)
 {
-    BlockDriverState *bs = opaque;
     NbdClientSession *s = nbd_get_client_session(bs);
     uint64_t i;
     int ret;
@@ -95,11 +94,22 @@ fail:
     nbd_teardown_connection(bs);
 }
 
+static void nbd_reply_ready(void *opaque)
+{
+    BlockDriverState *bs = opaque;
+
+    aio_context_acquire(bdrv_get_aio_context(bs));
+    nbd_reply_ready_locked(bs);
+    aio_context_release(bdrv_get_aio_context(bs));
+}
+
 static void nbd_restart_write(void *opaque)
 {
     BlockDriverState *bs = opaque;
 
+    aio_context_acquire(bdrv_get_aio_context(bs));
     qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine, NULL);
+    aio_context_release(bdrv_get_aio_context(bs));
 }
 
 static int nbd_co_send_request(BlockDriverState *bs,
diff --git a/block/nfs.c b/block/nfs.c
index c026ff6..05b02f5 100644
--- a/block/nfs.c
+++ b/block/nfs.c
@@ -75,15 +75,21 @@ static void nfs_set_events(NFSClient *client)
 static void nfs_process_read(void *arg)
 {
     NFSClient *client = arg;
+
+    aio_context_acquire(client->aio_context);
     nfs_service(client->context, POLLIN);
     nfs_set_events(client);
+    aio_context_release(client->aio_context);
 }
 
 static void nfs_process_write(void *arg)
 {
     NFSClient *client = arg;
+
+    aio_context_acquire(client->aio_context);
     nfs_service(client->context, POLLOUT);
     nfs_set_events(client);
+    aio_context_release(client->aio_context);
 }
 
 static void nfs_co_init_task(NFSClient *client, NFSRPC *task)
diff --git a/block/sheepdog.c b/block/sheepdog.c
index 9585beb..2f2731e 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -608,13 +608,6 @@ static coroutine_fn int send_co_req(int sockfd, 
SheepdogReq *hdr, void *data,
     return ret;
 }
 
-static void restart_co_req(void *opaque)
-{
-    Coroutine *co = opaque;
-
-    qemu_coroutine_enter(co, NULL);
-}
-
 typedef struct SheepdogReqCo {
     int sockfd;
     AioContext *aio_context;
@@ -624,12 +617,21 @@ typedef struct SheepdogReqCo {
     unsigned int *rlen;
     int ret;
     bool finished;
+    Coroutine *co;
 } SheepdogReqCo;
 
+static void restart_co_req(void *opaque)
+{
+    SheepdogReqCo *srco = opaque;
+
+    aio_context_acquire(srco->aio_context);
+    qemu_coroutine_enter(srco->co, NULL);
+    aio_context_release(srco->aio_context);
+}
+
 static coroutine_fn void do_co_req(void *opaque)
 {
     int ret;
-    Coroutine *co;
     SheepdogReqCo *srco = opaque;
     int sockfd = srco->sockfd;
     SheepdogReq *hdr = srco->hdr;
@@ -637,15 +639,15 @@ static coroutine_fn void do_co_req(void *opaque)
     unsigned int *wlen = srco->wlen;
     unsigned int *rlen = srco->rlen;
 
-    co = qemu_coroutine_self();
-    aio_set_fd_handler(srco->aio_context, sockfd, NULL, restart_co_req, co);
+    srco->co = qemu_coroutine_self();
+    aio_set_fd_handler(srco->aio_context, sockfd, NULL, restart_co_req, srco);
 
     ret = send_co_req(sockfd, hdr, data, wlen);
     if (ret < 0) {
         goto out;
     }
 
-    aio_set_fd_handler(srco->aio_context, sockfd, restart_co_req, NULL, co);
+    aio_set_fd_handler(srco->aio_context, sockfd, restart_co_req, NULL, srco);
 
     ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
     if (ret != sizeof(*hdr)) {
@@ -672,6 +674,7 @@ out:
      * set each handler to NULL. */
     aio_set_fd_handler(srco->aio_context, sockfd, NULL, NULL, NULL);
 
+    srco->co = NULL;
     srco->ret = ret;
     srco->finished = true;
 }
@@ -904,14 +907,18 @@ static void co_read_response(void *opaque)
         s->co_recv = qemu_coroutine_create(aio_read_response);
     }
 
+    aio_context_acquire(s->aio_context);
     qemu_coroutine_enter(s->co_recv, opaque);
+    aio_context_release(s->aio_context);
 }
 
 static void co_write_request(void *opaque)
 {
     BDRVSheepdogState *s = opaque;
 
+    aio_context_acquire(s->aio_context);
     qemu_coroutine_enter(s->co_send, NULL);
+    aio_context_release(s->aio_context);
 }
 
 /*
diff --git a/block/ssh.c b/block/ssh.c
index 8d06739..783df9e 100644
--- a/block/ssh.c
+++ b/block/ssh.c
@@ -775,20 +775,34 @@ static int ssh_has_zero_init(BlockDriverState *bs)
     return has_zero_init;
 }
 
+typedef struct BDRVSSHRestart {
+    Coroutine *co;
+    AioContext *ctx;
+} BDRVSSHRestart;
+
 static void restart_coroutine(void *opaque)
 {
-    Coroutine *co = opaque;
+    BDRVSSHRestart *restart = opaque;
 
-    DPRINTF("co=%p", co);
+    DPRINTF("ctx=%p co=%p", restart->ctx, restart->co);
 
-    qemu_coroutine_enter(co, NULL);
+    aio_context_acquire(restart->ctx);
+    qemu_coroutine_enter(restart->co, NULL);
+    aio_context_release(restart->ctx);
 }
 
-static coroutine_fn void set_fd_handler(BDRVSSHState *s, BlockDriverState *bs)
+/* A non-blocking call returned EAGAIN, so yield, ensuring the
+ * handlers are set up so that we'll be rescheduled when there is an
+ * interesting event on the socket.
+ */
+static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs)
 {
     int r;
     IOHandler *rd_handler = NULL, *wr_handler = NULL;
-    Coroutine *co = qemu_coroutine_self();
+    BDRVSSHRestart restart = {
+        .ctx = bdrv_get_aio_context(bs),
+        .co = qemu_coroutine_self()
+    };
 
     r = libssh2_session_block_directions(s->session);
 
@@ -803,25 +817,10 @@ static coroutine_fn void set_fd_handler(BDRVSSHState *s, 
BlockDriverState *bs)
             rd_handler, wr_handler);
 
     aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock,
-                       rd_handler, wr_handler, co);
-}
-
-static coroutine_fn void clear_fd_handler(BDRVSSHState *s,
-                                          BlockDriverState *bs)
-{
-    DPRINTF("s->sock=%d", s->sock);
-    aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock, NULL, NULL, NULL);
-}
-
-/* A non-blocking call returned EAGAIN, so yield, ensuring the
- * handlers are set up so that we'll be rescheduled when there is an
- * interesting event on the socket.
- */
-static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs)
-{
-    set_fd_handler(s, bs);
+                       rd_handler, wr_handler, &restart);
     qemu_coroutine_yield();
-    clear_fd_handler(s, bs);
+    DPRINTF("s->sock=%d - back", s->sock);
+    aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock, NULL, NULL, NULL);
 }
 
 /* SFTP has a function `libssh2_sftp_seek64' which seeks to a position
diff --git a/block/win32-aio.c b/block/win32-aio.c
index 64e8682..a78d149 100644
--- a/block/win32-aio.c
+++ b/block/win32-aio.c
@@ -40,7 +40,7 @@ struct QEMUWin32AIOState {
     HANDLE hIOCP;
     EventNotifier e;
     int count;
-    bool is_aio_context_attached;
+    AioContext *aio_ctx;
 };
 
 typedef struct QEMUWin32AIOCB {
@@ -87,7 +87,9 @@ static void win32_aio_process_completion(QEMUWin32AIOState *s,
     }
 
 
+    aio_context_acquire(s->aio_ctx);
     waiocb->common.cb(waiocb->common.opaque, ret);
+    aio_context_release(s->aio_ctx);
     qemu_aio_unref(waiocb);
 }
 
@@ -175,13 +177,13 @@ void win32_aio_detach_aio_context(QEMUWin32AIOState *aio,
                                   AioContext *old_context)
 {
     aio_set_event_notifier(old_context, &aio->e, NULL);
-    aio->is_aio_context_attached = false;
+    aio->aio_ctx = NULL;
 }
 
 void win32_aio_attach_aio_context(QEMUWin32AIOState *aio,
                                   AioContext *new_context)
 {
-    aio->is_aio_context_attached = true;
+    aio->aio_ctx = new_context;
     aio_set_event_notifier(new_context, &aio->e, win32_aio_completion_cb);
 }
 
@@ -210,7 +212,7 @@ out_free_state:
 
 void win32_aio_cleanup(QEMUWin32AIOState *aio)
 {
-    assert(!aio->is_aio_context_attached);
+    assert(!aio->aio_ctx);
     CloseHandle(aio->hIOCP);
     event_notifier_cleanup(&aio->e);
     g_free(aio);
diff --git a/hw/block/dataplane/virtio-blk.c b/hw/block/dataplane/virtio-blk.c
index 6106e46..7c11fbd 100644
--- a/hw/block/dataplane/virtio-blk.c
+++ b/hw/block/dataplane/virtio-blk.c
@@ -95,6 +95,7 @@ static void handle_notify(EventNotifier *e)
     VirtIOBlock *vblk = VIRTIO_BLK(s->vdev);
 
     event_notifier_test_and_clear(&s->host_notifier);
+    aio_context_acquire(s->ctx);
     blk_io_plug(s->conf->conf.blk);
     for (;;) {
         MultiReqBuffer mrb = {};
@@ -135,6 +136,7 @@ static void handle_notify(EventNotifier *e)
         }
     }
     blk_io_unplug(s->conf->conf.blk);
+    aio_context_release(s->ctx);
 }
 
 /* Context: QEMU global mutex held */
diff --git a/hw/scsi/virtio-scsi-dataplane.c b/hw/scsi/virtio-scsi-dataplane.c
index 5575648..b588b692 100644
--- a/hw/scsi/virtio-scsi-dataplane.c
+++ b/hw/scsi/virtio-scsi-dataplane.c
@@ -112,9 +112,11 @@ static void virtio_scsi_iothread_handle_ctrl(EventNotifier 
*notifier)
     VirtIOSCSIReq *req;
 
     event_notifier_test_and_clear(notifier);
+    aio_context_acquire(s->ctx);
     while ((req = virtio_scsi_pop_req_vring(s, vring))) {
         virtio_scsi_handle_ctrl_req(s, req);
     }
+    aio_context_release(s->ctx);
 }
 
 static void virtio_scsi_iothread_handle_event(EventNotifier *notifier)
@@ -130,9 +132,11 @@ static void 
virtio_scsi_iothread_handle_event(EventNotifier *notifier)
         return;
     }
 
+    aio_context_acquire(s->ctx);
     if (s->events_dropped) {
         virtio_scsi_push_event(s, NULL, VIRTIO_SCSI_T_NO_EVENT, 0);
     }
+    aio_context_release(s->ctx);
 }
 
 static void virtio_scsi_iothread_handle_cmd(EventNotifier *notifier)
@@ -144,6 +148,7 @@ static void virtio_scsi_iothread_handle_cmd(EventNotifier 
*notifier)
     QTAILQ_HEAD(, VirtIOSCSIReq) reqs = QTAILQ_HEAD_INITIALIZER(reqs);
 
     event_notifier_test_and_clear(notifier);
+    aio_context_acquire(s->ctx);
     while ((req = virtio_scsi_pop_req_vring(s, vring))) {
         if (virtio_scsi_handle_cmd_req_prepare(s, req)) {
             QTAILQ_INSERT_TAIL(&reqs, req, next);
@@ -153,6 +158,7 @@ static void virtio_scsi_iothread_handle_cmd(EventNotifier 
*notifier)
     QTAILQ_FOREACH_SAFE(req, &reqs, next, next) {
         virtio_scsi_handle_cmd_req_submit(s, req);
     }
+    aio_context_release(s->ctx);
 }
 
 /* assumes s->ctx held */
diff --git a/nbd.c b/nbd.c
index 06b501b..497189c 100644
--- a/nbd.c
+++ b/nbd.c
@@ -1436,6 +1436,10 @@ static void nbd_restart_write(void *opaque)
 static void nbd_set_handlers(NBDClient *client)
 {
     if (client->exp && client->exp->ctx) {
+        /* Note that the handlers do not expect any concurrency; qemu-nbd
+         * does not instantiate multiple AioContexts yet, nor does it call
+         * aio_poll/aio_dispatch from multiple threads.
+         */
         aio_set_fd_handler(client->exp->ctx, client->sock,
                            client->can_read ? nbd_read : NULL,
                            client->send_coroutine ? nbd_restart_write : NULL,
-- 
2.4.3





reply via email to

[Prev in Thread] Current Thread [Next in Thread]