From: Roman Pen
Subject: [Qemu-devel] [RFC] virtio-blk: simple multithreaded MQ implementation for bdrv_raw
Date: Fri, 27 May 2016 13:55:04 +0200

Hello, all.

This is an RFC because this patch is mostly a quick attempt to get true
multithreaded multiqueue support for a block device with native AIO.
The goal is to squeeze everything possible out of a lockless IO path,
from the MQ block layer in the guest down to the MQ block device on the
host.

To avoid any locks in the qemu backend and to avoid introducing thread
safety into the qemu block layer, I open the same backend device several
times, one device per MQ.  E.g. the following is the stack for a
virtio-blk device with num-queues=2:

            VirtIOBlock
               /   \
     VirtQueue#0   VirtQueue#1
      IOThread#0    IOThread#1
         BH#0          BH#1
      Backend#0     Backend#1
               \   /
             /dev/null0

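For illustration, the per-queue backends are simply the original drive
reopened once per extra queue.  A minimal sketch of what the patch does
in virtio_blk_dup_iothreads_and_drives() (error handling omitted):

    BlockDriverState *bs = blk_bs(s->conf.conf.blk);
    int i;

    /* Queue 0 keeps the backend given on the command line */
    s->conf.drives_arr[0] = s->conf.conf.blk;
    for (i = 1; i < s->conf.num_queues; i++) {
        /* Reopen the same file with the same options for queue i */
        s->conf.drives_arr[i] = blk_new_open(bs->filename, NULL,
                                             bs->full_open_options,
                                             bs->open_flags, NULL);
    }
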
To group all objects related to one vq, a new structure is introduced:

    typedef struct VirtQueueCtx {
        BlockBackend *blk;
        struct VirtIOBlock *s;
        VirtQueue *vq;
        void *rq;
        QEMUBH *bh;
        QEMUBH *batch_notify_bh;
        IOThread *iothread;
        Notifier insert_notifier;
        Notifier remove_notifier;
        /* Operation blocker on BDS */
        Error *blocker;
    } VirtQueueCtx;

And VirtIOBlock includes an array of these contexts:

     typedef struct VirtIOBlock {
         VirtIODevice parent_obj;
    +    VirtQueueCtx mq[VIRTIO_QUEUE_MAX];
     ...

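Contexts are iterated with a small helper macro added to
include/hw/virtio/virtio-blk.h; a slot counts as valid while its ->blk
pointer is set, so iteration stops at the first unused entry:

    #define for_each_valid_vq_ctx(s, vq_ctx)                    \
        for (vq_ctx = &s->mq[0];                                \
             vq_ctx < s->mq + ARRAY_SIZE(s->mq) && vq_ctx->blk; \
             vq_ctx++)
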
This patch is based on Stefan's series "virtio-blk: multiqueue support",
with one minor difference: I reverted "virtio-blk: multiqueue batch
notify", which does not make much sense when each VQ is handled by its
own iothread.

The qemu configuration stays the same, i.e. specify num-queues=N and N
iothreads will be started on demand and N drives will be opened:

    qemu -device virtio-blk-pci,num-queues=8
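
Only the first iothread has to be given explicitly; the rest are created
with names derived from it (see virtio_blk_dup_iothreads_and_drives()
below), so with iothread=t0 and num-queues=8 the threads end up as:

    t0, t0_1, t0_2, ..., t0_7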

My configuration is the following:

host:
    Intel(R) Core(TM) i7-4770 CPU @ 3.40GHz,
    8 CPUs,
    /dev/nullb0 as backend with the following parameters:
      $ cat /sys/module/null_blk/parameters/submit_queues
      8
      $ cat /sys/module/null_blk/parameters/irqmode
      1
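
    For reference, a null_blk instance with these parameters can be set
    up with the standard module options (assuming null_blk is built as
    a module):

      $ modprobe null_blk submit_queues=8 irqmode=1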

guest:
    8 VCPUs

qemu:
    -object iothread,id=t0 \
    -drive if=none,id=d0,file=/dev/nullb0,format=raw,snapshot=off,cache=none,aio=native \
    -device virtio-blk-pci,num-queues=$N,iothread=t0,drive=d0,disable-modern=off,disable-legacy=on

    where $N varies during the tests.

fio:
    [global]
    description=Emulation of Storage Server Access Pattern
    bssplit=512/20:1k/16:2k/9:4k/12:8k/19:16k/10:32k/8:64k/4
    fadvise_hint=0
    rw=randrw:2
    direct=1

    ioengine=libaio
    iodepth=64
    iodepth_batch_submit=64
    iodepth_batch_complete=64
    numjobs=8
    gtod_reduce=1
    group_reporting=1

    time_based=1
    runtime=30

    [job]
    filename=/dev/vda
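
The job file, saved e.g. as nullb.fio (the name here is mine), is run
inside the guest as:

    $ fio nullb.fio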

Results:
    num-queues   RD bw      WR bw
    ----------   -----      -----

    * with 1 iothread *

    1 thr 1 mq   1225MB/s   1221MB/s
    1 thr 2 mq   1559MB/s   1553MB/s
    1 thr 4 mq   1729MB/s   1725MB/s
    1 thr 8 mq   1660MB/s   1655MB/s

    * with N iothreads *

    2 thr 2 mq   1845MB/s   1842MB/s
    4 thr 4 mq   2187MB/s   2183MB/s
    8 thr 8 mq   1383MB/s   1378MB/s

Obviously, 8 iothreads + 8 vcpu threads is too much for my machine with
8 CPUs, but 4 iothreads show quite a good result.

This patch works only for raw block devices or non-expandable raw image
files, because it is quite clear what will happen if an expandable image
is opened and accessed from many threads :)  Also, anything like write
alignment or block zeroing leads to corruption.

The options 'snapshot=off,cache=none,aio=native' are specified in the
hope that they are enough to stop qemu from doing smart things and to
force it to simply forward each IO request from guest to host
immediately.  A fio verification test and a couple of filesystem tests
show that nothing terrible has happened.

I also attempted to assign several AIO contexts to one BlockBackend
device in order to access one BlockBackend from many iothreads, but I
failed to finish that part in a reasonable time.  Of course, my qemu
block layer experience is rather shallow, and I do not see an obvious
way to make this IO path thread-safe without changing everything all
over the place.

--
Roman

Signed-off-by: Roman Pen <address@hidden>
Cc: Stefan Hajnoczi <address@hidden>
Cc: address@hidden
---
 hw/block/dataplane/virtio-blk.c | 163 +++++++++---------
 hw/block/virtio-blk.c           | 364 ++++++++++++++++++++++++++++------------
 include/hw/virtio/virtio-blk.h  |  45 ++++-
 3 files changed, 380 insertions(+), 192 deletions(-)

diff --git a/hw/block/dataplane/virtio-blk.c b/hw/block/dataplane/virtio-blk.c
index 48c0bb7..004d4a5 100644
--- a/hw/block/dataplane/virtio-blk.c
+++ b/hw/block/dataplane/virtio-blk.c
@@ -32,19 +32,6 @@ struct VirtIOBlockDataPlane {
 
     VirtIOBlkConf *conf;
     VirtIODevice *vdev;
-
-    Notifier insert_notifier, remove_notifier;
-
-    /* Note that these EventNotifiers are assigned by value.  This is
-     * fine as long as you do not call event_notifier_cleanup on them
-     * (because you don't own the file descriptor or handle; you just
-     * use it).
-     */
-    IOThread *iothread;
-    AioContext *ctx;
-
-    /* Operation blocker on BDS */
-    Error *blocker;
 };
 
 /* Raise an interrupt to signal guest, if necessary */
@@ -57,52 +44,50 @@ void virtio_blk_data_plane_notify(VirtIOBlockDataPlane *s, VirtQueue *vq)
     event_notifier_set(virtio_queue_get_guest_notifier(vq));
 }
 
-static void data_plane_set_up_op_blockers(VirtIOBlockDataPlane *s)
+static void data_plane_set_up_op_blockers(VirtQueueCtx *vq_ctx)
 {
-    assert(!s->blocker);
-    error_setg(&s->blocker, "block device is in use by data plane");
-    blk_op_block_all(s->conf->conf.blk, s->blocker);
-    blk_op_unblock(s->conf->conf.blk, BLOCK_OP_TYPE_RESIZE, s->blocker);
-    blk_op_unblock(s->conf->conf.blk, BLOCK_OP_TYPE_DRIVE_DEL, s->blocker);
-    blk_op_unblock(s->conf->conf.blk, BLOCK_OP_TYPE_BACKUP_SOURCE, s->blocker);
-    blk_op_unblock(s->conf->conf.blk, BLOCK_OP_TYPE_CHANGE, s->blocker);
-    blk_op_unblock(s->conf->conf.blk, BLOCK_OP_TYPE_COMMIT_SOURCE, s->blocker);
-    blk_op_unblock(s->conf->conf.blk, BLOCK_OP_TYPE_COMMIT_TARGET, s->blocker);
-    blk_op_unblock(s->conf->conf.blk, BLOCK_OP_TYPE_EJECT, s->blocker);
-    blk_op_unblock(s->conf->conf.blk, BLOCK_OP_TYPE_EXTERNAL_SNAPSHOT,
-                   s->blocker);
-    blk_op_unblock(s->conf->conf.blk, BLOCK_OP_TYPE_INTERNAL_SNAPSHOT,
-                   s->blocker);
-    blk_op_unblock(s->conf->conf.blk, BLOCK_OP_TYPE_INTERNAL_SNAPSHOT_DELETE,
-                   s->blocker);
-    blk_op_unblock(s->conf->conf.blk, BLOCK_OP_TYPE_MIRROR_SOURCE, s->blocker);
-    blk_op_unblock(s->conf->conf.blk, BLOCK_OP_TYPE_STREAM, s->blocker);
-    blk_op_unblock(s->conf->conf.blk, BLOCK_OP_TYPE_REPLACE, s->blocker);
+    assert(!vq_ctx->blocker);
+    error_setg(&vq_ctx->blocker, "block device is in use by data plane");
+    blk_op_block_all(vq_ctx->blk, vq_ctx->blocker);
+    blk_op_unblock(vq_ctx->blk, BLOCK_OP_TYPE_RESIZE, vq_ctx->blocker);
+    blk_op_unblock(vq_ctx->blk, BLOCK_OP_TYPE_DRIVE_DEL, vq_ctx->blocker);
+    blk_op_unblock(vq_ctx->blk, BLOCK_OP_TYPE_BACKUP_SOURCE, vq_ctx->blocker);
+    blk_op_unblock(vq_ctx->blk, BLOCK_OP_TYPE_CHANGE, vq_ctx->blocker);
+    blk_op_unblock(vq_ctx->blk, BLOCK_OP_TYPE_COMMIT_SOURCE, vq_ctx->blocker);
+    blk_op_unblock(vq_ctx->blk, BLOCK_OP_TYPE_COMMIT_TARGET, vq_ctx->blocker);
+    blk_op_unblock(vq_ctx->blk, BLOCK_OP_TYPE_EJECT, vq_ctx->blocker);
+    blk_op_unblock(vq_ctx->blk, BLOCK_OP_TYPE_EXTERNAL_SNAPSHOT,
+                   vq_ctx->blocker);
+    blk_op_unblock(vq_ctx->blk, BLOCK_OP_TYPE_INTERNAL_SNAPSHOT,
+                   vq_ctx->blocker);
+    blk_op_unblock(vq_ctx->blk, BLOCK_OP_TYPE_INTERNAL_SNAPSHOT_DELETE,
+                   vq_ctx->blocker);
+    blk_op_unblock(vq_ctx->blk, BLOCK_OP_TYPE_MIRROR_SOURCE, vq_ctx->blocker);
+    blk_op_unblock(vq_ctx->blk, BLOCK_OP_TYPE_STREAM, vq_ctx->blocker);
+    blk_op_unblock(vq_ctx->blk, BLOCK_OP_TYPE_REPLACE, vq_ctx->blocker);
 }
 
-static void data_plane_remove_op_blockers(VirtIOBlockDataPlane *s)
+static void data_plane_remove_op_blockers(VirtQueueCtx *vq_ctx)
 {
-    if (s->blocker) {
-        blk_op_unblock_all(s->conf->conf.blk, s->blocker);
-        error_free(s->blocker);
-        s->blocker = NULL;
+    if (vq_ctx->blocker) {
+        blk_op_unblock_all(vq_ctx->blk, vq_ctx->blocker);
+        error_free(vq_ctx->blocker);
+        vq_ctx->blocker = NULL;
     }
 }
 
 static void data_plane_blk_insert_notifier(Notifier *n, void *data)
 {
-    VirtIOBlockDataPlane *s = container_of(n, VirtIOBlockDataPlane,
-                                           insert_notifier);
-    assert(s->conf->conf.blk == data);
-    data_plane_set_up_op_blockers(s);
+    VirtQueueCtx *vq_ctx = container_of(n, VirtQueueCtx, insert_notifier);
+
+    data_plane_set_up_op_blockers(vq_ctx);
 }
 
 static void data_plane_blk_remove_notifier(Notifier *n, void *data)
 {
-    VirtIOBlockDataPlane *s = container_of(n, VirtIOBlockDataPlane,
-                                           remove_notifier);
-    assert(s->conf->conf.blk == data);
-    data_plane_remove_op_blockers(s);
+    VirtQueueCtx *vq_ctx = container_of(n, VirtQueueCtx, remove_notifier);
+
+    data_plane_remove_op_blockers(vq_ctx);
 }
 
 /* Context: QEMU global mutex held */
@@ -111,6 +96,10 @@ void virtio_blk_data_plane_create(VirtIODevice *vdev, VirtIOBlkConf *conf,
                                   Error **errp)
 {
     VirtIOBlockDataPlane *s;
+    VirtQueueCtx *vq_ctx;
+    VirtIOBlock *vblk = (VirtIOBlock *)vdev;
+    /* Use first block, others should be the same */
+    BlockBackend *blk0 = vblk->mq[0].blk;
     BusState *qbus = BUS(qdev_get_parent_bus(DEVICE(vdev)));
     VirtioBusClass *k = VIRTIO_BUS_GET_CLASS(qbus);
 
@@ -131,7 +120,7 @@ void virtio_blk_data_plane_create(VirtIODevice *vdev, VirtIOBlkConf *conf,
     /* If dataplane is (re-)enabled while the guest is running there could be
      * block jobs that can conflict.
      */
-    if (blk_op_is_blocked(conf->conf.blk, BLOCK_OP_TYPE_DATAPLANE, errp)) {
+    if (blk_op_is_blocked(blk0, BLOCK_OP_TYPE_DATAPLANE, errp)) {
         error_prepend(errp, "cannot start dataplane thread: ");
         return;
     }
@@ -141,17 +130,19 @@ void virtio_blk_data_plane_create(VirtIODevice *vdev, VirtIOBlkConf *conf,
     s->conf = conf;
 
     if (conf->iothread) {
-        s->iothread = conf->iothread;
-        object_ref(OBJECT(s->iothread));
-    }
-    s->ctx = iothread_get_aio_context(s->iothread);
+        int i;
 
-    s->insert_notifier.notify = data_plane_blk_insert_notifier;
-    s->remove_notifier.notify = data_plane_blk_remove_notifier;
-    blk_add_insert_bs_notifier(conf->conf.blk, &s->insert_notifier);
-    blk_add_remove_bs_notifier(conf->conf.blk, &s->remove_notifier);
+        for (i = 0; i < conf->num_queues; i++)
+            object_ref(OBJECT(conf->iothreads_arr[i]));
+    }
 
-    data_plane_set_up_op_blockers(s);
+    for_each_valid_vq_ctx(vblk, vq_ctx) {
+        vq_ctx->insert_notifier.notify = data_plane_blk_insert_notifier;
+        vq_ctx->remove_notifier.notify = data_plane_blk_remove_notifier;
+        blk_add_insert_bs_notifier(vq_ctx->blk, &vq_ctx->insert_notifier);
+        blk_add_remove_bs_notifier(vq_ctx->blk, &vq_ctx->remove_notifier);
+        data_plane_set_up_op_blockers(vq_ctx);
+    }
 
     *dataplane = s;
 }
@@ -159,15 +150,22 @@ void virtio_blk_data_plane_create(VirtIODevice *vdev, VirtIOBlkConf *conf,
 /* Context: QEMU global mutex held */
 void virtio_blk_data_plane_destroy(VirtIOBlockDataPlane *s)
 {
+    VirtQueueCtx *vq_ctx;
+    VirtIOBlock *vblk = (VirtIOBlock *)s->vdev;
+    int i;
+
     if (!s) {
         return;
     }
 
     virtio_blk_data_plane_stop(s);
-    data_plane_remove_op_blockers(s);
-    notifier_remove(&s->insert_notifier);
-    notifier_remove(&s->remove_notifier);
-    object_unref(OBJECT(s->iothread));
+    for_each_valid_vq_ctx(vblk, vq_ctx) {
+        data_plane_remove_op_blockers(vq_ctx);
+        notifier_remove(&vq_ctx->insert_notifier);
+        notifier_remove(&vq_ctx->remove_notifier);
+    }
+    for (i = 0; i < s->conf->num_queues; i++)
+        object_unref(OBJECT(s->conf->iothreads_arr[i]));
     g_free(s);
 }
 
@@ -188,9 +186,9 @@ void virtio_blk_data_plane_start(VirtIOBlockDataPlane *s)
     BusState *qbus = BUS(qdev_get_parent_bus(DEVICE(s->vdev)));
     VirtioBusClass *k = VIRTIO_BUS_GET_CLASS(qbus);
     VirtIOBlock *vblk = VIRTIO_BLK(s->vdev);
-    unsigned i;
     unsigned nvqs = s->conf->num_queues;
-    int r;
+    VirtQueueCtx *vq_ctx;
+    int r, i;
 
     if (vblk->dataplane_started || s->starting) {
         return;
@@ -222,24 +220,24 @@ void virtio_blk_data_plane_start(VirtIOBlockDataPlane *s)
     vblk->dataplane_started = true;
     trace_virtio_blk_data_plane_start(s);
 
-    blk_set_aio_context(s->conf->conf.blk, s->ctx);
-
     /* Kick right away to begin processing requests already in vring */
-    for (i = 0; i < nvqs; i++) {
-        VirtQueue *vq = virtio_get_queue(s->vdev, i);
+    for_each_valid_vq_ctx(vblk, vq_ctx) {
+        AioContext *ctx = iothread_get_aio_context(vq_ctx->iothread);
 
-        event_notifier_set(virtio_queue_get_host_notifier(vq));
+        blk_set_aio_context(vq_ctx->blk, ctx);
+        event_notifier_set(virtio_queue_get_host_notifier(vq_ctx->vq));
     }
 
     /* Get this show started by hooking up our callbacks */
-    aio_context_acquire(s->ctx);
-    for (i = 0; i < nvqs; i++) {
-        VirtQueue *vq = virtio_get_queue(s->vdev, i);
+    for_each_valid_vq_ctx(vblk, vq_ctx) {
+        AioContext *ctx = iothread_get_aio_context(vq_ctx->iothread);
+        VirtQueue *vq = vq_ctx->vq;
 
-        virtio_queue_aio_set_host_notifier_handler(vq, s->ctx,
+        aio_context_acquire(ctx);
+        virtio_queue_aio_set_host_notifier_handler(vq, ctx,
                 virtio_blk_data_plane_handle_output);
+        aio_context_release(ctx);
     }
-    aio_context_release(s->ctx);
     return;
 
   fail_guest_notifiers:
@@ -254,8 +252,9 @@ void virtio_blk_data_plane_stop(VirtIOBlockDataPlane *s)
     BusState *qbus = BUS(qdev_get_parent_bus(DEVICE(s->vdev)));
     VirtioBusClass *k = VIRTIO_BUS_GET_CLASS(qbus);
     VirtIOBlock *vblk = VIRTIO_BLK(s->vdev);
-    unsigned i;
     unsigned nvqs = s->conf->num_queues;
+    unsigned i;
+    VirtQueueCtx *vq_ctx;
 
     if (!vblk->dataplane_started || s->stopping) {
         return;
@@ -270,19 +269,19 @@ void virtio_blk_data_plane_stop(VirtIOBlockDataPlane *s)
     s->stopping = true;
     trace_virtio_blk_data_plane_stop(s);
 
-    aio_context_acquire(s->ctx);
-
     /* Stop notifications for new requests from guest */
-    for (i = 0; i < nvqs; i++) {
-        VirtQueue *vq = virtio_get_queue(s->vdev, i);
+    for_each_valid_vq_ctx(vblk, vq_ctx) {
+        AioContext *ctx = iothread_get_aio_context(vq_ctx->iothread);
+        VirtQueue *vq = vq_ctx->vq;
 
-        virtio_queue_aio_set_host_notifier_handler(vq, s->ctx, NULL);
-    }
+        aio_context_acquire(ctx);
+        virtio_queue_aio_set_host_notifier_handler(vq, ctx, NULL);
 
-    /* Drain and switch bs back to the QEMU main loop */
-    blk_set_aio_context(s->conf->conf.blk, qemu_get_aio_context());
+        /* Drain and switch bs back to the QEMU main loop */
+        blk_set_aio_context(vq_ctx->blk, qemu_get_aio_context());
 
-    aio_context_release(s->ctx);
+        aio_context_release(ctx);
+    }
 
     for (i = 0; i < nvqs; i++) {
         k->set_host_notifier(qbus->parent, i, false);
diff --git a/hw/block/virtio-blk.c b/hw/block/virtio-blk.c
index d885810..15604f2 100644
--- a/hw/block/virtio-blk.c
+++ b/hw/block/virtio-blk.c
@@ -29,6 +29,8 @@
 #endif
 #include "hw/virtio/virtio-bus.h"
 #include "hw/virtio/virtio-access.h"
+#include "qom/object_interfaces.h"
+#include "block/block_int.h"
 
 void virtio_blk_init_request(VirtIOBlock *s, VirtQueue *vq,
                              VirtIOBlockReq *req)
@@ -53,64 +55,55 @@ void virtio_blk_free_request(VirtIOBlockReq *req)
  */
 static void virtio_blk_batch_notify_bh(void *opaque)
 {
-    VirtIOBlock *s = opaque;
+    VirtQueueCtx *vq_ctx = opaque;
+    VirtIOBlock *s = vq_ctx->s;
     VirtIODevice *vdev = VIRTIO_DEVICE(s);
-    unsigned nvqs = s->conf.num_queues;
-    unsigned i = 0;
-
-    while ((i = find_next_bit(s->batch_notify_vqs, nvqs, i)) < nvqs) {
-        VirtQueue *vq = virtio_get_queue(vdev, i);
-
-        bitmap_clear(s->batch_notify_vqs, i, 1);
 
-        if (s->dataplane_started && !s->dataplane_disabled) {
-            virtio_blk_data_plane_notify(s->dataplane, vq);
-        } else {
-            virtio_notify(vdev, vq);
-        }
+    if (s->dataplane_started && !s->dataplane_disabled) {
+        virtio_blk_data_plane_notify(s->dataplane, vq_ctx->vq);
+    } else {
+        virtio_notify(vdev, vq_ctx->vq);
     }
 }
 
 /* Force batch notifications to run */
-static void virtio_blk_batch_notify_flush(VirtIOBlock *s)
+static void virtio_blk_batch_notify_flush(VirtQueueCtx *vq_ctx)
 {
-    qemu_bh_cancel(s->batch_notify_bh);
-    virtio_blk_batch_notify_bh(s);
+    qemu_bh_cancel(vq_ctx->batch_notify_bh);
+    virtio_blk_batch_notify_bh(vq_ctx);
 }
 
 static void virtio_blk_req_complete(VirtIOBlockReq *req, unsigned char status)
 {
-    VirtIOBlock *s = req->dev;
+    VirtQueueCtx *vq_ctx = virtio_req_get_mq_ctx(req);
 
     trace_virtio_blk_req_complete(req, status);
 
     stb_p(&req->in->status, status);
     virtqueue_push(req->vq, &req->elem, req->in_len);
-
-    bitmap_set(s->batch_notify_vqs, virtio_queue_get_id(req->vq), 1);
-    qemu_bh_schedule(s->batch_notify_bh);
+    qemu_bh_schedule(vq_ctx->batch_notify_bh);
 }
 
 static int virtio_blk_handle_rw_error(VirtIOBlockReq *req, int error,
     bool is_read)
 {
-    BlockErrorAction action = blk_get_error_action(req->dev->blk,
+    VirtQueueCtx *vq_ctx = virtio_req_get_mq_ctx(req);
+    BlockErrorAction action = blk_get_error_action(vq_ctx->blk,
                                                    is_read, error);
-    VirtIOBlock *s = req->dev;
 
     if (action == BLOCK_ERROR_ACTION_STOP) {
         /* Break the link as the next request is going to be parsed from the
          * ring again. Otherwise we may end up doing a double completion! */
         req->mr_next = NULL;
-        req->next = s->rq;
-        s->rq = req;
+        req->next = vq_ctx->rq;
+        vq_ctx->rq = req;
     } else if (action == BLOCK_ERROR_ACTION_REPORT) {
         virtio_blk_req_complete(req, VIRTIO_BLK_S_IOERR);
-        block_acct_failed(blk_get_stats(s->blk), &req->acct);
+        block_acct_failed(blk_get_stats(vq_ctx->blk), &req->acct);
         virtio_blk_free_request(req);
     }
 
-    blk_error_action(s->blk, action, is_read, error);
+    blk_error_action(vq_ctx->blk, action, is_read, error);
     return action != BLOCK_ERROR_ACTION_IGNORE;
 }
 
@@ -120,6 +113,8 @@ static void virtio_blk_rw_complete(void *opaque, int ret)
 
     while (next) {
         VirtIOBlockReq *req = next;
+        VirtQueueCtx *vq_ctx = virtio_req_get_mq_ctx(req);
+
         next = req->mr_next;
         trace_virtio_blk_rw_complete(req, ret);
 
@@ -147,7 +142,7 @@ static void virtio_blk_rw_complete(void *opaque, int ret)
         }
 
         virtio_blk_req_complete(req, VIRTIO_BLK_S_OK);
-        block_acct_done(blk_get_stats(req->dev->blk), &req->acct);
+        block_acct_done(blk_get_stats(vq_ctx->blk), &req->acct);
         virtio_blk_free_request(req);
     }
 }
@@ -155,6 +150,7 @@ static void virtio_blk_rw_complete(void *opaque, int ret)
 static void virtio_blk_flush_complete(void *opaque, int ret)
 {
     VirtIOBlockReq *req = opaque;
+    VirtQueueCtx *vq_ctx = virtio_req_get_mq_ctx(req);
 
     if (ret) {
         if (virtio_blk_handle_rw_error(req, -ret, 0)) {
@@ -163,7 +159,7 @@ static void virtio_blk_flush_complete(void *opaque, int ret)
     }
 
     virtio_blk_req_complete(req, VIRTIO_BLK_S_OK);
-    block_acct_done(blk_get_stats(req->dev->blk), &req->acct);
+    block_acct_done(blk_get_stats(vq_ctx->blk), &req->acct);
     virtio_blk_free_request(req);
 }
 
@@ -234,6 +230,7 @@ static int virtio_blk_handle_scsi_req(VirtIOBlockReq *req)
     VirtIODevice *vdev = VIRTIO_DEVICE(req->dev);
     VirtQueueElement *elem = &req->elem;
     VirtIOBlock *blk = req->dev;
+    VirtQueueCtx *vq_ctx = virtio_req_get_mq_ctx(req);
 
 #ifdef __linux__
     int i;
@@ -316,7 +313,7 @@ static int virtio_blk_handle_scsi_req(VirtIOBlockReq *req)
     ioctl_req->hdr.sbp = elem->in_sg[elem->in_num - 3].iov_base;
     ioctl_req->hdr.mx_sb_len = elem->in_sg[elem->in_num - 3].iov_len;
 
-    acb = blk_aio_ioctl(blk->blk, SG_IO, &ioctl_req->hdr,
+    acb = blk_aio_ioctl(vq_ctx->blk, SG_IO, &ioctl_req->hdr,
                         virtio_blk_ioctl_complete, ioctl_req);
     if (!acb) {
         g_free(ioctl_req);
@@ -416,6 +413,7 @@ void virtio_blk_submit_multireq(BlockBackend *blk, MultiReqBuffer *mrb)
     int i = 0, start = 0, num_reqs = 0, niov = 0, nb_sectors = 0;
     int max_xfer_len = 0;
     int64_t sector_num = 0;
+    VirtQueueCtx *vq_ctx;
 
     if (mrb->num_reqs == 1) {
         submit_requests(blk, mrb, 0, 1, -1);
@@ -423,7 +421,8 @@ void virtio_blk_submit_multireq(BlockBackend *blk, MultiReqBuffer *mrb)
         return;
     }
 
-    max_xfer_len = blk_get_max_transfer_length(mrb->reqs[0]->dev->blk);
+    vq_ctx = virtio_req_get_mq_ctx(mrb->reqs[0]);
+    max_xfer_len = blk_get_max_transfer_length(vq_ctx->blk);
     max_xfer_len = MIN_NON_ZERO(max_xfer_len, BDRV_REQUEST_MAX_SECTORS);
 
     qsort(mrb->reqs, mrb->num_reqs, sizeof(*mrb->reqs),
@@ -464,16 +463,18 @@ void virtio_blk_submit_multireq(BlockBackend *blk, MultiReqBuffer *mrb)
 
 static void virtio_blk_handle_flush(VirtIOBlockReq *req, MultiReqBuffer *mrb)
 {
-    block_acct_start(blk_get_stats(req->dev->blk), &req->acct, 0,
+    VirtQueueCtx *vq_ctx = virtio_req_get_mq_ctx(req);
+
+    block_acct_start(blk_get_stats(vq_ctx->blk), &req->acct, 0,
                      BLOCK_ACCT_FLUSH);
 
     /*
      * Make sure all outstanding writes are posted to the backing device.
      */
     if (mrb->is_write && mrb->num_reqs > 0) {
-        virtio_blk_submit_multireq(req->dev->blk, mrb);
+        virtio_blk_submit_multireq(vq_ctx->blk, mrb);
     }
-    blk_aio_flush(req->dev->blk, virtio_blk_flush_complete, req);
+    blk_aio_flush(vq_ctx->blk, virtio_blk_flush_complete, req);
 }
 
 static bool virtio_blk_sect_range_ok(VirtIOBlock *dev,
@@ -481,6 +482,8 @@ static bool virtio_blk_sect_range_ok(VirtIOBlock *dev,
 {
     uint64_t nb_sectors = size >> BDRV_SECTOR_BITS;
     uint64_t total_sectors;
+    /* Use first block, others should be the same */
+    BlockBackend *blk0 = dev->mq[0].blk;
 
     if (nb_sectors > BDRV_REQUEST_MAX_SECTORS) {
         return false;
@@ -491,7 +494,7 @@ static bool virtio_blk_sect_range_ok(VirtIOBlock *dev,
     if (size % dev->conf.conf.logical_block_size) {
         return false;
     }
-    blk_get_geometry(dev->blk, &total_sectors);
+    blk_get_geometry(blk0, &total_sectors);
     if (sector > total_sectors || nb_sectors > total_sectors - sector) {
         return false;
     }
@@ -505,6 +508,7 @@ void virtio_blk_handle_request(VirtIOBlockReq *req, MultiReqBuffer *mrb)
     struct iovec *iov = req->elem.out_sg;
     unsigned in_num = req->elem.in_num;
     unsigned out_num = req->elem.out_num;
+    VirtQueueCtx *vq_ctx = virtio_req_get_mq_ctx(req);
 
     if (req->elem.out_num < 1 || req->elem.in_num < 1) {
         error_report("virtio-blk missing headers");
@@ -556,13 +560,13 @@ void virtio_blk_handle_request(VirtIOBlockReq *req, MultiReqBuffer *mrb)
         if (!virtio_blk_sect_range_ok(req->dev, req->sector_num,
                                       req->qiov.size)) {
             virtio_blk_req_complete(req, VIRTIO_BLK_S_IOERR);
-            block_acct_invalid(blk_get_stats(req->dev->blk),
+            block_acct_invalid(blk_get_stats(vq_ctx->blk),
                                is_write ? BLOCK_ACCT_WRITE : BLOCK_ACCT_READ);
             virtio_blk_free_request(req);
             return;
         }
 
-        block_acct_start(blk_get_stats(req->dev->blk),
+        block_acct_start(blk_get_stats(vq_ctx->blk),
                          &req->acct, req->qiov.size,
                          is_write ? BLOCK_ACCT_WRITE : BLOCK_ACCT_READ);
 
@@ -571,7 +575,7 @@ void virtio_blk_handle_request(VirtIOBlockReq *req, MultiReqBuffer *mrb)
         if (mrb->num_reqs > 0 && (mrb->num_reqs == VIRTIO_BLK_MAX_MERGE_REQS ||
                                   is_write != mrb->is_write ||
                                   !req->dev->conf.request_merging)) {
-            virtio_blk_submit_multireq(req->dev->blk, mrb);
+            virtio_blk_submit_multireq(vq_ctx->blk, mrb);
         }
 
         assert(mrb->num_reqs < VIRTIO_BLK_MAX_MERGE_REQS);
@@ -610,20 +614,21 @@ void virtio_blk_handle_request(VirtIOBlockReq *req, MultiReqBuffer *mrb)
 
 void virtio_blk_handle_vq(VirtIOBlock *s, VirtQueue *vq)
 {
+    VirtQueueCtx *vq_ctx = virtio_vq_get_mq_ctx(s, vq);
     VirtIOBlockReq *req;
     MultiReqBuffer mrb = {};
 
-    blk_io_plug(s->blk);
+    blk_io_plug(vq_ctx->blk);
 
     while ((req = virtio_blk_get_request(s, vq))) {
         virtio_blk_handle_request(req, &mrb);
     }
 
     if (mrb.num_reqs) {
-        virtio_blk_submit_multireq(s->blk, &mrb);
+        virtio_blk_submit_multireq(vq_ctx->blk, &mrb);
     }
 
-    blk_io_unplug(s->blk);
+    blk_io_unplug(vq_ctx->blk);
 }
 
 static void virtio_blk_handle_output(VirtIODevice *vdev, VirtQueue *vq)
@@ -644,14 +649,14 @@ static void virtio_blk_handle_output(VirtIODevice *vdev, VirtQueue *vq)
 
 static void virtio_blk_dma_restart_bh(void *opaque)
 {
-    VirtIOBlock *s = opaque;
-    VirtIOBlockReq *req = s->rq;
+    VirtQueueCtx *vq_ctx = opaque;
+    VirtIOBlockReq *req = vq_ctx->rq;
     MultiReqBuffer mrb = {};
 
-    qemu_bh_delete(s->bh);
-    s->bh = NULL;
+    qemu_bh_delete(vq_ctx->bh);
+    vq_ctx->bh = NULL;
 
-    s->rq = NULL;
+    vq_ctx->rq = NULL;
 
     while (req) {
         VirtIOBlockReq *next = req->next;
@@ -660,7 +665,7 @@ static void virtio_blk_dma_restart_bh(void *opaque)
     }
 
     if (mrb.num_reqs) {
-        virtio_blk_submit_multireq(s->blk, &mrb);
+        virtio_blk_submit_multireq(vq_ctx->blk, &mrb);
     }
 }
 
@@ -668,15 +673,19 @@ static void virtio_blk_dma_restart_cb(void *opaque, int running,
                                       RunState state)
 {
     VirtIOBlock *s = opaque;
+    VirtQueueCtx *vq_ctx;
 
     if (!running) {
         return;
     }
 
-    if (!s->bh) {
-        s->bh = aio_bh_new(blk_get_aio_context(s->conf.conf.blk),
-                           virtio_blk_dma_restart_bh, s);
-        qemu_bh_schedule(s->bh);
+    for_each_valid_vq_ctx(s, vq_ctx) {
+        if (vq_ctx->bh)
+            continue;
+
+        vq_ctx->bh = aio_bh_new(blk_get_aio_context(vq_ctx->blk),
+                                virtio_blk_dma_restart_bh, vq_ctx);
+        qemu_bh_schedule(vq_ctx->bh);
     }
 }
 
@@ -684,23 +693,30 @@ static void virtio_blk_reset(VirtIODevice *vdev)
 {
     VirtIOBlock *s = VIRTIO_BLK(vdev);
     AioContext *ctx;
+    VirtQueueCtx *vq_ctx;
 
-    /*
-     * This should cancel pending requests, but can't do nicely until there
-     * are per-device request lists.
-     */
-    ctx = blk_get_aio_context(s->blk);
-    aio_context_acquire(ctx);
-    blk_drain(s->blk);
+    /* Acquire all aio contexts and drain all underlying backends */
+    for_each_valid_vq_ctx(s, vq_ctx) {
+        ctx = blk_get_aio_context(vq_ctx->blk);
+        aio_context_acquire(ctx);
+        blk_drain(vq_ctx->blk);
+    }
 
+    /* Stop dataplane if any */
     if (s->dataplane) {
         virtio_blk_data_plane_stop(s->dataplane);
     }
 
-    virtio_blk_batch_notify_flush(s);
-    aio_context_release(ctx);
+    for_each_valid_vq_ctx(s, vq_ctx)
+        virtio_blk_batch_notify_flush(vq_ctx);
 
-    blk_set_enable_write_cache(s->blk, s->original_wce);
+    /* Release aio contexts */
+    for_each_valid_vq_ctx(s, vq_ctx) {
+        ctx = blk_get_aio_context(vq_ctx->blk);
+        aio_context_release(ctx);
+
+        blk_set_enable_write_cache(vq_ctx->blk, s->original_wce);
+    }
 }
 
 /* coalesce internal state, copy to pci i/o region 0
@@ -712,8 +728,10 @@ static void virtio_blk_update_config(VirtIODevice *vdev, uint8_t *config)
     struct virtio_blk_config blkcfg;
     uint64_t capacity;
     int blk_size = conf->logical_block_size;
+    /* Use first block, others should be the same */
+    BlockBackend *blk0 = s->mq[0].blk;
 
-    blk_get_geometry(s->blk, &capacity);
+    blk_get_geometry(blk0, &capacity);
     memset(&blkcfg, 0, sizeof(blkcfg));
     virtio_stq_p(vdev, &blkcfg.capacity, capacity);
     virtio_stl_p(vdev, &blkcfg.seg_max, 128 - 2);
@@ -733,7 +751,7 @@ static void virtio_blk_update_config(VirtIODevice *vdev, uint8_t *config)
      * divided by 512 - instead it is the amount of blk_size blocks
      * per track (cylinder).
      */
-    if (blk_getlength(s->blk) /  conf->heads / conf->secs % blk_size) {
+    if (blk_getlength(blk0) /  conf->heads / conf->secs % blk_size) {
         blkcfg.geometry.sectors = conf->secs & ~s->sector_mask;
     } else {
         blkcfg.geometry.sectors = conf->secs;
@@ -741,7 +759,7 @@ static void virtio_blk_update_config(VirtIODevice *vdev, uint8_t *config)
     blkcfg.size_max = 0;
     blkcfg.physical_block_exp = get_physical_block_exp(conf);
     blkcfg.alignment_offset = 0;
-    blkcfg.wce = blk_enable_write_cache(s->blk);
+    blkcfg.wce = blk_enable_write_cache(blk0);
     virtio_stw_p(vdev, &blkcfg.num_queues, s->conf.num_queues);
     memcpy(config, &blkcfg, sizeof(struct virtio_blk_config));
 }
@@ -750,18 +768,23 @@ static void virtio_blk_set_config(VirtIODevice *vdev, const uint8_t *config)
 {
     VirtIOBlock *s = VIRTIO_BLK(vdev);
     struct virtio_blk_config blkcfg;
+    VirtQueueCtx *vq_ctx;
 
     memcpy(&blkcfg, config, sizeof(blkcfg));
 
-    aio_context_acquire(blk_get_aio_context(s->blk));
-    blk_set_enable_write_cache(s->blk, blkcfg.wce != 0);
-    aio_context_release(blk_get_aio_context(s->blk));
+    for_each_valid_vq_ctx(s, vq_ctx) {
+        aio_context_acquire(blk_get_aio_context(vq_ctx->blk));
+        blk_set_enable_write_cache(vq_ctx->blk, blkcfg.wce != 0);
+        aio_context_release(blk_get_aio_context(vq_ctx->blk));
+    }
 }
 
 static uint64_t virtio_blk_get_features(VirtIODevice *vdev, uint64_t features,
                                         Error **errp)
 {
     VirtIOBlock *s = VIRTIO_BLK(vdev);
+    /* Use first block, others should be the same */
+    BlockBackend *blk0 = s->mq[0].blk;
 
     virtio_add_feature(&features, VIRTIO_BLK_F_SEG_MAX);
     virtio_add_feature(&features, VIRTIO_BLK_F_GEOMETRY);
@@ -780,10 +803,10 @@ static uint64_t virtio_blk_get_features(VirtIODevice *vdev, uint64_t features,
     if (s->conf.config_wce) {
         virtio_add_feature(&features, VIRTIO_BLK_F_CONFIG_WCE);
     }
-    if (blk_enable_write_cache(s->blk)) {
+    if (blk_enable_write_cache(blk0)) {
         virtio_add_feature(&features, VIRTIO_BLK_F_WCE);
     }
-    if (blk_is_read_only(s->blk)) {
+    if (blk_is_read_only(blk0)) {
         virtio_add_feature(&features, VIRTIO_BLK_F_RO);
     }
     if (s->conf.num_queues > 1) {
@@ -819,22 +842,33 @@ static void virtio_blk_set_status(VirtIODevice *vdev, uint8_t status)
      *     Guest writes 1 to the WCE configuration field (writeback mode)
      *     Guest sets DRIVER_OK bit in status field
      *
-     * s->blk would erroneously be placed in writethrough mode.
+     * vq_ctx->blk would erroneously be placed in writethrough mode.
      */
     if (!virtio_vdev_has_feature(vdev, VIRTIO_BLK_F_CONFIG_WCE)) {
-        aio_context_acquire(blk_get_aio_context(s->blk));
-        blk_set_enable_write_cache(s->blk,
-                                   virtio_vdev_has_feature(vdev,
+        VirtQueueCtx *vq_ctx;
+
+        for_each_valid_vq_ctx(s, vq_ctx) {
+            aio_context_acquire(blk_get_aio_context(vq_ctx->blk));
+            blk_set_enable_write_cache(vq_ctx->blk,
+                                       virtio_vdev_has_feature(vdev,
                                                            VIRTIO_BLK_F_WCE));
-        aio_context_release(blk_get_aio_context(s->blk));
+            aio_context_release(blk_get_aio_context(vq_ctx->blk));
+        }
     }
 }
 
 static bool virtio_blk_mq_rq_indices_needed(void *opaque)
 {
     VirtIOBlock *s = opaque;
+    VirtQueueCtx *vq_ctx;
+    bool has_rq = false;
+
+    for_each_valid_vq_ctx(s, vq_ctx) {
+        if ((has_rq = !!vq_ctx->rq))
+            break;
+    }
 
-    return s->conf.num_queues && s->rq;
+    return s->conf.num_queues && has_rq;
 }
 
 /* Array of virtqueue indices for requests in s->rq */
@@ -870,27 +904,35 @@ static void virtio_blk_save(QEMUFile *f, void *opaque)
 {
     VirtIODevice *vdev = VIRTIO_DEVICE(opaque);
     VirtIOBlock *s = VIRTIO_BLK(vdev);
+    VirtQueueCtx *vq_ctx;
 
     if (s->dataplane) {
         virtio_blk_data_plane_stop(s->dataplane);
     }
 
-    virtio_blk_batch_notify_flush(s);
+    for_each_valid_vq_ctx(s, vq_ctx)
+        virtio_blk_batch_notify_flush(vq_ctx);
 
     virtio_save(vdev, f);
 }
-    
+
 static void virtio_blk_save_device(VirtIODevice *vdev, QEMUFile *f)
 {
     VirtIOBlock *s = VIRTIO_BLK(vdev);
-    VirtIOBlockReq *req = s->rq;
+    VirtIOBlockReq *req;
+    VirtQueueCtx *vq_ctx;
 
     s->num_rq = 0;
-    while (req) {
-        qemu_put_sbyte(f, 1);
-        qemu_put_virtqueue_element(f, &req->elem);
-        req = req->next;
-        s->num_rq++;
+
+    for_each_valid_vq_ctx(s, vq_ctx) {
+        req = vq_ctx->rq;
+
+        while (req) {
+            qemu_put_sbyte(f, 1);
+            qemu_put_virtqueue_element(f, &req->elem);
+            req = req->next;
+            s->num_rq++;
+        }
     }
     qemu_put_sbyte(f, 0);
 
@@ -900,14 +942,17 @@ static void virtio_blk_save_device(VirtIODevice *vdev, QEMUFile *f)
      * write it when virtio-blk subsections are needed.
      */
     if (virtio_blk_mq_rq_indices_needed(s)) {
-        uint32_t i;
+        uint32_t i = 0;
 
         s->mq_rq_indices = g_new(uint32_t, s->num_rq);
-        req = s->rq;
-        for (i = 0; i < s->num_rq; i++) {
-            s->mq_rq_indices[i] = virtio_get_queue_index(req->vq);
-            req = req->next;
+        for_each_valid_vq_ctx(s, vq_ctx) {
+            req = vq_ctx->rq;
+            while (req) {
+                s->mq_rq_indices[i++] = virtio_get_queue_index(req->vq);
+                req = req->next;
+            }
         }
+        assert(i == s->num_rq);
 
         vmstate_save_state(f, &virtio_blk_vmstate, s, NULL);
         qemu_put_ubyte(f, ~QEMU_VM_SUBSECTION);
@@ -933,7 +978,7 @@ static int virtio_blk_load_device(VirtIODevice *vdev, QEMUFile *f,
 {
     VirtIOBlock *s = VIRTIO_BLK(vdev);
     VirtQueue *vq0 = virtio_get_queue(vdev, 0);
-    VirtIOBlockReq **tailp = (VirtIOBlockReq **)&s->rq;
+    VirtIOBlockReq **tailp = (VirtIOBlockReq **)&s->mq[0].rq;
     VirtIOBlockReq *req;
     uint32_t num_rq = 0;
     int ret;
@@ -949,6 +994,8 @@ static int virtio_blk_load_device(VirtIODevice *vdev, QEMUFile *f,
         num_rq++;
     }
 
+    tailp = NULL;
+
     s->num_rq = 0;
     s->mq_rq_indices = NULL;
     ret = vmstate_load_state(f, &virtio_blk_vmstate, s, 1);
@@ -970,7 +1017,14 @@ static int virtio_blk_load_device(VirtIODevice *vdev, QEMUFile *f,
             goto out;
         }
 
-        req = s->rq;
+        tailp = g_new0(VirtIOBlockReq *, s->conf.num_queues);
+        if (tailp == NULL) {
+            ret = -EINVAL;
+            goto out;
+        }
+
+        req = s->mq[0].rq;
+        s->mq[0].rq = NULL;
         for (i = 0; i < num_rq; i++) {
             uint32_t idx = s->mq_rq_indices[i];
 
@@ -980,6 +1034,15 @@ static int virtio_blk_load_device(VirtIODevice *vdev, QEMUFile *f,
             }
 
             req->vq = virtio_get_queue(vdev, idx);
+            if (tailp[idx] == NULL) {
+                assert(s->mq[idx].rq == NULL);
+                s->mq[idx].rq = tailp[idx] = req;
+            } else {
+                assert(s->mq[idx].rq);
+                tailp[idx]->next = req;
+                tailp[idx] = req;
+            }
+
             req = req->next;
         }
     } else if (ret == -ENOENT) {
@@ -991,6 +1054,7 @@ static int virtio_blk_load_device(VirtIODevice *vdev, QEMUFile *f,
     }
 
 out:
+    g_free(tailp);
     g_free(s->mq_rq_indices);
     s->mq_rq_indices = NULL;
     return ret;
@@ -1007,20 +1071,100 @@ static const BlockDevOps virtio_block_ops = {
     .resize_cb = virtio_blk_resize,
 };
 
+static bool virtio_blk_dup_iothreads_and_drives(VirtIOBlock *s,
+                                                Error **errp)
+{
+    BlockDriverState *bs = blk_bs(s->conf.conf.blk);
+    Error *local_err = NULL;
+    char *tname;
+    int i;
+
+    if (!s->conf.iothread) {
+        error_setg(errp, "at least one iothread should be specified\n");
+        return false;
+    }
+    s->conf.drives_arr[0] = s->conf.conf.blk;
+
+    if (s->conf.num_queues == 1)
+        return true;
+    if (bs->drv != &bdrv_raw) {
+        error_setg(errp, "multiqueues are supported only for raw block 
device\n");
+        return false;
+    }
+    if (bs->open_flags & BDRV_O_SNAPSHOT) {
+        error_setg(errp, "multiqueues do not work with snapshots\n");
+        return false;
+    }
+
+    tname = object_get_canonical_path_component(OBJECT(s->conf.iothread));
+    if (!tname) {
+        error_setg(errp, "can't get path component for s->conf.iothread");
+        return false;
+    }
+
+    for (i = 1; i < s->conf.num_queues; i++) {
+        IOThread *t;
+        char *id;
+
+        id = g_strdup_printf("%s_%d", tname, i);
+        if (!id) {
+            error_setg(errp, "can't allocate new thread id");
+            goto error;
+        }
+
+        t = (IOThread *)user_creatable_add_type(TYPE_IOTHREAD, id,
+                                                NULL, NULL, &local_err);
+        g_free(id);
+        if (local_err) {
+            error_propagate(errp, local_err);
+            goto error;
+        }
+        s->conf.iothreads_arr[i] = t;
+    }
+    for (i = 1; i < s->conf.num_queues; i++) {
+        BlockBackend *blk;
+
+        blk = blk_new_open(bs->filename, NULL, bs->full_open_options,
+                           bs->open_flags,
+                           &local_err);
+        if (!blk) {
+            error_propagate(errp, local_err);
+            goto error;
+        }
+        s->conf.drives_arr[i] = blk;
+    }
+    g_free(tname);
+
+    return true;
+
+error:
+    g_free(tname);
+
+    while (i-- > 1) {
+        object_unref(OBJECT(s->conf.iothreads_arr[i]));
+        blk_unref(s->conf.drives_arr[i]);
+    }
+
+    return false;
+}
+
 static void virtio_blk_device_realize(DeviceState *dev, Error **errp)
 {
     VirtIODevice *vdev = VIRTIO_DEVICE(dev);
     VirtIOBlock *s = VIRTIO_BLK(dev);
     VirtIOBlkConf *conf = &s->conf;
+    /* Use first block, others should be the same */
+    BlockBackend *blk0 = s->conf.conf.blk;
+    VirtQueueCtx *vq_ctx;
     Error *err = NULL;
     static int virtio_blk_id;
     unsigned i;
 
-    if (!conf->conf.blk) {
+    if (!blk0) {
         error_setg(errp, "drive property not set");
         return;
     }
-    if (!blk_is_inserted(conf->conf.blk)) {
+    if (!blk_is_inserted(blk0)) {
         error_setg(errp, "Device needs media, but drive is empty");
         return;
     }
@@ -1028,9 +1172,12 @@ static void virtio_blk_device_realize(DeviceState *dev, Error **errp)
         error_setg(errp, "num-queues property must be larger than 0");
         return;
     }
+    if (!virtio_blk_dup_iothreads_and_drives(s, errp)) {
+        return;
+    }
 
     blkconf_serial(&conf->conf, &conf->serial);
-    s->original_wce = blk_enable_write_cache(conf->conf.blk);
+    s->original_wce = blk_enable_write_cache(blk0);
     blkconf_geometry(&conf->conf, NULL, 65535, 255, 255, &err);
     if (err) {
         error_propagate(errp, err);
@@ -1041,12 +1188,16 @@ static void virtio_blk_device_realize(DeviceState *dev, Error **errp)
     virtio_init(vdev, "virtio-blk", VIRTIO_ID_BLOCK,
                 sizeof(struct virtio_blk_config));
 
-    s->blk = conf->conf.blk;
-    s->rq = NULL;
     s->sector_mask = (s->conf.conf.logical_block_size / BDRV_SECTOR_SIZE) - 1;
 
+    /* Init MQ contexts */
     for (i = 0; i < conf->num_queues; i++) {
-        virtio_add_queue(vdev, 128, virtio_blk_handle_output);
+        vq_ctx = &s->mq[i];
+
+        vq_ctx->blk = s->conf.drives_arr[i];
+        vq_ctx->s   = s;
+        vq_ctx->vq  = virtio_add_queue(vdev, 128, virtio_blk_handle_output);
+        vq_ctx->iothread = s->conf.iothreads_arr[i];
     }
     virtio_blk_data_plane_create(vdev, conf, &s->dataplane, &err);
     if (err != NULL) {
@@ -1055,31 +1206,36 @@ static void virtio_blk_device_realize(DeviceState *dev, Error **errp)
         return;
     }
 
-    s->batch_notify_bh = aio_bh_new(blk_get_aio_context(s->blk),
-                                    virtio_blk_batch_notify_bh, s);
-    s->batch_notify_vqs = bitmap_new(conf->num_queues);
+    for_each_valid_vq_ctx(s, vq_ctx)
+        vq_ctx->batch_notify_bh = aio_bh_new(blk_get_aio_context(vq_ctx->blk),
+                                             virtio_blk_batch_notify_bh,
+                                             vq_ctx);
 
     s->change = qemu_add_vm_change_state_handler(virtio_blk_dma_restart_cb, s);
     register_savevm(dev, "virtio-blk", virtio_blk_id++, 2,
                     virtio_blk_save, virtio_blk_load, s);
-    blk_set_dev_ops(s->blk, &virtio_block_ops, s);
-    blk_set_guest_block_size(s->blk, s->conf.conf.logical_block_size);
+    for_each_valid_vq_ctx(s, vq_ctx) {
+        blk_set_dev_ops(vq_ctx->blk, &virtio_block_ops, s);
+        blk_set_guest_block_size(vq_ctx->blk, s->conf.conf.logical_block_size);
 
-    blk_iostatus_enable(s->blk);
+        blk_iostatus_enable(vq_ctx->blk);
+    }
 }
 
 static void virtio_blk_device_unrealize(DeviceState *dev, Error **errp)
 {
     VirtIODevice *vdev = VIRTIO_DEVICE(dev);
     VirtIOBlock *s = VIRTIO_BLK(dev);
+    VirtQueueCtx *vq_ctx;
 
-    qemu_bh_delete(s->batch_notify_bh);
-    g_free(s->batch_notify_vqs);
+    for_each_valid_vq_ctx(s, vq_ctx)
+        qemu_bh_delete(vq_ctx->batch_notify_bh);
     virtio_blk_data_plane_destroy(s->dataplane);
     s->dataplane = NULL;
     qemu_del_vm_change_state_handler(s->change);
     unregister_savevm(dev, "virtio-blk", s);
-    blockdev_mark_auto_del(s->blk);
+    for_each_valid_vq_ctx(s, vq_ctx)
+        blockdev_mark_auto_del(vq_ctx->blk);
     virtio_cleanup(vdev);
 }
 
diff --git a/include/hw/virtio/virtio-blk.h b/include/hw/virtio/virtio-blk.h
index 06148ea..bce0a36 100644
--- a/include/hw/virtio/virtio-blk.h
+++ b/include/hw/virtio/virtio-blk.h
@@ -33,7 +33,11 @@ struct virtio_blk_inhdr
 struct VirtIOBlkConf
 {
     BlockConf conf;
-    IOThread *iothread;
+    union {
+        IOThread *iothread; /* kept for compatibility */
+        IOThread *iothreads_arr[VIRTIO_QUEUE_MAX];
+    };
+    BlockBackend *drives_arr[VIRTIO_QUEUE_MAX];
     char *serial;
     uint32_t scsi;
     uint32_t config_wce;
@@ -41,21 +45,31 @@ struct VirtIOBlkConf
     uint16_t num_queues;
 };
 
+typedef struct VirtQueueCtx {
+    BlockBackend *blk;
+    struct VirtIOBlock *s;
+    VirtQueue *vq;
+    void *rq;
+    QEMUBH *bh;
+    QEMUBH *batch_notify_bh;
+    IOThread *iothread;
+    Notifier insert_notifier;
+    Notifier remove_notifier;
+    /* Operation blocker on BDS */
+    Error *blocker;
+} VirtQueueCtx;
+
 struct VirtIOBlockDataPlane;
 
 struct VirtIOBlockReq;
 typedef struct VirtIOBlock {
     VirtIODevice parent_obj;
-    BlockBackend *blk;
-    void *rq;
+    VirtQueueCtx mq[VIRTIO_QUEUE_MAX];
 
     /* The following two fields are used only during save/load */
     uint32_t num_rq;
     uint32_t *mq_rq_indices;
 
-    QEMUBH *bh;
-    QEMUBH *batch_notify_bh;
-    unsigned long *batch_notify_vqs;
     VirtIOBlkConf conf;
     unsigned short sector_mask;
     bool original_wce;
@@ -65,6 +79,11 @@ typedef struct VirtIOBlock {
     struct VirtIOBlockDataPlane *dataplane;
 } VirtIOBlock;
 
+#define for_each_valid_vq_ctx(s, vq_ctx)                    \
+    for (vq_ctx = &s->mq[0];                                \
+         vq_ctx < s->mq + ARRAY_SIZE(s->mq) && vq_ctx->blk; \
+         vq_ctx++)                                          \
+
 typedef struct VirtIOBlockReq {
     VirtQueueElement elem;
     int64_t sector_num;
@@ -79,6 +98,20 @@ typedef struct VirtIOBlockReq {
     BlockAcctCookie acct;
 } VirtIOBlockReq;
 
+static inline VirtQueueCtx *virtio_vq_get_mq_ctx(VirtIOBlock *s, VirtQueue *vq)
+{
+    uint16_t qi = virtio_get_queue_index(vq);
+
+    return &s->mq[qi];
+}
+
+static inline VirtQueueCtx *virtio_req_get_mq_ctx(VirtIOBlockReq *req)
+{
+    uint16_t qi = virtio_get_queue_index(req->vq);
+
+    return &req->dev->mq[qi];
+}
+
 #define VIRTIO_BLK_MAX_MERGE_REQS 32
 
 typedef struct MultiReqBuffer {
-- 
2.8.2



