qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] Re: [PATCH] NBD: Convert the NBD driver to use the AIO interface.


From: Kevin Wolf
Subject: [Qemu-devel] Re: [PATCH] NBD: Convert the NBD driver to use the AIO interface.
Date: Wed, 06 Apr 2011 13:30:19 +0200
User-agent: Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.1.15) Gecko/20101027 Fedora/3.0.10-1.fc12 Thunderbird/3.0.10

Am 24.02.2011 17:49, schrieb Nick Thomas:
> This preserves the previous behaviour where the NBD server is
> unavailable or goes away during guest execution, but switches the
> NBD backend to present the AIO interface instead of the sync IO
> interface.
> 
> We also split read & write requests into 1 MiB blocks (minus header).
> This is a hard limit in the NBD servers (including qemu-nbd), but
> never seemed to come up with the previous backend code.
> 
> All IO except setup and teardown is asynchronous, and we (in theory)
> handle canceled requests properly too.
> 
> Signed-off-by: Nick Thomas <address@hidden>

Actually, the patch doesn't even apply. So just a quick review...

>  block/nbd.c |  652 +++++++++++++++++++++++++++++++++++++++++++++++++++++-----
>  nbd.c       |   76 +++++--
>  nbd.h       |   13 +-
>  3 files changed, 662 insertions(+), 79 deletions(-)
> 
> diff --git a/block/nbd.c b/block/nbd.c
> index 1d6b225..d6594e4 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -34,7 +34,12 @@
>  #include <sys/types.h>
>  #include <unistd.h>
>  
> -#define EN_OPTSTR ":exportname="
> +#define EN_OPTSTR   ":exportname="
> +#define SECTOR_SIZE 512
> +
> +/* 1MiB minus header size */
> +#define NBD_MAX_READ      ((1024*1024) - sizeof(struct nbd_reply))
> +#define NBD_MAX_WRITE     ((1024*1024) - sizeof(struct nbd_request))
>  
>  /* #define DEBUG_NBD */
>  
> @@ -45,17 +50,148 @@
>  #define logout(fmt, ...) ((void)0)
>  #endif
>  
> -typedef struct BDRVNBDState {
> +/* 
> +  * Here's how the I/O works.
> +  * qemu creates a BDRVNBDState for us, which is the context for all reads
> +  * and writes.
> +  * 
> +  * nbd_open is called to connect to the NBD server and set up on-read and 
> +  * on-write handlers (aio_read_response, aio_write_request). 
> +  *
> +  * nbd_aio_readv/writev, called by qemu, create an NBDAIOCB (representing 
> the
> +  * I/O request to qemu).
> +  * For read requests, read/writev creates a single AIOReq containing the NBD
> +  * header. For write requests, 1 or more AIOReqs are created, containing the
> +  * NBD header and the write data. These are pushed to reqs_to_send_head in 
> the
> +  * BDRVNBDState and the list in the NBDAIOCB.
> +  *
> +  * Each time aio_write_request is called by qemu, it gets the first AIOReq
> +  * in the reqs_to_send_head and writes the data to the socket.
> +  * If this results in the whole AIOReq being written to the socket, it moves
> +  * the AIOReq to the reqs_for_reply_head in the BDRVNBDState. If the AIOReq
> +  * isn't finished, then it's left where it is, to have more of it written
> +  * next callback.
> +  *
> +  * If there's an unrecoverable error writing to the socket, we disconnect 
> and
> +  * return the entire acb as -EIO
> +  *
> +  * Each aio_read_response, we check the BDRVNBDState's current_req attribute
> +  * to see if we're in the middle of a read. If not, we read a header's 
> worth of
> +  * data, then try to find an AIOReq in the reqs_for_reply_head.
> +  * 
> +  * If we don't find one, then we're in a weird error state.
> +  * 
> +  * Once we have our AIOReq, we remove it from reqs_for_reply_head and put it
> +  * in the current_req attribute, then read from the socket to the buffer (if
> +  * needed). If that completes the AIOReq, we clear the current_req attribute
> +  * and deallocate the AIOReq.
> +  * If the AIOReq is complete, and that's the last one for the NBDAIOCB, we 
> call
> +  * the 'done' callback and return.
> +  * If the AIOReq isn't complete, we just return. It'll be completed in 
> future
> +  * callbacks, since it's now the current_req
> +  * 
> +  * If there's an unrecoverable error reading from the socket, [...]

Something's missing here? ;-)

> + */
> +
> +typedef struct NBDAIOCB NBDAIOCB;
> +typedef struct BDRVNBDState BDRVNBDState;
> +
> +static int nbd_establish_connection(BDRVNBDState *s);
> +static void nbd_teardown_connection(BDRVNBDState *s);
> +
> +typedef struct AIOReq {
> +    NBDAIOCB *aiocb; /* Which QEMU operation this belongs to */
> +
> +    /* Where on the NBDAIOCB's iov does this request start? */
> +    off_t iov_offset;
> +
> +    /* The NBD request header pertaining to this AIOReq.
> +     * This specifies the handle of the request, the read offset and length.
> +     */
> +    NBDRequest nbd_req_hdr;
> +
> +    /* How many bytes have been written to the NBD server so far. This will
> +     * vary between 0 and sizeof(nbd_req_hdr) + nbd_req_hdr.len
> +     */
> +    size_t bytes_sent;
> +
> +    /* How many bytes have been read from the NBD server so far. Varies 
> between
> +     * 0 and sizeof(nbd_rsp_hdr) + nbd_req_hdr.len
> +     */
> +    size_t bytes_got;
> +
> +    /* Used to record this in the state object. waiting_sent is used to work
> +     * out which queue the AIOReq is in. Before it's been sent, it's in the
> +     * reqs_to_send_head. After being sent, if it's not current_req, it's in
> +     * reqs_for_reply_head.
> +     */
> +    QTAILQ_ENTRY(AIOReq) socket_siblings;
> +    bool waiting_sent;
> +
> +    /* Used to enter this into an NBDAIOCB */
> +    QLIST_ENTRY(AIOReq) aioreq_siblings;
> +} AIOReq;
> +
> +struct BDRVNBDState {
> +    /* File descriptor for the socket to the NBD server */
>      int sock;
> +
> +    /* Size of the file being served */
>      off_t size;
> +
> +    /* block size */
>      size_t blocksize;
> -    char *export_name; /* An NBD server may export several devices */
>  
>      /* If it begins with  '/', this is a UNIX domain socket. Otherwise,
>       * it's a string of the form <hostname|ip4|\[ip6\]>:port
>       */
>      char *host_spec;
> -} BDRVNBDState;
> +
> +    /* An NBD server may export several devices - this is the one we want*/
> +    char *export_name; 
> +
> +    /* Used to generate unique NBD handles */
> +    uint64_t aioreq_seq_num;
> +
> +    /* AIOReqs yet to be transmitted */
> +    QTAILQ_HEAD(reqs_to_send_head, AIOReq) reqs_to_send_head;
> +    
> +    /* AIOReqs that have been transmitted and are awaiting a reply */
> +    QTAILQ_HEAD(reqs_for_reply_head, AIOReq) reqs_for_reply_head;
> +
> +    /* AIOReq that is currently being read from the socket */
> +    AIOReq *current_req;
> +
> +    /* Used in aio_read_response. We may need to store received header bytes
> +     * between reads - we don't have an AIOReq at that point.
> +     */
> +    uint8_t nbd_rsp_buf[sizeof(NBDReply)];
> +    size_t nbd_rsp_offset;
> +
> +};
> +
> +enum AIOCBState {
> +    AIOCB_WRITE_UDATA,
> +    AIOCB_READ_UDATA,
> +};
> +
> +struct NBDAIOCB {
> +    BlockDriverAIOCB common;
> +    QEMUIOVector *qiov;
> +    QEMUBH *bh;
> +
> +    enum AIOCBState aiocb_type;
> +
> +    int64_t sector_num;
> +    int nb_sectors;
> +    int ret;
> +
> +    bool canceled;
> +
> +    void (*aio_done_func)(NBDAIOCB *);
> +
> +    QLIST_HEAD(aioreq_head, AIOReq) aioreq_head;
> +};
>  
>  static int nbd_config(BDRVNBDState *s, const char *filename, int flags)
>  {
> @@ -103,9 +239,307 @@ out:
>      return err;
>  }
>  
> -static int nbd_establish_connection(BlockDriverState *bs)
> +static inline AIOReq *nbd_alloc_aio_req(BDRVNBDState *s, NBDAIOCB *acb,
> +                                        size_t data_len,
> +                                        off_t offset,
> +                                        off_t iov_offset)
> +{
> +    AIOReq *aio_req;
> +
> +    aio_req = qemu_malloc(sizeof(*aio_req));
> +    aio_req->aiocb = acb;
> +    aio_req->iov_offset = iov_offset;
> +    aio_req->nbd_req_hdr.from = offset;
> +    aio_req->nbd_req_hdr.len = data_len;
> +    aio_req->nbd_req_hdr.handle = s->aioreq_seq_num++;
> +
> +    if(acb->aiocb_type == AIOCB_READ_UDATA) {
> +        aio_req->nbd_req_hdr.type = NBD_CMD_READ;
> +    } else {
> +        aio_req->nbd_req_hdr.type = NBD_CMD_WRITE;
> +    }
> +
> +    aio_req->bytes_sent = 0;
> +    aio_req->bytes_got = 0;
> +
> +    QTAILQ_INSERT_TAIL(&s->reqs_to_send_head, aio_req, socket_siblings);
> +    aio_req->waiting_sent = true;
> +    QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings);
> +    return aio_req;
> +}
> +
> +static int nbd_aio_flush_request(void *opaque)
> +{
> +    BDRVNBDState *s = opaque;
> +
> +    return !(QTAILQ_EMPTY(&s->reqs_to_send_head) &&
> +             QTAILQ_EMPTY(&s->reqs_for_reply_head) &&
> +             (s->current_req == NULL));
> +}
> +
> +static inline int free_aio_req(BDRVNBDState *s, AIOReq *aio_req)
> +{
> +    NBDAIOCB *acb = aio_req->aiocb;
> +    QLIST_REMOVE(aio_req, aioreq_siblings);
> +    qemu_free(aio_req);
> +
> +    return !QLIST_EMPTY(&acb->aioreq_head);
> +}
> +
> +static void nbd_finish_aiocb(NBDAIOCB *acb)
> +{
> +    acb->common.cb(acb->common.opaque, acb->ret);
> +    qemu_aio_release(acb);
> +}
> +
> +static void nbd_handle_io_err(BDRVNBDState *s, AIOReq *aio_req, int err)
> +{
> +    NBDAIOCB *acb;
> +    AIOReq *a;
> +
> +    /* These are fine - no need to do anything */
> +    if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
> +        return;
> +    }
> +
> +    /* These errors mean the request failed. So we need to trash the acb
> +     * (and all associated AIOReqs) and return -EIO. Partial reads are
> +     * fine. Partial writes aren't great, but no worse than (say) a write
> +     * to a physical disc that hits a bad sector.
> +     */
> +    if (aio_req == NULL) {
> +        logout("Error %i (%s) on NBD I/O. Killing NBD\n", err, 
> strerror(err));
> +    } else {
> +        acb = aio_req->aiocb;
> +        logout("Error %i (%s) on NBD request (handle %lu). Killing NBD\n", 
> err,
> +               strerror(err), aio_req->nbd_req_hdr.handle);
> +        acb->ret = -EIO;

Why don't we use the real error code?

> +        QLIST_FOREACH(a, &acb->aioreq_head, aioreq_siblings) {
> +            if (aio_req->waiting_sent) {
> +                QTAILQ_REMOVE(&s->reqs_to_send_head, aio_req, 
> socket_siblings);
> +            } else {
> +                if (s->current_req == aio_req) {
> +                    s->current_req = NULL;
> +                } else {
> +                    QTAILQ_REMOVE(&s->reqs_for_reply_head, aio_req,
> +                                  socket_siblings);
> +                }
> +            }

I think this logic should be part of free_aio_req.

> +            free_aio_req(s, a);
> +        }
> +        nbd_finish_aiocb(acb);
> +    }
> +
> +    nbd_teardown_connection(s);

And now there's no way to get the disk back to life without a reboot? Do
I understand correctly that now trying to access the disk will always
return -EBADF?

> +    return;

Unnecessary return;

> +}
> +
> +static void nbd_aio_write_request(void *opaque)
> +{
> +    BDRVNBDState *s = opaque;
> +    AIOReq *aio_req = NULL;
> +    NBDAIOCB *acb;
> +    size_t total;
> +    ssize_t ret;
> +
> +    if (QTAILQ_EMPTY(&s->reqs_to_send_head)) {
> +        return;
> +    }
> +
> +    aio_req = QTAILQ_FIRST(&s->reqs_to_send_head);
> +    acb = aio_req->aiocb;
> +
> +    if(acb->aiocb_type == AIOCB_WRITE_UDATA) { 

Space after if is missing

> +        total = sizeof(NBDRequest) + aio_req->nbd_req_hdr.len;
> +    } else {
> +        total = sizeof(NBDRequest);
> +    }
> +
> +    /* Since we've not written (all of) the header yet, get on with it.
> +     * We always grab the *head* of the queue in this callback, so we
> +     * won't interleave writes to the socket.
> +     *
> +     * Creating the header buffer on the fly isn't ideal in the case of many
> +     * retries, but almost all the time, this will happen exactly once.
> +     */
> +    if (aio_req->bytes_sent < sizeof(NBDRequest)) {
> +        logout("Buffer not written in full, doing so\n");
> +        uint8_t buf[sizeof(NBDRequest)];
> +        QEMUIOVector hdr;
> +        nbd_request_to_buf(&aio_req->nbd_req_hdr, buf);
> +        qemu_iovec_init(&hdr, 1);
> +        qemu_iovec_add(&hdr, &buf, sizeof(NBDRequest));
> +        ret = writev(s->sock, hdr.iov, hdr.niov);
> +        qemu_iovec_destroy(&hdr);
> +
> +        if (ret == -1) {
> +            nbd_handle_io_err(s, aio_req, socket_error());
> +            return;
> +        } else {
> +            logout("Written %zu bytes to socket (request is %zu bytes)\n", 
> ret,
> +                   sizeof(NBDRequest));
> +            aio_req->bytes_sent += ret;
> +        }
> +    }
> +
> +    /* If the header is sent & we're doing a write request, send data */
> +    if (acb->aiocb_type == AIOCB_WRITE_UDATA &&  
> +        aio_req->bytes_sent >= sizeof(NBDRequest) &&  
> +        aio_req->bytes_sent < total) {
> +        logout("Write request - putting data in socket\n");
> +        off_t offset = (aio_req->bytes_sent - sizeof(NBDRequest)) + 
> +                        aio_req->iov_offset;
> +
> +        ret = nbd_wr_aio(s->sock, acb->qiov, total - aio_req->bytes_sent,
> +                         offset, false);
> +
> +        if (ret < 0 ) {

I guess this is the space that was missing above. :-)

> +            nbd_handle_io_err(s, aio_req, -ret);
> +            return;
> +        } else {
> +            logout("Written %zu bytes to socket\n", ret);
> +            aio_req->bytes_sent += ret;
> +        }
> +    }
> +
> +    /* Request written. aio_read_response gets the reply */
> +    if (aio_req->bytes_sent == total) {
> +      logout("aio_req written to socket, moving to reqs_for_reply\n");
> +      aio_req->waiting_sent = false;
> +      QTAILQ_REMOVE(&s->reqs_to_send_head, aio_req, socket_siblings);
> +      QTAILQ_INSERT_TAIL(&s->reqs_for_reply_head, aio_req, socket_siblings);
> +    }
> +
> +    return;
> +}
> +
> +static void nbd_aio_read_response(void *opaque)
> +{
> +    BDRVNBDState *s = opaque;
> +    AIOReq *aio_req = NULL;
> +    NBDAIOCB *acb;
> +    NBDReply rsp;
> +
> +    size_t total;
> +    ssize_t ret;
> +    int rest;
> +
> +    if (s->current_req == NULL && QTAILQ_EMPTY(&s->reqs_for_reply_head)) {
> +        return;
> +    }
> +
> +    /* Build our nbd_reply object if we've got it */
> +    if (s->current_req && (s->nbd_rsp_offset == sizeof(NBDReply))) {
> +        nbd_buf_to_reply((uint8_t *)&s->nbd_rsp_buf, &rsp);
> +    }
> +
> +    if (s->current_req == NULL) {
> +        /* Try to read a header */

Factor this whole block out in its own function?

> +        QEMUIOVector hdr;
> +        qemu_iovec_init(&hdr, 1);
> +        qemu_iovec_add(&hdr, ((&s->nbd_rsp_buf) + s->nbd_rsp_offset),
> +                              (sizeof(NBDReply) - s->nbd_rsp_offset));
> +        ret = readv(s->sock, hdr.iov, hdr.niov);
> +        qemu_iovec_destroy(&hdr);
> +
> +        if (ret == -1) {
> +          nbd_handle_io_err(s, aio_req, socket_error());
> +          return;
> +        }
> +
> +        s->nbd_rsp_offset += ret;
> +
> +        if (s->nbd_rsp_offset == sizeof(NBDReply)) {
> +            /* Turn data into NBDReply, find the matching aio_req */
> +            nbd_buf_to_reply((uint8_t *)&s->nbd_rsp_buf, &rsp);
> +
> +            /* Check the magic */
> +            if (rsp.magic != NBD_REPLY_MAGIC) {
> +                logout("Received invalid NBD response magic!\n");
> +                nbd_handle_io_err(s, NULL, EIO);
> +                return;
> +            }
> +
> +            QTAILQ_FOREACH(aio_req, &s->reqs_for_reply_head, 
> socket_siblings) {
> +                if (aio_req->nbd_req_hdr.handle == rsp.handle) {
> +                    s->current_req = aio_req;
> +                    s->current_req->bytes_got = sizeof(NBDReply);
> +                    break;
> +                }
> +            }
> +
> +            if (!s->current_req) {
> +                logout("cannot find aio_req for handle %lu\n", rsp.handle);
> +                nbd_handle_io_err(s, NULL, EIO);
> +                return;
> +            }
> +            QTAILQ_REMOVE(&s->reqs_for_reply_head, aio_req, socket_siblings);
> +        } else {
> +            /* We haven't finished reading the entire header yet. */
> +            return;
> +        }
> +    }
> +
> +    /* s->current_req and rsp are both usable now */
> +    aio_req = s->current_req;
> +    acb = aio_req->aiocb;
> +
> +    total = sizeof(NBDReply);
> +    if (acb->aiocb_type == AIOCB_READ_UDATA) {
> +        total += aio_req->nbd_req_hdr.len;
> +    }
> +
> +    if (acb->aiocb_type == AIOCB_READ_UDATA && aio_req->bytes_got < total) {
> +        off_t offset = (aio_req->bytes_got - sizeof(NBDReply)) + 
> +                        aio_req->iov_offset;
> +        QEMUIOVector *qiov = acb->qiov;
> +        uint8_t *buf = NULL;
> +
> +        if (acb->canceled) {
> +            buf = qemu_malloc(total - offset);
> +            qemu_iovec_init(qiov, 1);
> +            qemu_iovec_add(qiov, buf, total - offset);
> +        }
> +
> +        ret = nbd_wr_aio(s->sock, qiov, total - offset, offset, true);
> +
> +        if (acb->canceled) {
> +            qemu_iovec_destroy(qiov);
> +            qemu_free(buf);
> +        }
> +
> +        if (ret < 0) {
> +            nbd_handle_io_err(s, aio_req, -ret);
> +            return;
> +        } else {
> +            aio_req->bytes_got += ret;
> +        }
> +    }
> +
> +    /* Entire request has been read */
> +    if (total == aio_req->bytes_got) {
> +        logout("Read all bytes of the response; removing response from 
> s->current_req\n");
> +        s->nbd_rsp_offset = 0;
> +        s->current_req = NULL;
> +        if (rsp.error != 0) {
> +            acb->ret = -EIO;
> +            logout("NBD request resulted in error %i\n", rsp.error);
> +        }
> +
> +        /* Free the aio_req. If the NBDAIOCB is finished, notify QEMU */
> +        rest = free_aio_req(s, aio_req);
> +        if (!rest) {
> +            logout("acb complete\n");
> +            acb->aio_done_func(acb);
> +        }
> +    }
> +
> +    logout("Leaving nbd_aio_read_response\n");
> +    return;

Another useless return.

> +}
> +
> +static int nbd_establish_connection(BDRVNBDState *s)
>  {
> -    BDRVNBDState *s = bs->opaque;
>      int sock;
>      int ret;
>      off_t size;
> @@ -139,23 +573,29 @@ static int nbd_establish_connection(BlockDriverState 
> *bs)
>      s->sock = sock;
>      s->size = size;
>      s->blocksize = blocksize;
> +    s->nbd_rsp_offset = 0;
> +
> +    qemu_aio_set_fd_handler(sock, nbd_aio_read_response, 
> nbd_aio_write_request,
> +                            nbd_aio_flush_request, NULL, s);
>  
>      logout("Established connection with NBD server\n");
>      return 0;
>  }
>  
> -static void nbd_teardown_connection(BlockDriverState *bs)
> +static void nbd_teardown_connection(BDRVNBDState *s)
>  {
> -    BDRVNBDState *s = bs->opaque;
>      struct nbd_request request;
>  
>      request.type = NBD_CMD_DISC;
> -    request.handle = (uint64_t)(intptr_t)bs;
> +    request.handle = s->aioreq_seq_num++;
>      request.from = 0;
>      request.len = 0;
>      nbd_send_request(s->sock, &request);
>  
> +    qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL);
>      closesocket(s->sock);
> +    logout("Connection to NBD server closed\n");
> +    return;
>  }
>  
>  static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
> @@ -169,95 +609,193 @@ static int nbd_open(BlockDriverState *bs, const char* 
> filename, int flags)
>          return result;
>      }
>  
> +    QTAILQ_INIT(&s->reqs_to_send_head);
> +    QTAILQ_INIT(&s->reqs_for_reply_head);
> +
> +    s->current_req = NULL;
> +    s->aioreq_seq_num = 0;
> +    s->nbd_rsp_offset = 0;
> +
>      /* establish TCP connection, return error if it fails
>       * TODO: Configurable retry-until-timeout behaviour.
>       */
> -    result = nbd_establish_connection(bs);
> +    result = nbd_establish_connection(s);
>  
>      return result;
>  }
>  
> -static int nbd_read(BlockDriverState *bs, int64_t sector_num,
> -                    uint8_t *buf, int nb_sectors)
> +static void nbd_close(BlockDriverState *bs)
>  {
>      BDRVNBDState *s = bs->opaque;
> -    struct nbd_request request;
> -    struct nbd_reply reply;
>  
> -    request.type = NBD_CMD_READ;
> -    request.handle = (uint64_t)(intptr_t)bs;
> -    request.from = sector_num * 512;;
> -    request.len = nb_sectors * 512;
> +    nbd_teardown_connection(s);
> +    qemu_free(s->export_name);
> +    qemu_free(s->host_spec);
>  
> -    if (nbd_send_request(s->sock, &request) == -1)
> -        return -errno;
> +    return;
> +}
>  
> -    if (nbd_receive_reply(s->sock, &reply) == -1)
> -        return -errno;
> +/* We remove all the aiocbs currently sat in reqs_to_send_head (excepting the
> + * first, if any bytes have been transmitted). So we don't need to check for
> + * canceled in aio_write_request at all. If that finishes the acb, we call its
> + * completion function. Otherwise, we leave it alone. 
> + * 
> + * in nbd_aio_read_response, when we're handling a read request for an acb 
> with
> + * canceled = true, we allocate a QEMUIOVector of the appropriate size to do
> + * the read, and throw the bytes away. Everything else goes on as normal.
> + */
> +static void nbd_aio_cancel(BlockDriverAIOCB *blockacb) {
> +    NBDAIOCB *acb = (NBDAIOCB *)blockacb;
> +    BDRVNBDState *s = acb->common.bs->opaque;
> +    AIOReq *a;
> +
> +    QLIST_FOREACH(a, &acb->aioreq_head, aioreq_siblings) {
> +        if(a->waiting_sent && a->bytes_sent == 0) {
> +            QTAILQ_REMOVE(&s->reqs_to_send_head, a, socket_siblings);
> +            free_aio_req(s, a);
> +        }
> +    }
> +
> +    acb->canceled = true;
> +    acb->ret = -EIO;
> +
> +    if QLIST_EMPTY(&acb->aioreq_head) {
> +        nbd_finish_aiocb(acb);
> +    }
> +}
> +
> +static AIOPool nbd_aio_pool = {
> +    .aiocb_size = sizeof(NBDAIOCB),
> +    .cancel = nbd_aio_cancel,
> +};
> +
> +static NBDAIOCB *nbd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
> +                                   int64_t sector_num, int nb_sectors,
> +                                   BlockDriverCompletionFunc *cb, void 
> *opaque)
> +{
> +    NBDAIOCB *acb;
>  
> -    if (reply.error !=0)
> -        return -reply.error;
> +    acb = qemu_aio_get(&nbd_aio_pool, bs, cb, opaque);
>  
> -    if (reply.handle != request.handle)
> +    acb->qiov = qiov;
> +    acb->sector_num = sector_num;
> +    acb->nb_sectors = nb_sectors;
> +
> +    acb->canceled = false;
> +
> +    acb->aio_done_func = NULL;
> +    acb->bh = NULL;
> +    acb->ret = 0;
> +
> +    QLIST_INIT(&acb->aioreq_head);
> +    return acb;
> +}
> +
> +static int nbd_schedule_bh(QEMUBHFunc *cb, NBDAIOCB *acb)
> +{
> +    if (acb->bh) {
> +        logout("bug: %d %d\n", acb->aiocb_type, acb->aiocb_type);
>          return -EIO;
> +    }
>  
> -    if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len)
> +    acb->bh = qemu_bh_new(cb, acb);
> +    if (!acb->bh) {
> +        logout("oom: %d %d\n", acb->aiocb_type, acb->aiocb_type);
>          return -EIO;
> +    }
> +
> +    qemu_bh_schedule(acb->bh);
>  
>      return 0;
>  }
>  
> -static int nbd_write(BlockDriverState *bs, int64_t sector_num,
> -                     const uint8_t *buf, int nb_sectors)
> +static void nbd_readv_writev_bh_cb(void *p)
>  {
> -    BDRVNBDState *s = bs->opaque;
> -    struct nbd_request request;
> -    struct nbd_reply reply;
> +    NBDAIOCB *acb = p;
>  
> -    request.type = NBD_CMD_WRITE;
> -    request.handle = (uint64_t)(intptr_t)bs;
> -    request.from = sector_num * 512;;
> -    request.len = nb_sectors * 512;
> +    size_t len, done = 0;
> +    size_t total = acb->nb_sectors * SECTOR_SIZE;
>  
> -    if (nbd_send_request(s->sock, &request) == -1)
> -        return -errno;
> +    /* Where the read/write starts from */
> +    off_t offset = acb->sector_num * SECTOR_SIZE;
> +    BDRVNBDState *s = acb->common.bs->opaque;
>  
> -    if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len)
> -        return -EIO;
> +    AIOReq *aio_req;
> +    
> +    logout("Entering nbd_readv_writev_bh_cb\n");
>  
> -    if (nbd_receive_reply(s->sock, &reply) == -1)
> -        return -errno;
> +    qemu_bh_delete(acb->bh);
> +    acb->bh = NULL;
>  
> -    if (reply.error !=0)
> -        return -reply.error;
> +    while (done < total) {
> +        len = (total - done);
>  
> -    if (reply.handle != request.handle)
> -        return -EIO;
> +        /* Split read & write requests into segments if needed */
> +        if (acb->aiocb_type == AIOCB_READ_UDATA && len > NBD_MAX_READ) {
> +            len = NBD_MAX_READ;
> +        }
>  
> -    return 0;
> +        if (acb->aiocb_type == AIOCB_WRITE_UDATA && len > NBD_MAX_WRITE) {
> +            len = NBD_MAX_WRITE;
> +        }
> +        
> +        logout("Allocating an aio_req of %zu bytes\n", len);
> +        aio_req = nbd_alloc_aio_req(s, acb, len, offset + done, done);
> +
> +        done += len;
> +    }
> +
> +    if (QLIST_EMPTY(&acb->aioreq_head)) {
> +        logout("acb->ioreq_head empty, so finishing acb now\n");
> +        nbd_finish_aiocb(acb);
> +    }
> +    logout("Leaving nbd_readv_writev_bh_cb\n");
> +    return;
>  }
>  
> -static void nbd_close(BlockDriverState *bs)
> +static BlockDriverAIOCB *nbd_aio_readv(BlockDriverState *bs,
> +        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> +        BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    NBDAIOCB *acb;
> +
> +    acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
> +    acb->aiocb_type = AIOCB_READ_UDATA;
> +    acb->aio_done_func = nbd_finish_aiocb;
> +
> +    nbd_schedule_bh(nbd_readv_writev_bh_cb, acb);
> +    return &acb->common;
> +}
> +
> +static BlockDriverAIOCB *nbd_aio_writev(BlockDriverState *bs,
> +        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> +        BlockDriverCompletionFunc *cb, void *opaque)
>  {
> -    nbd_teardown_connection(bs);
> +    NBDAIOCB *acb;
> +
> +    acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
> +    acb->aiocb_type = AIOCB_WRITE_UDATA;
> +    acb->aio_done_func = nbd_finish_aiocb;
> +
> +    nbd_schedule_bh(nbd_readv_writev_bh_cb, acb);
> +    return &acb->common;
>  }
>  
>  static int64_t nbd_getlength(BlockDriverState *bs)
>  {
>      BDRVNBDState *s = bs->opaque;
> -
>      return s->size;
>  }
>  
>  static BlockDriver bdrv_nbd = {
> -    .format_name     = "nbd",
> -    .instance_size   = sizeof(BDRVNBDState),
> -    .bdrv_file_open  = nbd_open,
> -    .bdrv_read               = nbd_read,
> -    .bdrv_write              = nbd_write,
> -    .bdrv_close              = nbd_close,
> -    .bdrv_getlength  = nbd_getlength,
> -    .protocol_name   = "nbd",
> +    .format_name     = "nbd",
> +    .instance_size   = sizeof(BDRVNBDState),
> +    .bdrv_file_open  = nbd_open,
> +    .bdrv_aio_readv  = nbd_aio_readv,
> +    .bdrv_aio_writev = nbd_aio_writev,
> +    .bdrv_close      = nbd_close,
> +    .bdrv_getlength  = nbd_getlength,
> +    .protocol_name   = "nbd"
>  };
>  
>  static void bdrv_nbd_init(void)
> diff --git a/nbd.c b/nbd.c
> index a3e6d52..26d33c5 100644
> --- a/nbd.c
> +++ b/nbd.c
> @@ -31,7 +31,7 @@
>  
>  #include "qemu_socket.h"
>  
> -//#define DEBUG_NBD
> +#define DEBUG_NBD
>  
>  #ifdef DEBUG_NBD
>  #define TRACE(msg, ...) do { \
> @@ -49,10 +49,6 @@
>  
>  /* This is all part of the "official" NBD API */
>  
> -#define NBD_REPLY_SIZE          (4 + 4 + 8)
> -#define NBD_REQUEST_MAGIC       0x25609513
> -#define NBD_REPLY_MAGIC         0x67446698
> -
>  #define NBD_SET_SOCK            _IO(0xab, 0)
>  #define NBD_SET_BLKSIZE         _IO(0xab, 1)
>  #define NBD_SET_SIZE            _IO(0xab, 2)
> @@ -107,6 +103,30 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, 
> bool do_read)
>      return offset;
>  }
>  
> +ssize_t nbd_wr_aio(int fd, QEMUIOVector *qiov, size_t len, off_t offset,
> +                   bool do_read)

Isn't this name misleading? The function is completely synchronous. It
just happens not to block because it's only called when the socket is ready.

Kevin



reply via email to

[Prev in Thread] Current Thread [Next in Thread]