qemu-devel

Re: [Qemu-devel] [RFC] block-queue: Delay and batch metadata writes


From: Anthony Liguori
Subject: Re: [Qemu-devel] [RFC] block-queue: Delay and batch metadata writes
Date: Mon, 20 Sep 2010 09:31:04 -0500
User-agent: Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.1.12) Gecko/20100826 Lightning/1.0b1 Thunderbird/3.0.7

On 09/20/2010 08:56 AM, Kevin Wolf wrote:
I won't get this ready until I leave for vacation on Wednesday, so I thought I
could just as well post it as an RFC in this state.

With this patch applied, qcow2 doesn't directly access the image file any more
for metadata, but rather goes through the newly introduced blkqueue. Write
and sync requests are queued there and executed in a separate worker thread.
Reads consider the contents of the queue before accessing the image file.
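
As a rough illustration of "reads consider the contents of the queue", here is a minimal sketch. It is not the code from the patch: blkqueue_pread() walks the queue in reverse and splits the read, while this sketch takes the simpler route of reading the backing bytes first and then replaying queued writes from oldest to newest. All names (struct overlay, overlay_pwrite, overlay_pread) and the fixed sizes are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical, simplified stand-ins; not the structures from the patch. */
#define DISK_SIZE   64u
#define MAX_PENDING 8u
#define MAX_WRITE   16u

struct pending_write {
    size_t  offset;
    size_t  size;
    uint8_t data[MAX_WRITE];
};

struct overlay {
    uint8_t disk[DISK_SIZE];                  /* stands in for the image file */
    struct pending_write queue[MAX_PENDING];  /* oldest request first */
    size_t count;
};

/* Queue a write without touching the "disk". Returns 0 on success, -1 else. */
static int overlay_pwrite(struct overlay *o, size_t offset,
                          const void *buf, size_t size)
{
    struct pending_write *w;

    assert(o != NULL && buf != NULL);
    if (o->count == MAX_PENDING || size > MAX_WRITE ||
        offset > DISK_SIZE || size > DISK_SIZE - offset) {
        return -1;
    }
    w = &o->queue[o->count++];
    w->offset = offset;
    w->size = size;
    memcpy(w->data, buf, size);
    return 0;
}

/* Read the backing bytes, then let queued writes override them in order. */
static int overlay_pread(const struct overlay *o, size_t offset,
                         void *buf, size_t size)
{
    uint8_t *out = buf;
    size_t end, i;

    assert(o != NULL && buf != NULL);
    if (offset > DISK_SIZE || size > DISK_SIZE - offset) {
        return -1;
    }
    end = offset + size;
    memcpy(out, &o->disk[offset], size);

    for (i = 0; i < o->count; i++) {
        const struct pending_write *w = &o->queue[i];
        size_t w_end = w->offset + w->size;
        size_t lo = w->offset > offset ? w->offset : offset;
        size_t hi = w_end < end ? w_end : end;

        /* Copy only the part of the queued write that overlaps the read */
        if (lo < hi) {
            memcpy(&out[lo - offset], &w->data[lo - w->offset], hi - lo);
        }
    }
    return 0;
}
```

Replaying oldest to newest means the latest queued write wins for any byte, which is the same result the reverse walk in the patch aims for.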

What makes this interesting is that we can delay syncs and if multiple syncs
occur, we can merge them into one bdrv_flush.

A typical sequence in qcow2 (simple cluster allocation) looks like this:

1. Update refcount table
2. bdrv_flush
3. Update L2 entry

Let's expand it a bit more:

1. Update refcount table
2. bdrv_flush
3. Update L2 entry
4. Write data to disk
5. Report write complete

I'm struggling to understand how a thread helps out.

If you run 1-3 in a thread, you need to inject a barrier between steps 3 and 5 or you'll report the write complete before writing the metadata out. You can't delay completing step 3 until a guest requests a flush. If you do, then you're implementing a writeback cache for metadata.

If you're comfortable with a writeback cache for metadata, then you should also be comfortable with a writeback cache for data in which case, cache=writeback is the answer.

If it's a matter of batching, batching can't occur if you have a barrier between steps 3 and 5. The only way you can get batching is by doing a writeback cache for the metadata such that you can complete your request before the metadata is written.
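
To make the distinction concrete, here is a minimal sketch of the completion rule being argued about. The type and function names are hypothetical and only model the reasoning above; they are not part of the patch or of QEMU.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of the two policies discussed above. */
enum metadata_mode {
    METADATA_WRITETHROUGH,  /* barrier between step 3 and step 5 */
    METADATA_WRITEBACK      /* complete as soon as metadata is queued */
};

struct write_request {
    bool metadata_queued;   /* steps 1-3 handed to the worker thread */
    bool metadata_on_disk;  /* steps 1-3 actually written out */
    bool data_written;      /* step 4 done */
};

/* May step 5 ("report write complete") happen yet? */
static bool may_report_complete(const struct write_request *r,
                                enum metadata_mode mode)
{
    assert(r != NULL);
    if (!r->data_written) {
        return false;
    }
    if (mode == METADATA_WRITETHROUGH) {
        /* Must wait for the worker, so nothing can be batched */
        return r->metadata_on_disk;
    }
    /* Writeback: completion no longer implies metadata durability */
    return r->metadata_queued || r->metadata_on_disk;
}
```

Under this model, batching only becomes possible in the METADATA_WRITEBACK branch, which is exactly the case where the request completes before its metadata is on disk.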

Am I misunderstanding the idea?

Regards,

Anthony Liguori

If we delay the operation and get three of these sequences queued before
actually executing, we end up with the following result, saving two syncs:

1. Update refcount table (req 1)
2. Update refcount table (req 2)
3. Update refcount table (req 3)
4. bdrv_flush
5. Update L2 entry (req 1)
6. Update L2 entry (req 2)
7. Update L2 entry (req 3)
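
The saving can be checked with a small sketch that builds both operation sequences and counts the flushes. The enum and function names are hypothetical and only mirror the steps listed above; nothing here comes from the patch itself.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical operation kinds, named after the steps listed above. */
enum op { OP_REFCOUNT_UPDATE, OP_FLUSH, OP_L2_UPDATE };

/* One flush per request: refcount update, flush, L2 update, repeated. */
static size_t plan_unbatched(enum op *out, size_t cap, size_t nreq)
{
    size_t n = 0, i;

    assert(out != NULL);
    if (nreq > cap / 3) {
        return 0;               /* not enough room in out[] */
    }
    for (i = 0; i < nreq; i++) {
        out[n++] = OP_REFCOUNT_UPDATE;
        out[n++] = OP_FLUSH;
        out[n++] = OP_L2_UPDATE;
    }
    return n;
}

/* All refcount updates, a single merged flush, then all L2 updates. */
static size_t plan_batched(enum op *out, size_t cap, size_t nreq)
{
    size_t n = 0, i;

    assert(out != NULL);
    if (nreq == 0 || cap < 1 || nreq > (cap - 1) / 2) {
        return 0;               /* nothing to do or not enough room */
    }
    for (i = 0; i < nreq; i++) {
        out[n++] = OP_REFCOUNT_UPDATE;
    }
    out[n++] = OP_FLUSH;
    for (i = 0; i < nreq; i++) {
        out[n++] = OP_L2_UPDATE;
    }
    return n;
}

/* Count how many flushes a planned sequence contains. */
static size_t count_flushes(const enum op *ops, size_t n)
{
    size_t flushes = 0, i;

    for (i = 0; i < n; i++) {
        if (ops[i] == OP_FLUSH) {
            flushes++;
        }
    }
    return flushes;
}
```

For three requests the unbatched plan has nine operations with three flushes, and the batched plan has seven operations with one flush. In both plans every L2 update still comes after a flush that follows its refcount update.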

This patch only commits a sync if either the guest has requested a flush or
a certain number of requests has accumulated in the queue, so usually we batch
more than just three requests.
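
The rule can be written as a single predicate. This is only a sketch: the patch splits the logic between blkqueue_thread() (which compares queue_size against 42) and blkqueue_flush() (which drains everything), whereas the hypothetical should_process_head() below folds both into one function and assumes the queue is non-empty.

```c
#include <assert.h>
#include <stdbool.h>

/* Same constant that blkqueue_thread() compares queue_size against. */
#define BATCH_THRESHOLD 42

/*
 * Decide whether the request at the head of a non-empty queue should be
 * executed now. Hypothetical helper, not a function from the patch.
 */
static bool should_process_head(bool head_is_barrier, int queue_size,
                                bool flush_requested)
{
    assert(queue_size >= 0);

    /* Plain writes are executed as soon as the worker gets to them */
    if (!head_is_barrier) {
        return true;
    }

    /* Barriers wait for a guest flush or for the queue to fill up */
    return flush_requested || queue_size > BATCH_THRESHOLD;
}
```

With this rule a barrier at the head of a short queue stays put, so later requests can still be merged into its section.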

I didn't run any detailed benchmarks, but just checked the installation time
of a Fedora 13 guest: while git master takes about 40-50% longer than it did
before the metadata syncs were added, we get most of that back with blkqueue.

Of course, in this state the code is not correct, but it's correct enough to
try and have qcow2 run on a file backend. Some remaining problems are:

- There's no locking between the worker thread and other functions accessing
   the same backend driver. Should be fine for file, but probably not for other
   backends.

- Error handling doesn't really exist. If something goes wrong with writing
   metadata we can't fail the guest request any more because it has long since
   completed. Losing this data is actually okay because the guest hasn't
   flushed yet.

   However, we need to be able to fail a flush, and we also need some way to
   handle errors transparently. This probably means that we have to stop the VM
   and let the user fix things so that we can retry. The only other way would be
   to shut down the VM and end up in the same situation as with a host crash.

   Or maybe it would even be enough to start failing all new requests.

- The Makefile integration is obviously very wrong, too. It worked well enough
   for me, but you need to be aware of when block-queue.o is compiled with
   RUN_TESTS and when it isn't. The tests need to be split out properly.

They are certainly fixable and shouldn't have any major impact on performance,
so that's just a matter of doing it.
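
For the error handling item, one possible shape for "being able to fail a flush" is a sticky error that the worker records and the next flush reports. This is only a sketch under that assumption: the struct and function names are hypothetical, clearing the error after reporting it is a design choice rather than something the patch specifies, and stopping the VM for a retry is not modelled at all.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical per-queue error state; not part of the patch. */
struct deferred_status {
    int first_error;    /* 0, or the first negative errno seen */
};

/* Called by the worker after each queued metadata write. */
static void record_write_result(struct deferred_status *st, int ret)
{
    assert(st != NULL);
    if (ret < 0 && st->first_error == 0) {
        st->first_error = ret;
    }
}

/*
 * Called when the guest flushes: report the first deferred failure, then
 * clear it so that a later flush can succeed after a retry.
 */
static int flush_status(struct deferred_status *st)
{
    int ret;

    assert(st != NULL);
    ret = st->first_error;
    st->first_error = 0;
    return ret;
}
```

The guest request that queued the failing write has already completed, so the flush is the first point at which the error can still be surfaced.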

Kevin

---
  Makefile               |    3 +
  Makefile.objs          |    1 +
  block-queue.c          |  720 ++++++++++++++++++++++++++++++++++++++++++++++++
  block-queue.h          |   49 ++++
  block/qcow2-cluster.c  |   28 +-
  block/qcow2-refcount.c |   44 ++--
  block/qcow2.c          |   14 +
  block/qcow2.h          |    4 +
  qemu-thread.c          |   13 +
  qemu-thread.h          |    1 +
  10 files changed, 838 insertions(+), 39 deletions(-)
  create mode 100644 block-queue.c
  create mode 100644 block-queue.h

diff --git a/Makefile b/Makefile
index ab91d42..0202dc6 100644
--- a/Makefile
+++ b/Makefile
@@ -125,6 +125,9 @@ qemu-nbd$(EXESUF): qemu-nbd.o qemu-tool.o qemu-error.o $(trace-obj-y) $(block-ob

  qemu-io$(EXESUF): qemu-io.o cmd.o qemu-tool.o qemu-error.o $(trace-obj-y) $(block-obj-y) $(qobject-obj-y)

+block-queue$(EXESUF): QEMU_CFLAGS += -DRUN_TESTS
+block-queue$(EXESUF): qemu-tool.o qemu-error.o qemu-thread.o $(block-obj-y) $(qobject-obj-y)
+
  qemu-img-cmds.h: $(SRC_PATH)/qemu-img-cmds.hx
        $(call quiet-command,sh $(SRC_PATH)/hxtool -h < $< > $@,"  GEN   $@")

diff --git a/Makefile.objs b/Makefile.objs
index 3ef6d80..e97a246 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -9,6 +9,7 @@ qobject-obj-y += qerror.o

  block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
  block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
+block-obj-y += qemu-thread.o block-queue.o
  block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
  block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o

diff --git a/block-queue.c b/block-queue.c
new file mode 100644
index 0000000..13579a7
--- /dev/null
+++ b/block-queue.c
@@ -0,0 +1,720 @@
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2010 Kevin Wolf <address@hidden>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include <signal.h>
+
+#include "qemu-common.h"
+#include "qemu-queue.h"
+#include "qemu-thread.h"
+#include "qemu-barrier.h"
+#include "block.h"
+#include "block-queue.h"
+
+enum blkqueue_req_type {
+    REQ_TYPE_WRITE,
+    REQ_TYPE_BARRIER,
+};
+
+typedef struct BlockQueueRequest {
+    enum blkqueue_req_type type;
+
+    uint64_t    offset;
+    void*       buf;
+    uint64_t    size;
+    unsigned    section;
+
+    QTAILQ_ENTRY(BlockQueueRequest) link;
+    QSIMPLEQ_ENTRY(BlockQueueRequest) link_section;
+} BlockQueueRequest;
+
+struct BlockQueue {
+    BlockDriverState*   bs;
+
+    QemuThread          thread;
+    bool                thread_done;
+    QemuMutex           lock;
+    QemuMutex           flush_lock;
+    QemuCond            cond;
+
+    int                 barriers_requested;
+    int                 barriers_submitted;
+    int                 queue_size;
+
+    QTAILQ_HEAD(bq_queue_head, BlockQueueRequest) queue;
+    QSIMPLEQ_HEAD(, BlockQueueRequest) sections;
+};
+
+static void *blkqueue_thread(void *bq);
+
+BlockQueue *blkqueue_create(BlockDriverState *bs)
+{
+    BlockQueue *bq = qemu_mallocz(sizeof(BlockQueue));
+    bq->bs = bs;
+
+    QTAILQ_INIT(&bq->queue);
+    QSIMPLEQ_INIT(&bq->sections);
+
+    qemu_mutex_init(&bq->lock);
+    qemu_mutex_init(&bq->flush_lock);
+    qemu_cond_init(&bq->cond);
+
+    bq->thread_done = false;
+    qemu_thread_create(&bq->thread, blkqueue_thread, bq);
+
+    return bq;
+}
+
+void blkqueue_init_context(BlockQueueContext* context, BlockQueue *bq)
+{
+    context->bq = bq;
+    context->section = 0;
+}
+
+void blkqueue_destroy(BlockQueue *bq)
+{
+    bq->thread_done = true;
+    qemu_cond_signal(&bq->cond);
+    qemu_thread_join(&bq->thread);
+
+    blkqueue_flush(bq);
+
+    fprintf(stderr, "blkqueue_destroy: %d/%d barriers left\n",
+        bq->barriers_submitted, bq->barriers_requested);
+
+    qemu_mutex_destroy(&bq->lock);
+    qemu_mutex_destroy(&bq->flush_lock);
+    qemu_cond_destroy(&bq->cond);
+
+    assert(QTAILQ_FIRST(&bq->queue) == NULL);
+    assert(QSIMPLEQ_FIRST(&bq->sections) == NULL);
+    qemu_free(bq);
+}
+
+int blkqueue_pread(BlockQueueContext *context, uint64_t offset, void *buf,
+    uint64_t size)
+{
+    BlockQueue *bq = context->bq;
+    BlockQueueRequest *req;
+    int ret;
+
+    /*
+     * First check if there are any pending writes for the same data. Reverse
+     * order to return data written by the latest write.
+     */
+    QTAILQ_FOREACH_REVERSE(req, &bq->queue, bq_queue_head, link) {
+        uint64_t end = offset + size;
+        uint64_t req_end = req->offset + req->size;
+        uint8_t *read_buf = buf;
+        uint8_t *req_buf = req->buf;
+
+        /* We're only interested in queued writes */
+        if (req->type != REQ_TYPE_WRITE) {
+            continue;
+        }
+
+        /*
+         * If we read from a write in the queue (i.e. our read overlaps the
+         * write request), our next write probably depends on this write, so
+         * let's move forward to its section.
+         */
+        if (end > req->offset && offset < req_end) {
+            context->section = MAX(context->section, req->section);
+        }
+
+        /* How we continue, depends on the kind of overlap we have */
+        if ((offset >= req->offset) && (end <= req_end)) {
+            /* Completely contained in the write request */
+            memcpy(buf, &req_buf[offset - req->offset], size);
+            return 0;
+        } else if ((end >= req->offset) && (end <= req_end)) {
+            /* Overlap in the end of the read request */
+            assert(offset < req->offset);
+            memcpy(&read_buf[req->offset - offset], req_buf, end - req->offset);
+            size = req->offset - offset;
+        } else if ((offset >= req->offset) && (offset < req_end)) {
+            /* Overlap in the start of the read request */
+            assert(end > req_end);
+            memcpy(read_buf, &req_buf[offset - req->offset], req_end - offset);
+            buf = read_buf = &read_buf[req_end - offset];
+            offset = req_end;
+            size = end - req_end;
+        } else if ((req->offset >= offset) && (req_end <= end)) {
+            /*
+             * The write request is completely contained in the read request.
+             * memcpy the data from the write request here, continue with the
+             * data before the write request and handle the data after the
+             * write request with a recursive call.
+             */
+            memcpy(&read_buf[req->offset - offset], req_buf, req_end - req->offset);
+            size = req->offset - offset;
+            blkqueue_pread(context, req_end, &read_buf[req_end - offset], end - req_end);
+        }
+    }
+
+    /* The requested data is not in the queue, read it from disk */
+    ret = bdrv_pread(bq->bs, offset, buf, size);
+    if (ret < 0) {
+        return ret;
+    }
+
+    return 0;
+}
+
+int blkqueue_pwrite(BlockQueueContext *context, uint64_t offset, void *buf,
+    uint64_t size)
+{
+    BlockQueue *bq = context->bq;
+    BlockQueueRequest *section_req;
+
+    /* Create request structure */
+    BlockQueueRequest *req = qemu_malloc(sizeof(*req));
+    req->type       = REQ_TYPE_WRITE;
+    req->offset     = offset;
+    req->size       = size;
+    req->buf        = qemu_malloc(size);
+    req->section    = context->section;
+    memcpy(req->buf, buf, size);
+
+    /*
+     * Find the right place to insert it into the queue:
+     * Right before the barrier that closes the current section.
+     */
+    qemu_mutex_lock(&bq->lock);
+    QSIMPLEQ_FOREACH(section_req, &bq->sections, link_section) {
+        if (section_req->section >= req->section) {
+            req->section = section_req->section;
+            context->section = section_req->section;
+            QTAILQ_INSERT_BEFORE(section_req, req, link);
+            bq->queue_size++;
+            goto out;
+        }
+    }
+
+    /* If there was no barrier, just put it at the end. */
+    QTAILQ_INSERT_TAIL(&bq->queue, req, link);
+    bq->queue_size++;
+    qemu_cond_signal(&bq->cond);
+
+out:
+    qemu_mutex_unlock(&bq->lock);
+    return 0;
+}
+
+int blkqueue_barrier(BlockQueueContext *context)
+{
+    BlockQueue *bq = context->bq;
+    BlockQueueRequest *section_req;
+
+    bq->barriers_requested++;
+
+    /* Create request structure */
+    BlockQueueRequest *req = qemu_malloc(sizeof(*req));
+    req->type       = REQ_TYPE_BARRIER;
+    req->section    = context->section;
+    req->buf        = NULL;
+
+    /* Find another barrier to merge with. */
+    qemu_mutex_lock(&bq->lock);
+    QSIMPLEQ_FOREACH(section_req, &bq->sections, link_section) {
+        if (section_req->section >= req->section) {
+            req->section = section_req->section;
+            context->section = section_req->section + 1;
+            qemu_free(req);
+            goto out;
+        }
+    }
+
+    /*
+     * If there wasn't a barrier for the same section yet, insert a new one at
+     * the end.
+     */
+    QTAILQ_INSERT_TAIL(&bq->queue, req, link);
+    QSIMPLEQ_INSERT_TAIL(&bq->sections, req, link_section);
+    bq->queue_size++;
+    context->section++;
+    qemu_cond_signal(&bq->cond);
+
+    bq->barriers_submitted++;
+
+out:
+    qemu_mutex_unlock(&bq->lock);
+    return 0;
+}
+
+/*
+ * Caller needs to hold the bq->lock mutex
+ */
+static BlockQueueRequest *blkqueue_pop(BlockQueue *bq)
+{
+    BlockQueueRequest *req;
+
+    req = QTAILQ_FIRST(&bq->queue);
+    if (req == NULL) {
+        goto out;
+    }
+
+    QTAILQ_REMOVE(&bq->queue, req, link);
+    bq->queue_size--;
+
+    if (req->type == REQ_TYPE_BARRIER) {
+        assert(QSIMPLEQ_FIRST(&bq->sections) == req);
+        QSIMPLEQ_REMOVE_HEAD(&bq->sections, link_section);
+    }
+
+out:
+    return req;
+}
+
+static void blkqueue_free_request(BlockQueueRequest *req)
+{
+    qemu_free(req->buf);
+    qemu_free(req);
+}
+
+static void blkqueue_process_request(BlockQueue *bq)
+{
+    BlockQueueRequest *req;
+    BlockQueueRequest *req2;
+    int ret;
+
+    /*
+     * Note that we leave the request in the queue while we process it. No
+     * other request will be queued before this one and we have only one thread
+     * that processes the queue, so afterwards it will still be the first
+     * request. (Not true for barriers in the first position, but we can handle
+     * that)
+     */
+    req = QTAILQ_FIRST(&bq->queue);
+    if (req == NULL) {
+        return;
+    }
+
+    switch (req->type) {
+        case REQ_TYPE_WRITE:
+            ret = bdrv_pwrite(bq->bs, req->offset, req->buf, req->size);
+            if (ret < 0) {
+                /* TODO Error reporting! */
+                return;
+            }
+            break;
+        case REQ_TYPE_BARRIER:
+            bdrv_flush(bq->bs);
+            break;
+    }
+
+    /*
+     * Only remove the request from the queue when it's written, so that reads
+     * always access the right data.
+     */
+    qemu_mutex_lock(&bq->lock);
+    req2 = QTAILQ_FIRST(&bq->queue);
+    if (req == req2) {
+        blkqueue_pop(bq);
+        blkqueue_free_request(req);
+    } else {
+        /*
+         * If it's a barrier and something has been queued before it, just
+         * leave it in the queue and flush once again later.
+         */
+        assert(req->type == REQ_TYPE_BARRIER);
+        bq->barriers_submitted++;
+    }
+    qemu_mutex_unlock(&bq->lock);
+}
+
+struct blkqueue_flush_aiocb {
+    BlockQueue *bq;
+    BlockDriverCompletionFunc *cb;
+    void *opaque;
+};
+
+static void *blkqueue_aio_flush_thread(void *opaque)
+{
+    struct blkqueue_flush_aiocb *acb = opaque;
+
+    /* Process any left over requests */
+    blkqueue_flush(acb->bq);
+
+    acb->cb(acb->opaque, 0);
+    qemu_free(acb);
+
+    return NULL;
+}
+
+void blkqueue_aio_flush(BlockQueue *bq, BlockDriverCompletionFunc *cb,
+    void *opaque)
+{
+    struct blkqueue_flush_aiocb *acb;
+
+    acb = qemu_malloc(sizeof(*acb));
+    acb->bq = bq;
+    acb->cb = cb;
+    acb->opaque = opaque;
+
+    qemu_thread_create(NULL, blkqueue_aio_flush_thread, acb);
+}
+
+void blkqueue_flush(BlockQueue *bq)
+{
+    qemu_mutex_lock(&bq->flush_lock);
+
+    /* Process any left over requests */
+    while (QTAILQ_FIRST(&bq->queue)) {
+        blkqueue_process_request(bq);
+    }
+
+    qemu_mutex_unlock(&bq->flush_lock);
+}
+
+static void *blkqueue_thread(void *_bq)
+{
+    BlockQueue *bq = _bq;
+#ifndef RUN_TESTS
+    BlockQueueRequest *req;
+#endif
+
+    qemu_mutex_lock(&bq->flush_lock);
+    while (!bq->thread_done) {
+        barrier();
+#ifndef RUN_TESTS
+        req = QTAILQ_FIRST(&bq->queue);
+
+        /* Don't process barriers, we only do that on flushes */
+        if (req && (req->type != REQ_TYPE_BARRIER || bq->queue_size > 42)) {
+            blkqueue_process_request(bq);
+        } else {
+            qemu_cond_wait(&bq->cond, &bq->flush_lock);
+        }
+#else
+        qemu_cond_wait(&bq->cond, &bq->flush_lock);
+#endif
+    }
+    qemu_mutex_unlock(&bq->flush_lock);
+
+    return NULL;
+}
+
+#ifdef RUN_TESTS
+
+#define CHECK_WRITE(req, _offset, _size, _buf, _section) \
+    do { \
+        assert(req != NULL); \
+        assert(req->type == REQ_TYPE_WRITE); \
+        assert(req->offset == _offset); \
+        assert(req->size == _size); \
+        assert(req->section == _section); \
+        assert(!memcmp(req->buf, _buf, _size)); \
+    } while(0)
+
+#define CHECK_BARRIER(req, _section) \
+    do { \
+        assert(req != NULL); \
+        assert(req->type == REQ_TYPE_BARRIER); \
+        assert(req->section == _section); \
+    } while(0)
+
+#define CHECK_READ(_context, _offset, _buf, _size, _cmpbuf) \
+    do { \
+        int ret; \
+        memset(_buf, 0, 512); \
+        ret = blkqueue_pread(_context, _offset, _buf, _size); \
+        assert(ret == 0); \
+        assert(!memcmp(_cmpbuf, _buf, _size)); \
+    } while(0)
+
+#define QUEUE_WRITE(_context, _offset, _buf, _size, _pattern) \
+    do { \
+        int ret; \
+        memset(_buf, _pattern, _size); \
+        ret = blkqueue_pwrite(_context, _offset, _buf, _size); \
+        assert(ret == 0); \
+    } while(0)
+#define QUEUE_BARRIER(_context) \
+    do { \
+        int ret; \
+        ret = blkqueue_barrier(_context); \
+        assert(ret == 0); \
+    } while(0)
+
+#define POP_CHECK_WRITE(_bq, _offset, _buf, _size, _pattern, _section) \
+    do { \
+        BlockQueueRequest *req; \
+        memset(_buf, _pattern, _size); \
+        req = blkqueue_pop(_bq); \
+        CHECK_WRITE(req, _offset, _size, _buf, _section); \
+        blkqueue_free_request(req); \
+    } while(0)
+#define POP_CHECK_BARRIER(_bq, _section) \
+    do { \
+        BlockQueueRequest *req; \
+        req = blkqueue_pop(_bq); \
+        CHECK_BARRIER(req, _section); \
+        blkqueue_free_request(req); \
+    } while(0)
+
+static void  __attribute__((used)) dump_queue(BlockQueue *bq)
+{
+    BlockQueueRequest *req;
+
+    fprintf(stderr, "--- Queue dump ---\n");
+    QTAILQ_FOREACH(req, &bq->queue, link) {
+        fprintf(stderr, "[%d] ", req->section);
+        if (req->type == REQ_TYPE_WRITE) {
+            fprintf(stderr, "Write off=%5"PRId64", len=%5"PRId64", buf=%p\n",
+                req->offset, req->size, req->buf);
+        } else if (req->type == REQ_TYPE_BARRIER) {
+            fprintf(stderr, "Barrier\n");
+        } else {
+            fprintf(stderr, "Unknown type %d\n", req->type);
+        }
+    }
+}
+
+static void test_basic(BlockDriverState *bs)
+{
+    uint8_t buf[512];
+    BlockQueue *bq;
+    BlockQueueContext context;
+
+    bq = blkqueue_create(bs);
+    blkqueue_init_context(&context, bq);
+
+    /* Queue requests */
+    QUEUE_WRITE(&context,   0, buf, 512, 0x12);
+    QUEUE_WRITE(&context, 512, buf,  42, 0x34);
+    QUEUE_BARRIER(&context);
+    QUEUE_WRITE(&context, 678, buf,  42, 0x56);
+
+    /* Verify queue contents */
+    POP_CHECK_WRITE(bq,     0, buf, 512, 0x12, 0);
+    POP_CHECK_WRITE(bq,   512, buf,  42, 0x34, 0);
+    POP_CHECK_BARRIER(bq, 0);
+    POP_CHECK_WRITE(bq,   678, buf,  42, 0x56, 1);
+
+    blkqueue_destroy(bq);
+}
+
+static void test_merge(BlockDriverState *bs)
+{
+    uint8_t buf[512];
+    BlockQueue *bq;
+    BlockQueueContext ctx1, ctx2;
+
+    bq = blkqueue_create(bs);
+    blkqueue_init_context(&ctx1, bq);
+    blkqueue_init_context(&ctx2, bq);
+
+    /* Queue requests */
+    QUEUE_WRITE(&ctx1,    0, buf, 512, 0x12);
+    QUEUE_BARRIER(&ctx1);
+    QUEUE_WRITE(&ctx2,  512, buf,  42, 0x34);
+    QUEUE_WRITE(&ctx1, 1024, buf, 512, 0x12);
+    QUEUE_BARRIER(&ctx2);
+    QUEUE_WRITE(&ctx2, 1512, buf,  42, 0x34);
+
+    /* Verify queue contents */
+    POP_CHECK_WRITE(bq,     0, buf, 512, 0x12, 0);
+    POP_CHECK_WRITE(bq,   512, buf,  42, 0x34, 0);
+    POP_CHECK_BARRIER(bq, 0);
+    POP_CHECK_WRITE(bq,  1024, buf, 512, 0x12, 1);
+    POP_CHECK_WRITE(bq,  1512, buf,  42, 0x34, 1);
+
+    /* Same queue, new contexts */
+    blkqueue_init_context(&ctx1, bq);
+    blkqueue_init_context(&ctx2, bq);
+
+    /* Queue requests */
+    QUEUE_BARRIER(&ctx2);
+    QUEUE_WRITE(&ctx2,  512, buf,  42, 0x34);
+    QUEUE_WRITE(&ctx2,   12, buf,  20, 0x45);
+    QUEUE_BARRIER(&ctx2);
+    QUEUE_WRITE(&ctx2,  892, buf, 142, 0x56);
+
+    QUEUE_WRITE(&ctx1,    0, buf,   8, 0x12);
+    QUEUE_BARRIER(&ctx1);
+    QUEUE_WRITE(&ctx1, 1024, buf, 512, 0x12);
+    QUEUE_BARRIER(&ctx1);
+    QUEUE_WRITE(&ctx1, 1512, buf,  42, 0x34);
+    QUEUE_BARRIER(&ctx1);
+
+    /* Verify queue contents */
+    POP_CHECK_WRITE(bq,     0, buf,   8, 0x12, 0);
+    POP_CHECK_BARRIER(bq, 0);
+    POP_CHECK_WRITE(bq,   512, buf,  42, 0x34, 1);
+    POP_CHECK_WRITE(bq,    12, buf,  20, 0x45, 1);
+    POP_CHECK_WRITE(bq,  1024, buf, 512, 0x12, 1);
+    POP_CHECK_BARRIER(bq, 1);
+    POP_CHECK_WRITE(bq,   892, buf, 142, 0x56, 2);
+    POP_CHECK_WRITE(bq,  1512, buf,  42, 0x34, 2);
+    POP_CHECK_BARRIER(bq, 2);
+
+    blkqueue_destroy(bq);
+}
+
+static void test_read(BlockDriverState *bs)
+{
+    uint8_t buf[512], buf2[512];
+    BlockQueue *bq;
+    BlockQueueContext ctx1;
+
+    bq = blkqueue_create(bs);
+    blkqueue_init_context(&ctx1, bq);
+
+    /* Queue requests and do some test reads */
+    memset(buf2, 0xa5, 512);
+    CHECK_READ(&ctx1, 0, buf, 32, buf2);
+
+    QUEUE_WRITE(&ctx1, 5, buf, 5, 0x12);
+    memset(buf2, 0x12, 5);
+    CHECK_READ(&ctx1,  5, buf, 5, buf2);
+    CHECK_READ(&ctx1,  7, buf, 2, buf2);
+    memset(buf2, 0xa5, 512);
+    memset(buf2 + 5, 0x12, 5);
+    CHECK_READ(&ctx1,  0, buf, 8, buf2);
+    CHECK_READ(&ctx1,  0, buf, 10, buf2);
+    CHECK_READ(&ctx1,  0, buf, 32, buf2);
+    memset(buf2, 0xa5, 512);
+    memset(buf2, 0x12, 5);
+    CHECK_READ(&ctx1,  5, buf, 16, buf2);
+    memset(buf2, 0xa5, 512);
+    CHECK_READ(&ctx1,  0, buf,  2, buf2);
+    CHECK_READ(&ctx1, 10, buf, 16, buf2);
+
+    QUEUE_WRITE(&ctx1, 0, buf, 2, 0x12);
+    memset(&buf2[5], 0x12, 5);
+    memset(buf2, 0x12, 2);
+    CHECK_READ(&ctx1,  0, buf, 32, buf2);
+
+    /* Verify queue contents */
+    POP_CHECK_WRITE(bq,     5, buf,   5, 0x12, 0);
+    POP_CHECK_WRITE(bq,     0, buf,   2, 0x12, 0);
+
+    blkqueue_destroy(bq);
+}
+
+static void test_read_order(BlockDriverState *bs)
+{
+    uint8_t buf[512], buf2[512];
+    BlockQueue *bq;
+    BlockQueueContext ctx1, ctx2;
+
+    bq = blkqueue_create(bs);
+    blkqueue_init_context(&ctx1, bq);
+    blkqueue_init_context(&ctx2, bq);
+
+    /* Queue requests and do some test reads */
+    QUEUE_WRITE(&ctx1, 25, buf, 5, 0x44);
+    QUEUE_BARRIER(&ctx1);
+    QUEUE_WRITE(&ctx1, 5, buf, 5, 0x12);
+    QUEUE_BARRIER(&ctx1);
+    QUEUE_WRITE(&ctx2, 10, buf, 5, 0x34);
+
+    memset(buf2, 0xa5, 512);
+    memset(buf2 + 5, 0x12, 5);
+    memset(buf2 + 10, 0x34, 5);
+    CHECK_READ(&ctx2, 0, buf, 20, buf2);
+    QUEUE_WRITE(&ctx2,  0, buf, 10, 0x34);
+    QUEUE_BARRIER(&ctx2);
+
+    /* Verify queue contents */
+    POP_CHECK_WRITE(bq,    25, buf,   5, 0x44, 0);
+    POP_CHECK_WRITE(bq,    10, buf,   5, 0x34, 0);
+    POP_CHECK_BARRIER(bq, 0);
+    POP_CHECK_WRITE(bq,     5, buf,   5, 0x12, 1);
+    POP_CHECK_WRITE(bq,     0, buf,  10, 0x34, 1);
+    POP_CHECK_BARRIER(bq, 1);
+
+    blkqueue_destroy(bq);
+}
+
+static void test_process_request(BlockDriverState *bs)
+{
+    uint8_t buf[512], buf2[512];
+    BlockQueue *bq;
+    BlockQueueContext ctx1;
+
+    bq = blkqueue_create(bs);
+    blkqueue_init_context(&ctx1, bq);
+
+    /* Queue requests and do a test read */
+    QUEUE_WRITE(&ctx1, 25, buf, 5, 0x44);
+    QUEUE_BARRIER(&ctx1);
+
+    memset(buf2, 0xa5, 512);
+    memset(buf2 + 25, 0x44, 5);
+    CHECK_READ(&ctx1, 0, buf, 64, buf2);
+
+    /* Process the queue (plus one call to test a NULL condition) */
+    blkqueue_process_request(bq);
+    blkqueue_process_request(bq);
+    blkqueue_process_request(bq);
+
+    /* Verify the queue is empty */
+    assert(blkqueue_pop(bq) == NULL);
+
+    /* Check if we still read the same */
+    CHECK_READ(&ctx1, 0, buf, 64, buf2);
+
+    blkqueue_destroy(bq);
+}
+
+static void run_test(void (*testfn)(BlockDriverState*), BlockDriverState *bs)
+{
+    void* buf;
+    int ret;
+
+    buf = qemu_malloc(1024 * 1024);
+    memset(buf, 0xa5, 1024 * 1024);
+    ret = bdrv_write(bs, 0, buf, 2048);
+    assert(ret >= 0);
+    qemu_free(buf);
+
+    testfn(bs);
+}
+
+int main(void)
+{
+    BlockDriverState *bs;
+    int ret;
+
+    bdrv_init();
+    bs = bdrv_new("");
+    ret = bdrv_open(bs, "block-queue.img", BDRV_O_RDWR, NULL);
+    if (ret < 0) {
+        fprintf(stderr, "Couldn't open block-queue.img: %s\n",
+            strerror(-ret));
+        exit(1);
+    }
+
+    run_test(&test_basic, bs);
+    run_test(&test_merge, bs);
+    run_test(&test_read, bs);
+    run_test(&test_read_order, bs);
+    run_test(&test_process_request, bs);
+
+    bdrv_delete(bs);
+
+    return 0;
+}
+#endif
diff --git a/block-queue.h b/block-queue.h
new file mode 100644
index 0000000..4ce0e1b
--- /dev/null
+++ b/block-queue.h
@@ -0,0 +1,49 @@
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2010 Kevin Wolf <address@hidden>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#ifndef BLOCK_QUEUE_H
+#define BLOCK_QUEUE_H
+
+#include "qemu-common.h"
+
+typedef struct BlockQueue BlockQueue;
+
+typedef struct BlockQueueContext {
+    BlockQueue* bq;
+    unsigned    section;
+} BlockQueueContext;
+
+BlockQueue *blkqueue_create(BlockDriverState *bs);
+void blkqueue_init_context(BlockQueueContext* context, BlockQueue *bq);
+void blkqueue_destroy(BlockQueue *bq);
+int blkqueue_pread(BlockQueueContext *context, uint64_t offset, void *buf,
+    uint64_t size);
+int blkqueue_pwrite(BlockQueueContext *context, uint64_t offset, void *buf,
+    uint64_t size);
+int blkqueue_barrier(BlockQueueContext *context);
+void blkqueue_flush(BlockQueue *bq);
+void blkqueue_aio_flush(BlockQueue *bq, BlockDriverCompletionFunc *cb,
+    void *opaque);
+
+#endif
diff --git a/block/qcow2-cluster.c b/block/qcow2-cluster.c
index f562b16..eeae173 100644
--- a/block/qcow2-cluster.c
+++ b/block/qcow2-cluster.c
@@ -64,7 +64,8 @@ int qcow2_grow_l1_table(BlockDriverState *bs, int min_size)
      BLKDBG_EVENT(bs->file, BLKDBG_L1_GROW_WRITE_TABLE);
      for(i = 0; i < s->l1_size; i++)
          new_l1_table[i] = cpu_to_be64(new_l1_table[i]);
-    ret = bdrv_pwrite_sync(bs->file, new_l1_table_offset, new_l1_table, new_l1_size2);
+    ret = blkqueue_pwrite(&s->bq_context, new_l1_table_offset, new_l1_table, new_l1_size2);
+    blkqueue_barrier(&s->bq_context);
      if (ret < 0)
          goto fail;
      for(i = 0; i < s->l1_size; i++)
@@ -74,7 +75,8 @@ int qcow2_grow_l1_table(BlockDriverState *bs, int min_size)
      BLKDBG_EVENT(bs->file, BLKDBG_L1_GROW_ACTIVATE_TABLE);
      cpu_to_be32w((uint32_t*)data, new_l1_size);
      cpu_to_be64w((uint64_t*)(data + 4), new_l1_table_offset);
-    ret = bdrv_pwrite_sync(bs->file, offsetof(QCowHeader, l1_size), data,sizeof(data));
+    ret = blkqueue_pwrite(&s->bq_context, offsetof(QCowHeader, l1_size), data, sizeof(data));
+    blkqueue_barrier(&s->bq_context);
      if (ret < 0) {
          goto fail;
      }
@@ -177,8 +179,7 @@ static int l2_load(BlockDriverState *bs, uint64_t l2_offset,
      *l2_table = s->l2_cache + (min_index << s->l2_bits);

      BLKDBG_EVENT(bs->file, BLKDBG_L2_LOAD);
-    ret = bdrv_pread(bs->file, l2_offset, *l2_table,
-        s->l2_size * sizeof(uint64_t));
+    ret = blkqueue_pread(&s->bq_context, l2_offset, *l2_table, s->l2_size * sizeof(uint64_t));
      if (ret < 0) {
          return ret;
      }
@@ -207,8 +208,8 @@ static int write_l1_entry(BlockDriverState *bs, int l1_index)
      }

      BLKDBG_EVENT(bs->file, BLKDBG_L1_UPDATE);
-    ret = bdrv_pwrite_sync(bs->file, s->l1_table_offset + 8 * l1_start_index,
-        buf, sizeof(buf));
+    ret = blkqueue_pwrite(&s->bq_context, s->l1_table_offset + 8 * l1_start_index, buf, sizeof(buf));
+    blkqueue_barrier(&s->bq_context);
      if (ret < 0) {
          return ret;
      }
@@ -255,16 +256,15 @@ static int l2_allocate(BlockDriverState *bs, int l1_index, uint64_t **table)
      } else {
          /* if there was an old l2 table, read it from the disk */
          BLKDBG_EVENT(bs->file, BLKDBG_L2_ALLOC_COW_READ);
-        ret = bdrv_pread(bs->file, old_l2_offset, l2_table,
-            s->l2_size * sizeof(uint64_t));
+        ret = blkqueue_pread(&s->bq_context, old_l2_offset, l2_table, s->l2_size * sizeof(uint64_t));
          if (ret < 0) {
              goto fail;
          }
      }
      /* write the l2 table to the file */
      BLKDBG_EVENT(bs->file, BLKDBG_L2_ALLOC_WRITE);
-    ret = bdrv_pwrite_sync(bs->file, l2_offset, l2_table,
-        s->l2_size * sizeof(uint64_t));
+    ret = blkqueue_pwrite(&s->bq_context, l2_offset, l2_table, s->l2_size * sizeof(uint64_t));
+    blkqueue_barrier(&s->bq_context);
      if (ret < 0) {
          goto fail;
      }
@@ -378,7 +378,7 @@ static int qcow_read(BlockDriverState *bs, int64_t sector_num,
              memcpy(buf, s->cluster_cache + index_in_cluster * 512, 512 * n);
          } else {
              BLKDBG_EVENT(bs->file, BLKDBG_READ);
-            ret = bdrv_pread(bs->file, cluster_offset + index_in_cluster * 512, buf, n * 512);
+            ret = blkqueue_pread(&s->bq_context, cluster_offset + index_in_cluster * 512, buf, n * 512);
              if (ret != n * 512)
                  return -1;
              if (s->crypt_method) {
@@ -648,6 +648,7 @@ uint64_t qcow2_alloc_compressed_cluster_offset(BlockDriverState *bs,
  static int write_l2_entries(BlockDriverState *bs, uint64_t *l2_table,
      uint64_t l2_offset, int l2_index, int num)
  {
+    BDRVQcowState *s = bs->opaque;
      int l2_start_index = l2_index & ~(L1_ENTRIES_PER_SECTOR - 1);
      int start_offset = (8 * l2_index) & ~511;
      int end_offset = (8 * (l2_index + num) + 511) & ~511;
@@ -655,8 +656,7 @@ static int write_l2_entries(BlockDriverState *bs, uint64_t *l2_table,
      int ret;

      BLKDBG_EVENT(bs->file, BLKDBG_L2_UPDATE);
-    ret = bdrv_pwrite(bs->file, l2_offset + start_offset,
-        &l2_table[l2_start_index], len);
+    ret = blkqueue_pwrite(&s->bq_context, l2_offset + start_offset, &l2_table[l2_start_index], len);
      if (ret < 0) {
          return ret;
      }
@@ -723,7 +723,7 @@ int qcow2_alloc_cluster_link_l2(BlockDriverState *bs, QCowL2Meta *m)
       * Also flush bs->file to get the right order for L2 and refcount update.
       */
      if (j != 0) {
-        bdrv_flush(bs->file);
+        blkqueue_barrier(&s->bq_context);
          for (i = 0; i < j; i++) {
              qcow2_free_any_clusters(bs,
                  be64_to_cpu(old_cluster[i]) & ~QCOW_OFLAG_COPIED, 1);
diff --git a/block/qcow2-refcount.c b/block/qcow2-refcount.c
index 4c19e7e..0d21d1f 100644
--- a/block/qcow2-refcount.c
+++ b/block/qcow2-refcount.c
@@ -44,7 +44,7 @@ static int write_refcount_block(BlockDriverState *bs)
      }

      BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_UPDATE);
-    if (bdrv_pwrite_sync(bs->file, s->refcount_block_cache_offset,
+    if (blkqueue_pwrite(&s->bq_context, s->refcount_block_cache_offset,
              s->refcount_block_cache, size) < 0)
      {
          return -EIO;
@@ -66,8 +66,7 @@ int qcow2_refcount_init(BlockDriverState *bs)
      s->refcount_table = qemu_malloc(refcount_table_size2);
      if (s->refcount_table_size > 0) {
          BLKDBG_EVENT(bs->file, BLKDBG_REFTABLE_LOAD);
-        ret = bdrv_pread(bs->file, s->refcount_table_offset,
-                         s->refcount_table, refcount_table_size2);
+        ret = bdrv_pread(bs->file, s->refcount_table_offset, s->refcount_table, refcount_table_size2);
          if (ret != refcount_table_size2)
              goto fail;
          for(i = 0; i < s->refcount_table_size; i++)
@@ -100,8 +99,7 @@ static int load_refcount_block(BlockDriverState *bs,
      }

      BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_LOAD);
-    ret = bdrv_pread(bs->file, refcount_block_offset, s->refcount_block_cache,
-                     s->cluster_size);
+    ret = blkqueue_pread(&s->bq_context, refcount_block_offset, s->refcount_block_cache, s->cluster_size);
      if (ret < 0) {
          return ret;
      }
@@ -269,8 +267,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)

      /* Now the new refcount block needs to be written to disk */
      BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_WRITE);
-    ret = bdrv_pwrite_sync(bs->file, new_block, s->refcount_block_cache,
-        s->cluster_size);
+    ret = blkqueue_pwrite(&s->bq_context, new_block, s->refcount_block_cache, s->cluster_size);
+    blkqueue_barrier(&s->bq_context);
      if (ret < 0) {
          goto fail_block;
      }
@@ -279,9 +277,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
      if (refcount_table_index < s->refcount_table_size) {
          uint64_t data64 = cpu_to_be64(new_block);
          BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_HOOKUP);
-        ret = bdrv_pwrite_sync(bs->file,
-            s->refcount_table_offset + refcount_table_index * sizeof(uint64_t),
-            &data64, sizeof(data64));
+        ret = blkqueue_pwrite(&s->bq_context, s->refcount_table_offset + refcount_table_index * sizeof(uint64_t), &data64, sizeof(data64));
+        blkqueue_barrier(&s->bq_context);
          if (ret < 0) {
              goto fail_block;
          }
@@ -359,8 +356,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)

      /* Write refcount blocks to disk */
      BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_WRITE_BLOCKS);
-    ret = bdrv_pwrite_sync(bs->file, meta_offset, new_blocks,
-        blocks_clusters * s->cluster_size);
+    ret = blkqueue_pwrite(&s->bq_context, meta_offset, new_blocks, blocks_clusters * s->cluster_size);
+    blkqueue_barrier(&s->bq_context);
      qemu_free(new_blocks);
      if (ret < 0) {
          goto fail_table;
@@ -372,8 +369,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
      }

      BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_WRITE_TABLE);
-    ret = bdrv_pwrite_sync(bs->file, table_offset, new_table,
-        table_size * sizeof(uint64_t));
+    ret = blkqueue_pwrite(&s->bq_context, table_offset, new_table, table_size * sizeof(uint64_t));
+    blkqueue_barrier(&s->bq_context);
      if (ret < 0) {
          goto fail_table;
      }
@@ -387,8 +384,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
      cpu_to_be64w((uint64_t*)data, table_offset);
      cpu_to_be32w((uint32_t*)(data + 8), table_clusters);
      BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_SWITCH_TABLE);
-    ret = bdrv_pwrite_sync(bs->file, offsetof(QCowHeader, refcount_table_offset),
-        data, sizeof(data));
+    ret = blkqueue_pwrite(&s->bq_context, offsetof(QCowHeader, refcount_table_offset), data, sizeof(data));
+    blkqueue_barrier(&s->bq_context);
      if (ret < 0) {
          goto fail_table;
      }
@@ -444,9 +441,8 @@ static int write_refcount_block_entries(BlockDriverState *bs,
      size = (last_index - first_index) << REFCOUNT_SHIFT;

      BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_UPDATE_PART);
-    ret = bdrv_pwrite_sync(bs->file,
-        refcount_block_offset + (first_index << REFCOUNT_SHIFT),
-        &s->refcount_block_cache[first_index], size);
+    ret = blkqueue_pwrite(&s->bq_context, refcount_block_offset + (first_index << REFCOUNT_SHIFT), &s->refcount_block_cache[first_index], size);
+    blkqueue_barrier(&s->bq_context);
      if (ret < 0) {
          return ret;
      }
@@ -763,8 +759,7 @@ int qcow2_update_snapshot_refcount(BlockDriverState *bs,
              l1_table = NULL;
          }
          l1_allocated = 1;
-        if (bdrv_pread(bs->file, l1_table_offset,
-                       l1_table, l1_size2) != l1_size2)
+        if (blkqueue_pread(&s->bq_context, l1_table_offset, l1_table, l1_size2) != l1_size2)
              goto fail;
          for(i = 0;i < l1_size; i++)
              be64_to_cpus(&l1_table[i]);
@@ -783,7 +778,7 @@ int qcow2_update_snapshot_refcount(BlockDriverState *bs,
              old_l2_offset = l2_offset;
              l2_offset &= ~QCOW_OFLAG_COPIED;
              l2_modified = 0;
-            if (bdrv_pread(bs->file, l2_offset, l2_table, l2_size) != l2_size)
+            if (blkqueue_pread(&s->bq_context, l2_offset, l2_table, l2_size) != l2_size)
                  goto fail;
              for(j = 0; j < s->l2_size; j++) {
                  offset = be64_to_cpu(l2_table[j]);
@@ -943,7 +938,7 @@ static int check_refcounts_l2(BlockDriverState *bs, BdrvCheckResult *res,
      l2_size = s->l2_size * sizeof(uint64_t);
      l2_table = qemu_malloc(l2_size);

-    if (bdrv_pread(bs->file, l2_offset, l2_table, l2_size) != l2_size)
+    if (blkqueue_pread(&s->bq_context, l2_offset, l2_table, l2_size) != l2_size)
          goto fail;

      /* Do the actual checks */
@@ -1038,8 +1033,7 @@ static int check_refcounts_l1(BlockDriverState *bs,
          l1_table = NULL;
      } else {
          l1_table = qemu_malloc(l1_size2);
-        if (bdrv_pread(bs->file, l1_table_offset,
-                       l1_table, l1_size2) != l1_size2)
+        if (blkqueue_pread(&s->bq_context, l1_table_offset, l1_table, l1_size2) != l1_size2)
              goto fail;
          for(i = 0;i < l1_size; i++)
              be64_to_cpus(&l1_table[i]);
diff --git a/block/qcow2.c b/block/qcow2.c
index f2b1b1c..9b1cd78 100644
--- a/block/qcow2.c
+++ b/block/qcow2.c
@@ -237,6 +237,10 @@ static int qcow_open(BlockDriverState *bs, int flags)
      if (qcow2_read_snapshots(bs) < 0)
          goto fail;

+    /* Block queue */
+    s->bq = blkqueue_create(bs->file);
+    blkqueue_init_context(&s->bq_context, s->bq);
+
  #ifdef DEBUG_ALLOC
      qcow2_check_refcounts(bs);
  #endif
@@ -494,6 +498,7 @@ static QCowAIOCB *qcow_aio_setup(BlockDriverState *bs,
          int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
          BlockDriverCompletionFunc *cb, void *opaque, int is_write)
  {
+    BDRVQcowState *s = bs->opaque;
      QCowAIOCB *acb;

      acb = qemu_aio_get(&qcow_aio_pool, bs, cb, opaque);
@@ -514,6 +519,10 @@ static QCowAIOCB *qcow_aio_setup(BlockDriverState *bs,
      acb->cluster_offset = 0;
      acb->l2meta.nb_clusters = 0;
      QLIST_INIT(&acb->l2meta.dependent_requests);
+
+    /* TODO Push the context into l2meta */
+    blkqueue_init_context(&s->bq_context, s->bq);
+
      return acb;
  }

@@ -667,6 +676,7 @@ static void qcow_close(BlockDriverState *bs)
      qemu_free(s->cluster_cache);
      qemu_free(s->cluster_data);
      qcow2_refcount_close(bs);
+    blkqueue_destroy(s->bq);
  }

  /*
@@ -1123,12 +1133,16 @@ static int qcow_write_compressed(BlockDriverState *bs, int64_t sector_num,

  static void qcow_flush(BlockDriverState *bs)
  {
+    BDRVQcowState *s = bs->opaque;
+    blkqueue_flush(s->bq);
      bdrv_flush(bs->file);
  }

  static BlockDriverAIOCB *qcow_aio_flush(BlockDriverState *bs,
           BlockDriverCompletionFunc *cb, void *opaque)
  {
+    BDRVQcowState *s = bs->opaque;
+    blkqueue_flush(s->bq);
      return bdrv_aio_flush(bs->file, cb, opaque);
  }

diff --git a/block/qcow2.h b/block/qcow2.h
index 3ff162e..361f1ba 100644
--- a/block/qcow2.h
+++ b/block/qcow2.h
@@ -26,6 +26,7 @@
  #define BLOCK_QCOW2_H

  #include "aes.h"
+#include "block-queue.h"

  //#define DEBUG_ALLOC
  //#define DEBUG_ALLOC2
@@ -108,6 +109,9 @@ typedef struct BDRVQcowState {
      int64_t free_cluster_index;
      int64_t free_byte_offset;

+    BlockQueue *bq;
+    BlockQueueContext bq_context;
+
      uint32_t crypt_method; /* current crypt method, 0 if no key yet */
      uint32_t crypt_method_header;
      AES_KEY aes_encrypt_key;
diff --git a/qemu-thread.c b/qemu-thread.c
index fbc78fe..abcb7a6 100644
--- a/qemu-thread.c
+++ b/qemu-thread.c
@@ -167,6 +167,19 @@ void qemu_thread_create(QemuThread *thread,
      pthread_sigmask(SIG_SETMASK, &oldset, NULL);
  }

+void *qemu_thread_join(QemuThread *thread)
+{
+    int err;
+    void *ret;
+
+    err = pthread_join(thread->thread, &ret);
+    if (err) {
+        error_exit(err, __func__);
+    }
+
+    return ret;
+}
+
  void qemu_thread_signal(QemuThread *thread, int sig)
  {
      int err;
diff --git a/qemu-thread.h b/qemu-thread.h
index 19bb30c..2b6f218 100644
--- a/qemu-thread.h
+++ b/qemu-thread.h
@@ -36,6 +36,7 @@ int qemu_cond_timedwait(QemuCond *cond, QemuMutex *mutex, uint64_t msecs);
  void qemu_thread_create(QemuThread *thread,
                         void *(*start_routine)(void*),
                         void *arg);
+void *qemu_thread_join(QemuThread *thread);
  void qemu_thread_signal(QemuThread *thread, int sig);
  void qemu_thread_self(QemuThread *thread);
  int qemu_thread_equal(QemuThread *thread1, QemuThread *thread2);
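
As a side note on the qemu_thread_join() helper added above: the following is a minimal standalone sketch of the same join-and-return-the-result pattern using plain pthreads. It is not code from the patch. DemoThread, demo_thread_join(), worker() and run_worker_and_join() are hypothetical names made up for illustration, and error handling simply aborts, on the assumption that this roughly mirrors what error_exit() does.

```c
#include <assert.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for QemuThread; only the pthread_t matters here. */
typedef struct DemoThread {
    pthread_t thread;
} DemoThread;

/* Same shape as the helper in the patch: wait for the thread to finish
 * and hand back whatever its start routine returned. */
static void *demo_thread_join(DemoThread *t)
{
    void *ret;
    int err = pthread_join(t->thread, &ret);

    if (err) {
        /* Assumption: abort on failure, similar in spirit to error_exit(). */
        fprintf(stderr, "%s: %s\n", __func__, strerror(err));
        abort();
    }
    return ret;
}

/* Illustrative worker: doubles the integer smuggled through the pointer. */
static void *worker(void *arg)
{
    intptr_t n = (intptr_t)arg;
    return (void *)(n * 2);
}

/* Start one worker, join it, and return the value it produced. */
intptr_t run_worker_and_join(intptr_t n)
{
    DemoThread t;
    int err = pthread_create(&t.thread, NULL, worker, (void *)n);

    assert(err == 0);
    (void)err; /* keep builds with NDEBUG free of unused-variable warnings */

    return (intptr_t)demo_thread_join(&t);
}
```

A caller would use run_worker_and_join(21) and get back twice its argument once the worker has exited. In the patch, the equivalent join presumably happens when the block queue's worker thread is torn down, but that code is not part of this excerpt.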




