[Qemu-devel] Re: [RFC PATCH v3 2/4] Add block-queue


From: Stefan Hajnoczi
Subject: [Qemu-devel] Re: [RFC PATCH v3 2/4] Add block-queue
Date: Fri, 3 Dec 2010 09:44:28 +0000

On Tue, Nov 30, 2010 at 12:48 PM, Kevin Wolf <address@hidden> wrote:
> +/*
> + * Adds a write request to the queue.
> + */
> +int blkqueue_pwrite(BlockQueueContext *context, uint64_t offset, void *buf,
> +    uint64_t size)
> +{
> +    BlockQueue *bq = context->bq;
> +    BlockQueueRequest *section_req;
> +    bool completed;
> +
> +    /* Don't use the queue for writethrough images */
> +    if ((bq->bs->open_flags & WRITEBACK_MODES) == 0) {
> +        return bdrv_pwrite(bq->bs, offset, buf, size);
> +    }
> +
> +    /* First check if there are any pending writes for the same data. */
> +    DPRINTF("--        pwrite: [%#lx + %ld]\n", offset, size);
> +    completed = blkqueue_check_queue_overlap(context, &bq->queue, &offset,
> +        &buf, &size, &blkqueue_pwrite, &pwrite_handle_overlap,
> +        context->section);
> +
> +    if (completed) {
> +        return 0;
> +    }
> +
> +    /* Create request structure */
> +    BlockQueueRequest *req = qemu_malloc(sizeof(*req));
> +    QLIST_INIT(&req->acbs);
> +    req->type       = REQ_TYPE_WRITE;
> +    req->bq         = bq;
> +    req->offset     = offset;
> +    req->size       = size;
> +    req->buf        = qemu_malloc(size);

qemu_blockalign()
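
i.e. roughly this (just a sketch; the matching qemu_free() of req->buf would
then need to become qemu_vfree()):

    req->buf = qemu_blockalign(bq->bs, size);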

> +static int insert_barrier(BlockQueueContext *context, BlockQueueAIOCB *acb)
> +{
> +    BlockQueue *bq = context->bq;
> +    BlockQueueRequest *section_req;
> +
> +    bq->barriers_requested++;
> +
> +    /* Create request structure */
> +    BlockQueueRequest *req = qemu_malloc(sizeof(*req));
> +    QLIST_INIT(&req->acbs);
> +    req->type       = REQ_TYPE_BARRIER;
> +    req->bq         = bq;
> +    req->section    = context->section;
> +    req->buf        = NULL;
> +
> +    /* Find another barrier to merge with. */
> +    QSIMPLEQ_FOREACH(section_req, &bq->sections, link_section) {
> +        if (section_req->section >= req->section) {
> +
> +            /*
> +             * If acb is set, the intention of the barrier request is to flush
> +             * the complete queue and notify the caller when all requests have
> +             * been processed. To achieve this, we may only merge with the very
> +             * last request in the queue.
> +             */
> +            if (acb && QTAILQ_NEXT(section_req, link)) {
> +                continue;
> +            }
> +
> +            req->section = section_req->section;
> +            context->section = section_req->section + 1;
> +            qemu_free(req);
> +            req = section_req;
> +            goto out;
> +        }
> +    }

I think the search for an existing barrier should be moved above the
req allocation.  It's more work to allocate req, do an unnecessary
req->section = section_req->section, and then free it again.
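
Something along these lines would avoid the allocate-then-free dance (only a
sketch against the code above, with req declared before the loop):

    BlockQueueRequest *req;

    /* Try to merge with an existing barrier first... */
    QSIMPLEQ_FOREACH(section_req, &bq->sections, link_section) {
        if (section_req->section >= context->section) {
            if (acb && QTAILQ_NEXT(section_req, link)) {
                continue;
            }
            context->section = section_req->section + 1;
            req = section_req;
            goto out;
        }
    }

    /* ...and only allocate a new request if there was nothing to merge with */
    req = qemu_malloc(sizeof(*req));
    QLIST_INIT(&req->acbs);
    req->type    = REQ_TYPE_BARRIER;
    req->bq      = bq;
    req->section = context->section;
    req->buf     = NULL;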

> +
> +    /*
> +     * If there wasn't a barrier for the same section yet, insert a new one at
> +     * the end.
> +     */
> +    DPRINTF("queue-ins flush: %p\n", req);
> +    QTAILQ_INSERT_TAIL(&bq->queue, req, link);
> +    QSIMPLEQ_INSERT_TAIL(&bq->sections, req, link_section);
> +    bq->queue_size++;
> +    context->section++;
> +
> +    bq->barriers_submitted++;
> +
> +    /*
> +     * At this point, req is either the newly inserted request, or a previously
> +     * existing barrier with which the current request has been merged.
> +     *
> +     * Insert the ACB in the list of that request so that the callback is
> +     * called when the request has completed.
> +     */
> +out:
> +    if (acb) {
> +        QLIST_INSERT_HEAD(&req->acbs, acb, link);

Is there a reason to insert at the head and not append to the tail?
Preserving order is usually good.
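
QLIST has no tail insert, so that would presumably mean switching req->acbs
(and the link field in BlockQueueAIOCB) over to QTAILQ, roughly:

    QTAILQ_INIT(&req->acbs);
    ...
    QTAILQ_INSERT_TAIL(&req->acbs, acb, link);

with the QLIST_FOREACH_SAFE/QLIST_REMOVE users converted accordingly.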

> +/*
> + * If there are any blkqueue_aio_flush callbacks pending, call them with ret
> + * as the error code and remove them from the queue.
> + *
> + * If keep_queue is false, all requests are removed from the queue
> + */
> +static void blkqueue_fail_flush(BlockQueue *bq, int ret, bool keep_queue)
> +{
> +    BlockQueueRequest *req, *next_req;
> +    BlockQueueAIOCB *acb, *next_acb;
> +
> +    QTAILQ_FOREACH_SAFE(req, &bq->queue, link, next_req) {
> +
> +        /* Call and remove registered callbacks */
> +        QLIST_FOREACH_SAFE(acb, &req->acbs, link, next_acb) {
> +            acb->common.cb(acb->common.opaque, ret);
> +            qemu_free(acb);

qemu_aio_release()

> +        }
> +        QLIST_INIT(&req->acbs);
> +
> +        /* If requested, remove the request itself */
> +        if (!keep_queue) {
> +            QTAILQ_REMOVE(&bq->queue, req, link);

bq->queue_size--;

> +            if (req->type == REQ_TYPE_BARRIER) {
> +                QSIMPLEQ_REMOVE(&bq->sections, req, BlockQueueRequest,
> +                    link_section);
> +            }

Now free the request?
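
Otherwise the dropped requests leak, i.e. presumably a

            blkqueue_free_request(req);

right after the QSIMPLEQ_REMOVE().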

> +        }
> +    }
> +
> +    /* Make sure that blkqueue_flush stops running */
> +    bq->flushing = ret;
> +}
> +
> +static void blkqueue_process_request_cb(void *opaque, int ret)
> +{
> +    BlockQueueRequest *req = opaque;
> +    BlockQueue *bq = req->bq;
> +    BlockQueueAIOCB *acb, *next;
> +
> +    DPRINTF("  done    req:    %p [%#lx + %ld]\n", req, req->offset, req->size);
> +
> +    /* Remove from in-flight list */
> +    QTAILQ_REMOVE(&bq->in_flight, req, link);
> +    bq->in_flight_num--;
> +
> +    /*
> +     * Error handling gets a bit complicated, because we have already completed
> +     * the requests that went wrong. There are two ways of dealing with this:
> +     *
> +     * 1. With werror=stop we can put the request back into the queue and stop
> +     *    the VM. When the user continues the VM, the request is retried.
> +     *
> +     * 2. In other cases we need to return an error on the next bdrv_flush. The
> +     *    caller must cope with the fact that he doesn't know which of the
> +     *    requests succeeded (i.e. invalidate all caches)
> +     *
> +     * If we're in a blkqueue_aio_flush, we must return an error in both
> +     * cases. If we stop the VM, we can clear bq->errno immediately again.
> +     * Otherwise, it's cleared in bdrv_(aio_)flush.
> +     */
> +    if (ret < 0) {
> +        if (bq->error_ret != -ENOSPC) {
> +            bq->error_ret = ret;
> +        }
> +    }
> +
> +    /* Call any callbacks attached to the request (see blkqueue_aio_flush) */
> +    QLIST_FOREACH_SAFE(acb, &req->acbs, link, next) {
> +        acb->common.cb(acb->common.opaque, bq->error_ret);
> +        qemu_free(acb);

qemu_aio_release()

> +    }
> +    QLIST_INIT(&req->acbs);
> +
> +    /* Handle errors in the VM stop case */
> +    if (ret < 0) {
> +        bool keep_queue = bq->error_handler(bq->error_opaque, ret);
> +
> +        /* Fail any flushes that may wait for the queue to become empty */
> +        blkqueue_fail_flush(bq, bq->error_ret, keep_queue);
> +
> +        if (keep_queue) {
> +            /* Reinsert request into the queue */
> +            QTAILQ_INSERT_HEAD(&bq->queue, req, link);
> +            if (req->type == REQ_TYPE_BARRIER) {
> +                QSIMPLEQ_INSERT_HEAD(&bq->sections, req, link_section);
> +            }
> +
> +            /* Clear the error to restore a normal state after 'cont' */
> +            bq->error_ret = 0;
> +            return;
> +        }
> +    }
> +
> +    /* Cleanup */
> +    blkqueue_free_request(req);
> +
> +    /* Check if there are more requests to submit */
> +    blkqueue_process_request(bq);
> +}
> +
> +/*
> + * Checks if the first request on the queue can run. If so, remove it from the
> + * queue, submit the request and put it onto the queue of in-flight requests.
> + *
> + * Returns 0 if a request has been submitted, -1 if no request can run or an
> + * error has occurred.
> + */

If we want specific error codes:
-EAGAIN no request can run
-EINVAL, -EIO, ... specific errors
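
i.e. something like (sketch only):

    if (bq->error_ret) {
        return bq->error_ret;
    }

    req = QTAILQ_FIRST(&bq->queue);
    if (req == NULL) {
        return -EAGAIN;
    }
    ...
    if (bq->in_flight_num > 0) {
        return -EAGAIN;
    }

so that callers can distinguish "nothing to do" from a real failure.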

> +static int blkqueue_submit_request(BlockQueue *bq)
> +{
> +    BlockDriverAIOCB *acb;
> +    BlockQueueRequest *req;
> +
> +    /*
> +     * If we had an error, we must not submit new requests from another
> +     * section or we may get ordering problems. In fact, not submitting any new
> +     * requests looks like a good idea in this case.
> +     */
> +    if (bq->error_ret) {
> +        return -1;
> +    }
> +
> +    /* Fetch a request */
> +    req = QTAILQ_FIRST(&bq->queue);
> +    if (req == NULL) {
> +        return -1;
> +    }
> +
> +    /* Writethrough images aren't supposed to have any queue entries */
> +    assert((bq->bs->open_flags & WRITEBACK_MODES) != 0);
> +
> +    /*
> +     * We need to wait for completion before we can submit new requests:
> +     * 1. If we're currently processing a barrier, or the new request is a
> +     *    barrier, we need to guarantee this barrier semantics.
> +     * 2. We must make sure that newer writes cannot pass older ones.
> +     */
> +    if (bq->in_flight_num > 0) {

I don't understand why we refuse to do more work on in_flight_num > 0.
This function increments in_flight_num which means
blkqueue_process_request() will only ever submit one request at a
time?  Why does blkqueue_process_request() loop then?  I'm missing
something.

> +        return -1;
> +    }
> +
> +    /* Process barriers only if the queue is long enough */
> +    if (!bq->flushing) {
> +        if (req->type == REQ_TYPE_BARRIER && bq->queue_size < 50) {
> +            return -1;
> +        }

Not sure about this.  Need to think about the patch more but this
check looks like it could have consequences like starvation.  I guess
that's why you check for !bq->flushing.

> +static void blkqueue_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +    BlockQueueAIOCB *acb = (BlockQueueAIOCB*) blockacb;
> +
> +    /*
> +     * We can't cancel the flush any more, but that doesn't hurt. We just
> +     * need to make sure that we don't call the callback when it completes.
> +     */
> +    QLIST_REMOVE(acb, link);
> +    qemu_free(acb);

qemu_aio_release()

> +}
> +
> +/*
> + * Inserts a barrier at the end of the queue (or merges with an existing
> + * barrier there). Once the barrier has completed, the callback is called.
> + */
> +BlockDriverAIOCB* blkqueue_aio_flush(BlockQueueContext *context,
> +    BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    BlockQueueAIOCB *acb;
> +    BlockDriverState *bs = context->bq->bs;
> +    int ret;
> +
> +    /* Don't use the queue for writethrough images */
> +    if ((bs->open_flags & WRITEBACK_MODES) == 0) {
> +        return bdrv_aio_flush(bs, cb, opaque);
> +    }
> +
> +    /* Insert a barrier into the queue */
> +    acb = qemu_aio_get(&blkqueue_aio_pool, NULL, cb, opaque);
> +
> +    ret = insert_barrier(context, acb);
> +    if (ret < 0) {

This return path is broken:

> +        cb(opaque, ret);

Missing BH.

> +        qemu_free(acb);

qemu_aio_release(acb);

If we want to invoke cb (via a BH) then we shouldn't release acb.  If
we do release it we need to return NULL but shouldn't invoke cb.
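
For the error path, one usual pattern would be to keep the acb alive and defer
the callback to a bottom half, e.g. (only a sketch; assumes extra bh and ret
fields in BlockQueueAIOCB):

static void blkqueue_aio_flush_bh(void *opaque)
{
    BlockQueueAIOCB *acb = opaque;

    qemu_bh_delete(acb->bh);
    acb->common.cb(acb->common.opaque, acb->ret);
    qemu_aio_release(acb);
}

...

    ret = insert_barrier(context, acb);
    if (ret < 0) {
        acb->ret = ret;
        acb->bh = qemu_bh_new(blkqueue_aio_flush_bh, acb);
        qemu_bh_schedule(acb->bh);
    }

    return &acb->common;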

> +    }
> +
> +    return &acb->common;
> +}
> +
> +/*
> + * Flushes the queue (i.e. disables waiting for new requests to be batched) and
> + * waits until all requests in the queue have completed.
> + *
> + * Note that unlike blkqueue_aio_flush this does not call bdrv_flush().
> + */
> +int blkqueue_flush(BlockQueue *bq)
> +{
> +    int res = 0;
> +
> +    bq->flushing = 1;
> +
> +    /* Process any left over requests */
> +    while ((bq->flushing > 0) &&
> +        (bq->in_flight_num || QTAILQ_FIRST(&bq->queue)))
> +    {
> +        blkqueue_process_request(bq);
> +        qemu_aio_wait();
> +    }
> +
> +    /*
> +     * bq->flushing contains the error if it could be handled by stopping the
> +     * VM, error_ret contains it if we're not allowed to do this.
> +     */
> +    if (bq->error_ret < 0) {
> +        res = bq->error_ret;
> +
> +        /*
> +         * Wait for AIO requests, so that the queue is really unused after
> +         * blkqueue_flush() and the caller can destroy it
> +         */
> +        if (res < 0) {

We already know bq->error_ret < 0.  This comparison is always true.

Stefan


