qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH 11/15] block-backend: make queued_requests thread-safe


From: Paolo Bonzini
Subject: [PATCH 11/15] block-backend: make queued_requests thread-safe
Date: Mon, 12 Dec 2022 13:59:16 +0100

Protect quiesce_counter and queued_requests with a QemuMutex, so that
they are protected from concurrent access in the main thread (for example
blk_root_drained_end() reached from bdrv_drain_all()) and in the iothread
(where any I/O operation will call blk_inc_in_flight()).

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/block-backend.c | 44 +++++++++++++++++++++++++++++++++++--------
 1 file changed, 36 insertions(+), 8 deletions(-)

diff --git a/block/block-backend.c b/block/block-backend.c
index 627d491d4155..fdf82f1f1599 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -23,6 +23,7 @@
 #include "qapi/error.h"
 #include "qapi/qapi-events-block.h"
 #include "qemu/id.h"
+#include "qemu/thread.h"
 #include "qemu/main-loop.h"
 #include "qemu/option.h"
 #include "trace.h"
@@ -85,6 +86,8 @@ struct BlockBackend {
     NotifierList remove_bs_notifiers, insert_bs_notifiers;
     QLIST_HEAD(, BlockBackendAioNotifier) aio_notifiers;
 
+    /* Protected by quiesce_lock.  */
+    QemuMutex quiesce_lock;
     int quiesce_counter;
     CoQueue queued_requests;
 
@@ -372,6 +375,7 @@ BlockBackend *blk_new(AioContext *ctx, uint64_t perm, 
uint64_t shared_perm)
 
     block_acct_init(&blk->stats);
 
+    qemu_mutex_init(&blk->quiesce_lock);
     qemu_co_queue_init(&blk->queued_requests);
     notifier_list_init(&blk->remove_bs_notifiers);
     notifier_list_init(&blk->insert_bs_notifiers);
@@ -490,6 +494,7 @@ static void blk_delete(BlockBackend *blk)
     assert(QLIST_EMPTY(&blk->insert_bs_notifiers.notifiers));
     assert(QLIST_EMPTY(&blk->aio_notifiers));
     QTAILQ_REMOVE(&block_backends, blk, link);
+    qemu_mutex_destroy(&blk->quiesce_lock);
     drive_info_del(blk->legacy_dinfo);
     block_acct_cleanup(&blk->stats);
     g_free(blk);
@@ -1451,11 +1456,25 @@ void blk_inc_in_flight(BlockBackend *blk)
 {
     IO_CODE();
     qatomic_inc(&blk->in_flight);
-    if (!blk->disable_request_queuing) {
-        /* TODO: this is not thread-safe! */
+
+    /*
+     * Avoid a continuous stream of requests from AIO callbacks, which
+     * call a user-provided callback while blk->in_flight is elevated,
+     * if the BlockBackend is being quiesced.
+     *
+     * This initial test does not have to be perfect: a race might
+     * cause an extra I/O to be queued, but sooner or later a nonzero
+     * quiesce_counter will be observed.
+     */
+    if (!blk->disable_request_queuing && qatomic_read(&blk->quiesce_counter)) {
+        /*
+         * ... on the other hand, it is important that the final check and
+        * the wait are done under the lock, so that no wakeups are missed.
+         */
+        QEMU_LOCK_GUARD(&blk->quiesce_lock);
         while (blk->quiesce_counter) {
             qatomic_dec(&blk->in_flight);
-            qemu_co_queue_wait(&blk->queued_requests, NULL);
+            qemu_co_queue_wait(&blk->queued_requests, &blk->quiesce_lock);
             qatomic_inc(&blk->in_flight);
         }
     }
@@ -2542,7 +2561,8 @@ static void blk_root_drained_begin(BdrvChild *child)
     BlockBackend *blk = child->opaque;
     ThrottleGroupMember *tgm = &blk->public.throttle_group_member;
 
-    if (++blk->quiesce_counter == 1) {
+    qatomic_set(&blk->quiesce_counter, blk->quiesce_counter + 1);
+    if (blk->quiesce_counter == 1) {
         if (blk->dev_ops && blk->dev_ops->drained_begin) {
             blk->dev_ops->drained_begin(blk->dev_opaque);
         }
@@ -2560,6 +2580,7 @@ static bool blk_root_drained_poll(BdrvChild *child)
 {
     BlockBackend *blk = child->opaque;
     bool busy = false;
+
     assert(blk->quiesce_counter);
 
     if (blk->dev_ops && blk->dev_ops->drained_poll) {
@@ -2576,14 +2597,21 @@ static void blk_root_drained_end(BdrvChild *child)
     assert(blk->public.throttle_group_member.io_limits_disabled);
     qatomic_dec(&blk->public.throttle_group_member.io_limits_disabled);
 
-    if (--blk->quiesce_counter == 0) {
+    qemu_mutex_lock(&blk->quiesce_lock);
+    qatomic_set(&blk->quiesce_counter, blk->quiesce_counter - 1);
+    if (blk->quiesce_counter == 0) {
+        /* After this point, new I/O will not sleep on queued_requests.  */
+        qemu_mutex_unlock(&blk->quiesce_lock);
+
         if (blk->dev_ops && blk->dev_ops->drained_end) {
             blk->dev_ops->drained_end(blk->dev_opaque);
         }
-        while (qemu_co_enter_next(&blk->queued_requests, NULL)) {
-            /* Resume all queued requests */
-        }
+
+        /* Now resume all queued requests */
+        qemu_mutex_lock(&blk->quiesce_lock);
+        qemu_co_enter_all(&blk->queued_requests, &blk->quiesce_lock);
     }
+    qemu_mutex_unlock(&blk->quiesce_lock);
 }
 
 bool blk_register_buf(BlockBackend *blk, void *host, size_t size, Error **errp)
-- 
2.38.1




reply via email to

[Prev in Thread] Current Thread [Next in Thread]