[Qemu-devel] [PATCH 2/2] NBD: Convert the NBD driver to use the AIO interface.


From: nick
Subject: [Qemu-devel] [PATCH 2/2] NBD: Convert the NBD driver to use the AIO interface.
Date: Thu, 28 Apr 2011 16:20:02 +0100

From: Nick Thomas <address@hidden>

This preserves the previous behaviour when the NBD server is
unavailable or goes away during guest execution, but switches the
NBD backend to present the AIO interface instead of the sync IO
interface.

We also split read & write requests into 1 MiB blocks (minus the header).
This is a hard limit in various NBD servers (including qemu-nbd), but it
never seemed to be hit with the previous backend code.
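
The split boils down to a loop of this shape (an illustrative sketch
only - the real loop lives in nbd_readv_writev_bh_cb further down and
queues one AIOReq per chunk; split_into_chunks is just a made-up name
for illustration):

    /* Sketch of the chunking in nbd_readv_writev_bh_cb. max_payload is
     * NBD_MAX_READ or NBD_MAX_WRITE, i.e. 1 MiB minus the relevant
     * header size, so each chunk plus its header fits the 1 MiB limit.
     */
    static void split_into_chunks(size_t total_bytes, size_t max_payload)
    {
        size_t done = 0;

        while (done < total_bytes) {
            size_t len = total_bytes - done;
            if (len > max_payload) {
                len = max_payload;
            }
            /* real code: nbd_alloc_aio_req(s, acb, len, offset + done, done) */
            done += len;
        }
    }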

All IO except setup and teardown is asynchronous, and we (in theory)
handle canceled requests properly too.
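
The cancellation handling on the read side is small enough to show here
(condensed from nbd_aio_read_response in this patch): even when an ACB
has been cancelled we still have to drain its reply payload off the
socket, so it is read into a throwaway buffer and discarded rather than
landing in the guest's iovec.

    /* Condensed from nbd_aio_read_response: 'remaining' is the payload
     * still to be read and 'qiov' is the ACB's iovec, temporarily
     * repurposed as scratch space for a cancelled request.
     */
    if (acb->canceled) {
        buf = qemu_malloc(remaining);
        qemu_iovec_init(qiov, 1);
        qemu_iovec_add(qiov, buf, remaining);
        qiov_offset = 0;
    }

    ret = nbd_qiov_wr(s->sock, qiov, remaining, qiov_offset, true);

    if (acb->canceled) {
        qemu_iovec_destroy(qiov);
        qemu_free(buf);
    }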

Automatic reconnect logic is TODO, and will be coming in a future
patch.

We register the nbd_aio_write_request callback only whilst there are
outstanding aio_reqs that haven't yet had an NBD request sent over the
wire for them. Whenever the send queue empties, we unregister the
callback - this avoids a tight loop around the select() in vl.c's
main_loop_wait(), since the socket is almost always ready for writing.
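
Concretely, (un)registering just toggles the write-readiness callback
handed to qemu_aio_set_fd_handler(); these are the two helpers added by
this patch, shown without their closed-socket checks:

    static void nbd_register_write_request_handler(BDRVNBDState *s)
    {
        qemu_aio_set_fd_handler(s->sock, nbd_aio_read_response,
                                nbd_aio_write_request,  /* fd writable */
                                nbd_aio_flush_request, NULL, s);
    }

    static void nbd_unregister_write_request_handler(BDRVNBDState *s)
    {
        qemu_aio_set_fd_handler(s->sock, nbd_aio_read_response,
                                NULL,                   /* stop write callbacks */
                                nbd_aio_flush_request, NULL, s);
    }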

Signed-off-by: Nick Thomas <address@hidden>
---
 block/nbd.c |  727 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
 nbd.c       |   74 +++++--
 nbd.h       |   13 +-
 3 files changed, 732 insertions(+), 82 deletions(-)

diff --git a/block/nbd.c b/block/nbd.c
index 7a52f62..8986ccf 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -34,7 +34,12 @@
 #include <sys/types.h>
 #include <unistd.h>
 
-#define EN_OPTSTR ":exportname="
+#define EN_OPTSTR   ":exportname="
+#define SECTOR_SIZE 512
+
+/* 1MiB minus header size */
+#define NBD_MAX_READ      ((1024*1024) - sizeof(NBDReply))
+#define NBD_MAX_WRITE     ((1024*1024) - sizeof(NBDRequest))
 
 /* #define DEBUG_NBD */
 
@@ -45,17 +50,161 @@
 #define logout(fmt, ...) ((void)0)
 #endif
 
-typedef struct BDRVNBDState {
+/*
+ * Here's how the I/O works.
+ * qemu creates a BDRVNBDState for us, which is the context for all reads
+ * and writes.
+ *
+ * nbd_open is called to connect to the NBD server and set up an on-read
+ * handler (nbd_aio_read_response)
+ *
+ * nbd_aio_readv/writev, called by qemu, create an NBDAIOCB (representing the
+ * I/O request to qemu).
+ * For read requests, read/writev creates a single AIOReq containing the NBD
+ * header. For write requests, 1 or more AIOReqs are created, containing the
+ * NBD header and the write data. These are pushed to reqs_to_send_head in the
+ * BDRVNBDState and the list in the NBDAIOCB. We then register a write request
+ * callback, which results in nbd_aio_write_request being called from the
+ * select() in vl.c:main_loop_wait
+ *
+ * Each time nbd_aio_write_request is called, it gets the first AIOReq in the
+ * reqs_to_send_head and writes the data to the socket.
+ * If this results in the whole AIOReq being written to the socket, it moves
+ * the AIOReq to the reqs_for_reply_head in the BDRVNBDState. If the AIOReq
+ * isn't finished, then it's left where it is, so more of it can be written
+ * next time. Before exiting, we unregister the write request handler if the
+ * reqs_to_send_head queue is empty. This avoids a tight loop around the
+ * aforementioned select (since the socket is almost always ready for writing).
+ *
+ * If there's an unrecoverable error writing to the socket, we disconnect and
+ * return the entire acb with that error.
+ *
+ * Each time nbd_aio_read_response runs, we check the BDRVNBDState's current_req attribute
+ * to see if we're in the middle of a read. If not, we read a header's worth of
+ * data, then try to find an AIOReq in the reqs_for_reply_head. If we don't
+ * find one, that is very odd, so we tear down the connection and return an
+ * I/O error.
+ *
+ * Once we have our AIOReq, we remove it from reqs_for_reply_head and put it
+ * in the current_req attribute, then read from the socket to the buffer (if
+ * needed). If that completes the AIOReq, we clear the current_req attribute
+ * and deallocate the AIOReq.
+ *   - If the AIOReq is complete, and that's the last one for the NBDAIOCB, we
+ *     call the 'done' callback and return.
+ *   - If the AIOReq isn't complete, we just return. It'll be completed in
+ *     future callbacks, since it's now the current_req
+ *   - If there's an unrecoverable error reading from the socket (EBADF, say),
+ *     we invalidate the AIOReq and teardown the connection.
+ *
+ * Currently, there is no reconnection logic, meaning that once the connection
+ * has been broken or an error condition has occurred, the only way to regain
+ * functionality is to call nbd_close() and nbd_open() - disconnect & reconnect
+ * the drive, or restart the whole process. There's plenty of scope to improve
+ * upon that.
+ */
+
+typedef struct NBDAIOCB NBDAIOCB;
+typedef struct BDRVNBDState BDRVNBDState;
+
+static int nbd_establish_connection(BDRVNBDState *s);
+static void nbd_teardown_connection(BDRVNBDState *s);
+static void nbd_register_write_request_handler(BDRVNBDState *s);
+static void nbd_unregister_write_request_handler(BDRVNBDState *s);
+
+typedef struct AIOReq {
+    NBDAIOCB *aiocb; /* Which QEMU operation this belongs to */
+
+    /* Where on the NBDAIOCB's iov does this request start? */
+    off_t iov_offset;
+
+    /* The NBD request header pertaining to this AIOReq.
+     * This specifies the handle of the request, the read offset and length.
+     */
+    NBDRequest nbd_req_hdr;
+
+    /* How many bytes have been written to the NBD server so far. This will
+     * vary between 0 and sizeof(nbd_req_hdr) + nbd_req_hdr.len
+     */
+    size_t bytes_sent;
+
+    /* How many bytes *of the payload* have been read from the NBD server so
+     * far. Varies between 0 and nbd_req_hdr.len - header byte count is kept in
+     * BDRVNBDState->nbd_rsp_offset.
+     */
+    size_t bytes_got;
+
+    /* Used to record this in the state object. waiting_sent is used to work
+     * out which queue the AIOReq is in. Before it's been sent, it's in the
+     * reqs_to_send_head. After being sent, if it's not current_req, it's in
+     * reqs_for_reply_head.
+     */
+    QTAILQ_ENTRY(AIOReq) socket_siblings;
+    bool waiting_sent;
+
+    /* Used to enter this into an NBDAIOCB */
+    QLIST_ENTRY(AIOReq) aioreq_siblings;
+} AIOReq;
+
+struct BDRVNBDState {
+    /* File descriptor for the socket to the NBD server */
     int sock;
+
+    /* Size of the file being served */
     off_t size;
+
+    /* block size */
     size_t blocksize;
-    char *export_name; /* An NBD server may export several devices */
 
     /* If it begins with  '/', this is a UNIX domain socket. Otherwise,
      * it's a string of the form <hostname|ip4|\[ip6\]>:port
      */
     char *host_spec;
-} BDRVNBDState;
+
+    /* An NBD server may export several devices - this is the one we want */
+    char *export_name;
+
+    /* Used to generate unique NBD handles */
+    uint64_t aioreq_seq_num;
+
+    /* AIOReqs yet to be transmitted */
+    QTAILQ_HEAD(reqs_to_send_head, AIOReq) reqs_to_send_head;
+
+    /* AIOReqs that have been transmitted and are awaiting a reply */
+    QTAILQ_HEAD(reqs_for_reply_head, AIOReq) reqs_for_reply_head;
+
+    /* AIOReq that is currently being read from the socket */
+    AIOReq *current_req;
+
+    /* Used in nbd_aio_read_response. We may need to store received header bytes
+     * between reads - we don't have an AIOReq at that point.
+     */
+    uint8_t nbd_rsp_buf[sizeof(NBDReply)];
+    size_t nbd_rsp_offset;
+
+};
+
+enum AIOCBState {
+    AIOCB_WRITE_UDATA,
+    AIOCB_READ_UDATA,
+};
+
+struct NBDAIOCB {
+    BlockDriverAIOCB common;
+    QEMUIOVector *qiov;
+    QEMUBH *bh;
+
+    enum AIOCBState aiocb_type;
+
+    int64_t sector_num;
+    int nb_sectors;
+    int ret;
+
+    bool canceled;
+
+    void (*aio_done_func)(NBDAIOCB *);
+
+    QLIST_HEAD(aioreq_head, AIOReq) aioreq_head;
+};
 
 static int nbd_config(BDRVNBDState *s, const char *filename, int flags)
 {
@@ -103,9 +252,342 @@ out:
     return err;
 }
 
-static int nbd_establish_connection(BlockDriverState *bs)
+static inline AIOReq *nbd_alloc_aio_req(BDRVNBDState *s, NBDAIOCB *acb,
+                                        size_t data_len,
+                                        off_t offset,
+                                        off_t iov_offset)
+{
+    AIOReq *aio_req;
+
+    aio_req = qemu_malloc(sizeof(*aio_req));
+    aio_req->aiocb = acb;
+    aio_req->iov_offset = iov_offset;
+    aio_req->nbd_req_hdr.from = offset;
+    aio_req->nbd_req_hdr.len = data_len;
+    aio_req->nbd_req_hdr.handle = s->aioreq_seq_num++;
+
+    if (acb->aiocb_type == AIOCB_READ_UDATA) {
+        aio_req->nbd_req_hdr.type = NBD_CMD_READ;
+    } else {
+        aio_req->nbd_req_hdr.type = NBD_CMD_WRITE;
+    }
+
+    aio_req->bytes_sent = 0;
+    aio_req->bytes_got = 0;
+
+    QTAILQ_INSERT_TAIL(&s->reqs_to_send_head, aio_req, socket_siblings);
+    aio_req->waiting_sent = true;
+    QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings);
+    return aio_req;
+}
+
+static int nbd_aio_flush_request(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+    int resp;
+
+    resp = !(QTAILQ_EMPTY(&s->reqs_to_send_head) &&
+             QTAILQ_EMPTY(&s->reqs_for_reply_head) &&
+             (s->current_req == NULL));
+    logout("flush_request: %i\n", resp);
+    return resp;
+}
+
+static inline bool free_aio_req(BDRVNBDState *s, AIOReq *aio_req)
+{
+    NBDAIOCB *acb = aio_req->aiocb;
+
+    if (aio_req->waiting_sent) {
+        QTAILQ_REMOVE(&s->reqs_to_send_head, aio_req, socket_siblings);
+    }
+
+    if (s->current_req == aio_req) {
+        s->current_req = NULL;
+    } else {
+        QTAILQ_REMOVE(&s->reqs_for_reply_head, aio_req, socket_siblings);
+    }
+
+    QLIST_REMOVE(aio_req, aioreq_siblings);
+    qemu_free(aio_req);
+
+    return !QLIST_EMPTY(&acb->aioreq_head);
+}
+
+static void nbd_finish_aiocb(NBDAIOCB *acb)
+{
+    acb->common.cb(acb->common.opaque, acb->ret);
+    qemu_aio_release(acb);
+}
+
+static void nbd_handle_io_err(BDRVNBDState *s, AIOReq *aio_req, int err)
+{
+    NBDAIOCB *acb;
+    AIOReq *a, *next;
+
+    /* These are fine - no need to do anything */
+    if (err == EAGAIN || err == EWOULDBLOCK || err == EINTR) {
+        logout("Recoverable error %i (%s) - returning\n", err, strerror(err));
+        return;
+    }
+
+    /* These errors mean the request failed. So we need to trash the acb
+     * (and all associated AIOReqs) and return the error. Partial reads are
+     * fine. Partial writes aren't great, but no worse than (say) a write
+     * to a physical disc that hits a bad sector.
+     */
+    if (aio_req == NULL) {
+        logout("Error %i (%s) on NBD I/O. Killing NBD\n", err, strerror(err));
+    } else {
+        acb = aio_req->aiocb;
+        logout("Error %i (%s) on NBD request (handle %lu). Killing NBD\n", err,
+               strerror(err), aio_req->nbd_req_hdr.handle);
+        acb->ret = -err;
+        QLIST_FOREACH_SAFE(a, &acb->aioreq_head, aioreq_siblings, next) {
+            free_aio_req(s, a);
+        }
+        nbd_finish_aiocb(acb);
+    }
+
+    nbd_teardown_connection(s);
+}
+
+static void nbd_aio_write_request(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+    AIOReq *aio_req = NULL;
+    NBDAIOCB *acb;
+    size_t total;
+    ssize_t ret;
+
+    if (QTAILQ_EMPTY(&s->reqs_to_send_head)) {
+        logout("Nothing to do in aio_write_request so unregistering 
handler\n");
+        nbd_unregister_write_request_handler(s);
+        return;
+    }
+
+    aio_req = QTAILQ_FIRST(&s->reqs_to_send_head);
+    acb = aio_req->aiocb;
+
+    if (acb->aiocb_type == AIOCB_WRITE_UDATA) {
+        total = sizeof(NBDRequest) + aio_req->nbd_req_hdr.len;
+    } else {
+        total = sizeof(NBDRequest);
+    }
+
+    /* Since we've not written (all of) the header yet, get on with it.
+     * We always grab the *head* of the queue in this callback, so we
+     * won't interleave writes to the socket.
+     *
+     * Creating the header buffer on the fly isn't ideal in the case of many
+     * retries, but almost all the time, this will happen exactly once.
+     */
+    if (aio_req->bytes_sent < sizeof(NBDRequest)) {
+        logout("Buffer not written in full, doing so\n");
+        uint8_t buf[sizeof(NBDRequest)];
+        QEMUIOVector hdr;
+        nbd_request_to_buf(&aio_req->nbd_req_hdr, buf);
+        qemu_iovec_init(&hdr, 1);
+        qemu_iovec_add(&hdr, &buf, sizeof(NBDRequest));
+        ret = writev(s->sock, hdr.iov, hdr.niov);
+        qemu_iovec_destroy(&hdr);
+
+        if (ret == -1) {
+            nbd_handle_io_err(s, aio_req, socket_error());
+            return;
+        } else {
+            logout("Written %zu bytes to socket (request is %zu bytes)\n", ret,
+                   sizeof(NBDRequest));
+            aio_req->bytes_sent += ret;
+        }
+    }
+
+    /* If the header is sent & we're doing a write request, send data */
+    if (acb->aiocb_type == AIOCB_WRITE_UDATA &&
+        aio_req->bytes_sent >= sizeof(NBDRequest) &&
+        aio_req->bytes_sent < total) {
+        logout("Write request - putting data in socket\n");
+        off_t offset = (aio_req->bytes_sent - sizeof(NBDRequest)) +
+                        aio_req->iov_offset;
+
+        ret = nbd_qiov_wr(s->sock, acb->qiov, total - aio_req->bytes_sent,
+                          offset, false);
+
+        if (ret < 0) {
+            nbd_handle_io_err(s, aio_req, -ret);
+            return;
+        } else {
+            logout("Written %zu bytes to socket\n", ret);
+            aio_req->bytes_sent += ret;
+        }
+    }
+
+    /* Request written. nbd_aio_read_response gets the reply */
+    if (aio_req->bytes_sent == total) {
+        logout("aio_req written to socket, moving to reqs_for_reply\n");
+        aio_req->waiting_sent = false;
+        QTAILQ_REMOVE(&s->reqs_to_send_head, aio_req, socket_siblings);
+        QTAILQ_INSERT_TAIL(&s->reqs_for_reply_head, aio_req, socket_siblings);
+    }
+
+    if (QTAILQ_EMPTY(&s->reqs_to_send_head)) {
+        logout("Write queue empty, unregistering write request handler\n");
+        nbd_unregister_write_request_handler(s);
+    }
+
+}
+
+static inline bool nbd_find_next_aioreq(BDRVNBDState *s)
+{
+    uint8_t *buf = &s->nbd_rsp_buf[s->nbd_rsp_offset];
+    size_t cnt = sizeof(NBDReply) - s->nbd_rsp_offset;
+    ssize_t ret;
+    NBDReply rsp;
+    AIOReq *aio_req;
+
+    /* Try to get enough bytes so we have a complete NBDReply */
+    ret = read(s->sock, buf, cnt);
+    logout("read %zu bytes\n", ret);
+
+    /* I/O error means we've failed. */
+    if (ret == -1) {
+        nbd_handle_io_err(s, NULL, socket_error());
+        return false;
+    }
+
+    s->nbd_rsp_offset += ret;
+
+    /* We don't have enough data to make a full header */
+    if (s->nbd_rsp_offset < sizeof(NBDReply)) {
+        return false;
+    }
+
+    /* Turn data into NBDReply, find the matching aio_req */
+    nbd_buf_to_reply(&s->nbd_rsp_buf[0], &rsp);
+
+    /* Check the magic */
+    if (rsp.magic != NBD_REPLY_MAGIC) {
+        logout("Received invalid NBD response magic!\n");
+        nbd_handle_io_err(s, NULL, EIO);
+        return false;
+    }
+
+    QTAILQ_FOREACH(aio_req, &s->reqs_for_reply_head, socket_siblings) {
+        if (aio_req->nbd_req_hdr.handle == rsp.handle) {
+            s->current_req = aio_req;
+            break;
+        }
+    }
+
+    if (s->current_req) {
+        /* Mission accomplished! */
+        logout("Found next aio_req\n");
+        QTAILQ_REMOVE(&s->reqs_for_reply_head, aio_req, socket_siblings);
+        return true;
+    }
+
+    /* The handle in the reply head doesn't match any AIOReq. Fail. */
+    logout("cannot find aio_req for handle %lu\n", rsp.handle);
+    nbd_handle_io_err(s, NULL, EIO);
+    return false;
+}
+
+static void nbd_aio_read_response(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+    uint8_t *buf = NULL; /* Used if the aiocb has been canceled */
+    AIOReq *aio_req = NULL;
+    NBDAIOCB *acb;
+    NBDReply rsp;
+
+    size_t total = 0; /* number of payload bytes read */
+    ssize_t ret;
+    int rest;
+
+    /* We're not in the middle of a request */
+    if (s->current_req == NULL) {
+        /* No outstanding requests */
+        if (QTAILQ_EMPTY(&s->reqs_for_reply_head)) {
+            logout("No request outstanding, exiting\n");
+            return;
+        }
+
+        /* Couldn't grab the next aioreq */
+        if (!nbd_find_next_aioreq(s)) {
+            logout("Failed to find a new aio_req to work on, exiting\n");
+            return;
+        }
+    }
+
+    /* From here on, s->current_req and s->nbd_rsp_buf are known to be good */
+    nbd_buf_to_reply(&s->nbd_rsp_buf[0], &rsp);
+    aio_req = s->current_req;
+    acb = aio_req->aiocb;
+
+    /* NBD server returned an error for this operation */
+    if (rsp.error != 0) {
+        logout("NBD request resulted in error: %i\n", rsp.error);
+        acb->ret = -EIO;
+
+        rest = free_aio_req(s, aio_req);
+        if (!rest) {
+            logout("Signalling completion for this ACB\n");
+            acb->aio_done_func(acb);
+        }
+
+        return;
+    }
+
+    if (acb->aiocb_type == AIOCB_READ_UDATA) {
+        total = aio_req->nbd_req_hdr.len;
+    }
+
+    if (acb->aiocb_type == AIOCB_READ_UDATA && aio_req->bytes_got < total) {
+
+        size_t remaining = total - aio_req->bytes_got;
+        QEMUIOVector *qiov = acb->qiov;
+        off_t qiov_offset = aio_req->bytes_got + aio_req->iov_offset;
+
+        if (acb->canceled) {
+            buf = qemu_malloc(remaining);
+            qemu_iovec_init(qiov, 1);
+            qemu_iovec_add(qiov, buf, remaining);
+            qiov_offset = 0;
+        }
+
+        ret = nbd_qiov_wr(s->sock, qiov, remaining, qiov_offset, true);
+        logout("Read %zu of %zu bytes remaining\n", ret, remaining);
+
+        if (acb->canceled) {
+            qemu_iovec_destroy(qiov);
+            qemu_free(buf);
+        }
+
+        if (ret < 0) {
+            nbd_handle_io_err(s, aio_req, -ret);
+            return;
+        }
+
+        aio_req->bytes_got += ret;
+    }
+
+    /* Entire request has been read */
+    if (total == aio_req->bytes_got) {
+        logout("Read all bytes of the response; clearing s->current_req\n");
+        s->nbd_rsp_offset = 0;
+
+        /* Free the aio_req. If the NBDAIOCB is finished, notify QEMU */
+        rest = free_aio_req(s, aio_req);
+        if (!rest) {
+            logout("acb complete\n");
+            acb->aio_done_func(acb);
+        }
+    }
+
+    logout("Leaving function\n");
+}
+
+static int nbd_establish_connection(BDRVNBDState *s)
 {
-    BDRVNBDState *s = bs->opaque;
     int sock;
     int ret;
     off_t size;
@@ -139,23 +621,30 @@ static int nbd_establish_connection(BlockDriverState *bs)
     s->sock = sock;
     s->size = size;
     s->blocksize = blocksize;
+    s->nbd_rsp_offset = 0;
+
+    qemu_aio_set_fd_handler(sock, nbd_aio_read_response, NULL,
+                            nbd_aio_flush_request, NULL, s);
 
     logout("Established connection with NBD server\n");
     return 0;
 }
 
-static void nbd_teardown_connection(BlockDriverState *bs)
+static void nbd_teardown_connection(BDRVNBDState *s)
 {
-    BDRVNBDState *s = bs->opaque;
     struct nbd_request request;
 
     request.type = NBD_CMD_DISC;
-    request.handle = (uint64_t)(intptr_t)bs;
+    request.handle = s->aioreq_seq_num++;
     request.from = 0;
     request.len = 0;
     nbd_send_request(s->sock, &request);
 
+    qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL);
     closesocket(s->sock);
+    s->sock = -1;
+
+    logout("Connection to NBD server closed\n");
 }
 
 static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
@@ -169,99 +658,217 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
         return result;
     }
 
+    QTAILQ_INIT(&s->reqs_to_send_head);
+    QTAILQ_INIT(&s->reqs_for_reply_head);
+
+    s->current_req = NULL;
+    s->aioreq_seq_num = 0;
+    s->nbd_rsp_offset = 0;
+    s->sock = -1;
+
     /* establish TCP connection, return error if it fails
      * TODO: Configurable retry-until-timeout behaviour.
      */
-    result = nbd_establish_connection(bs);
+    result = nbd_establish_connection(s);
 
     return result;
 }
 
-static int nbd_read(BlockDriverState *bs, int64_t sector_num,
-                    uint8_t *buf, int nb_sectors)
+static void nbd_close(BlockDriverState *bs)
 {
     BDRVNBDState *s = bs->opaque;
-    struct nbd_request request;
-    struct nbd_reply reply;
+    qemu_free(s->export_name);
+    qemu_free(s->host_spec);
 
-    request.type = NBD_CMD_READ;
-    request.handle = (uint64_t)(intptr_t)bs;
-    request.from = sector_num * 512;;
-    request.len = nb_sectors * 512;
+    nbd_teardown_connection(s);
+}
 
-    if (nbd_send_request(s->sock, &request) == -1)
-        return -errno;
+static void nbd_register_write_request_handler(BDRVNBDState *s)
+{
+    int sock = s->sock;
+    if (sock == -1) {
+        logout("Register write request handler tried when socket closed\n");
+        return;
+    }
 
-    if (nbd_receive_reply(s->sock, &reply) == -1)
-        return -errno;
+    qemu_aio_set_fd_handler(sock, nbd_aio_read_response, nbd_aio_write_request,
+                            nbd_aio_flush_request, NULL, s);
+}
 
-    if (reply.error !=0)
-        return -reply.error;
+static void nbd_unregister_write_request_handler(BDRVNBDState *s)
+{
+    int sock = s->sock;
+    if (s->sock == -1) { 
+        logout("Unregister write request handler tried when socket closed\n");
+        return;
+    }
+
+    qemu_aio_set_fd_handler(s->sock, nbd_aio_read_response, NULL,
+                            nbd_aio_flush_request, NULL, s);
+}
+
+/* We remove all the AIOReqs currently sat in reqs_to_send_head (excepting the
+ * first, if any bytes have been transmitted). So we don't need to check for
+ * canceled in aio_write_request at all. If that finishes the acb, we call its
+ * completion function. Otherwise, we leave it alone.
+ *
+ * in nbd_aio_read_response, when we're handling a read request for an acb with
+ * canceled = true, we allocate a QEMUIOVector of the appropriate size to do
+ * the read, and throw the bytes away. Everything else goes on as normal.
+ */
+static void nbd_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    NBDAIOCB *acb = (NBDAIOCB *)blockacb;
+    BDRVNBDState *s = acb->common.bs->opaque;
+    AIOReq *a, *next;
+
+    QLIST_FOREACH_SAFE(a, &acb->aioreq_head, aioreq_siblings, next) {
+        free_aio_req(s, a);
+    }
+
+    acb->canceled = true;
+    acb->ret = -EIO;
+
+    if (QLIST_EMPTY(&acb->aioreq_head)) {
+        nbd_finish_aiocb(acb);
+    }
+}
 
-    if (reply.handle != request.handle)
+static AIOPool nbd_aio_pool = {
+    .aiocb_size = sizeof(NBDAIOCB),
+    .cancel = nbd_aio_cancel,
+};
+
+static NBDAIOCB *nbd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
+                                   int64_t sector_num, int nb_sectors,
+                                   BlockDriverCompletionFunc *cb, void *opaque)
+{
+    NBDAIOCB *acb;
+
+    acb = qemu_aio_get(&nbd_aio_pool, bs, cb, opaque);
+
+    acb->qiov = qiov;
+    acb->sector_num = sector_num;
+    acb->nb_sectors = nb_sectors;
+
+    acb->canceled = false;
+
+    acb->aio_done_func = NULL;
+    acb->bh = NULL;
+    acb->ret = 0;
+
+    QLIST_INIT(&acb->aioreq_head);
+    return acb;
+}
+
+static int nbd_schedule_bh(QEMUBHFunc *cb, NBDAIOCB *acb)
+{
+    if (acb->bh) {
+        logout("bug: %d %d\n", acb->aiocb_type, acb->aiocb_type);
         return -EIO;
+    }
 
-    if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len)
+    acb->bh = qemu_bh_new(cb, acb);
+    if (!acb->bh) {
+        logout("oom: %d %d\n", acb->aiocb_type, acb->aiocb_type);
         return -EIO;
+    }
+
+    qemu_bh_schedule(acb->bh);
 
     return 0;
 }
 
-static int nbd_write(BlockDriverState *bs, int64_t sector_num,
-                     const uint8_t *buf, int nb_sectors)
+static void nbd_readv_writev_bh_cb(void *p)
 {
-    BDRVNBDState *s = bs->opaque;
-    struct nbd_request request;
-    struct nbd_reply reply;
+    NBDAIOCB *acb = p;
 
-    request.type = NBD_CMD_WRITE;
-    request.handle = (uint64_t)(intptr_t)bs;
-    request.from = sector_num * 512;;
-    request.len = nb_sectors * 512;
+    size_t len, done = 0;
+    size_t total = acb->nb_sectors * SECTOR_SIZE;
 
-    if (nbd_send_request(s->sock, &request) == -1)
-        return -errno;
+    /* Where the read/write starts from */
+    off_t offset = acb->sector_num * SECTOR_SIZE;
+    BDRVNBDState *s = acb->common.bs->opaque;
 
-    if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len)
-        return -EIO;
+    AIOReq *aio_req;
 
-    if (nbd_receive_reply(s->sock, &reply) == -1)
-        return -errno;
+    logout("Entering nbd_readv_writev_bh_cb\n");
 
-    if (reply.error !=0)
-        return -reply.error;
+    qemu_bh_delete(acb->bh);
+    acb->bh = NULL;
 
-    if (reply.handle != request.handle)
-        return -EIO;
+    while (done < total) {
+        len = (total - done);
 
-    return 0;
+        /* Split read & write requests into segments if needed */
+        if (acb->aiocb_type == AIOCB_READ_UDATA && len > NBD_MAX_READ) {
+            len = NBD_MAX_READ;
+        }
+
+        if (acb->aiocb_type == AIOCB_WRITE_UDATA && len > NBD_MAX_WRITE) {
+            len = NBD_MAX_WRITE;
+        }
+
+        logout("Allocating an aio_req of %zu bytes\n", len);
+        aio_req = nbd_alloc_aio_req(s, acb, len, offset + done, done);
+
+        done += len;
+    }
+
+    if (QLIST_EMPTY(&acb->aioreq_head)) {
+        logout("acb->ioreq_head empty, so finishing acb now\n");
+        nbd_finish_aiocb(acb);
+    } else {
+      logout("Requests to make - registering write request callback\n");
+      nbd_register_write_request_handler(s);
+    }
+
+    logout("Leaving nbd_readv_writev_bh_cb\n");
 }
 
-static void nbd_close(BlockDriverState *bs)
+static BlockDriverAIOCB *nbd_aio_readv(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque)
 {
-    BDRVNBDState *s = bs->opaque;
-    qemu_free(s->export_name);
-    qemu_free(s->host_spec);
+    NBDAIOCB *acb;
 
-    nbd_teardown_connection(bs);
+    acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+    acb->aiocb_type = AIOCB_READ_UDATA;
+    acb->aio_done_func = nbd_finish_aiocb;
+
+    nbd_schedule_bh(nbd_readv_writev_bh_cb, acb);
+    return &acb->common;
+}
+
+static BlockDriverAIOCB *nbd_aio_writev(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    NBDAIOCB *acb;
+
+    acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+    acb->aiocb_type = AIOCB_WRITE_UDATA;
+    acb->aio_done_func = nbd_finish_aiocb;
+
+    nbd_schedule_bh(nbd_readv_writev_bh_cb, acb);
+    return &acb->common;
 }
 
 static int64_t nbd_getlength(BlockDriverState *bs)
 {
     BDRVNBDState *s = bs->opaque;
-
     return s->size;
 }
 
 static BlockDriver bdrv_nbd = {
-    .format_name       = "nbd",
-    .instance_size     = sizeof(BDRVNBDState),
-    .bdrv_file_open    = nbd_open,
-    .bdrv_read         = nbd_read,
-    .bdrv_write                = nbd_write,
-    .bdrv_close                = nbd_close,
-    .bdrv_getlength    = nbd_getlength,
-    .protocol_name     = "nbd",
+    .format_name     = "nbd",
+    .instance_size   = sizeof(BDRVNBDState),
+    .bdrv_file_open  = nbd_open,
+    .bdrv_aio_readv  = nbd_aio_readv,
+    .bdrv_aio_writev = nbd_aio_writev,
+    .bdrv_close      = nbd_close,
+    .bdrv_getlength  = nbd_getlength,
+    .protocol_name   = "nbd"
 };
 
 static void bdrv_nbd_init(void)
diff --git a/nbd.c b/nbd.c
index 0dcd86b..48b80a9 100644
--- a/nbd.c
+++ b/nbd.c
@@ -49,10 +49,6 @@
 
 /* This is all part of the "official" NBD API */
 
-#define NBD_REPLY_SIZE          (4 + 4 + 8)
-#define NBD_REQUEST_MAGIC       0x25609513
-#define NBD_REPLY_MAGIC         0x67446698
-
 #define NBD_SET_SOCK            _IO(0xab, 0)
 #define NBD_SET_BLKSIZE         _IO(0xab, 1)
 #define NBD_SET_SIZE            _IO(0xab, 2)
@@ -107,6 +103,30 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
     return offset;
 }
 
+ssize_t nbd_qiov_wr(int fd, QEMUIOVector *qiov, size_t len, off_t offset,
+                    bool do_read)
+{
+    ssize_t ret;
+    QEMUIOVector spec;
+
+    qemu_iovec_init(&spec, qiov->niov);
+    qemu_iovec_copy(&spec, qiov, offset, len);
+
+    if (do_read) {
+        ret = readv(fd, spec.iov, spec.niov);
+    } else {
+        ret = writev(fd, spec.iov, spec.niov);
+    }
+
+    qemu_iovec_destroy(&spec);
+
+    if (ret == -1) {
+        return -socket_error();
+    }
+
+    return ret;
+}
+
 static void combine_addr(char *buf, size_t len, const char* address,
                          uint16_t port)
 {
@@ -429,15 +449,31 @@ int nbd_client(int fd)
 }
 #endif
 
-int nbd_send_request(int csock, struct nbd_request *request)
+/* Put the NBD header into a buffer, ready for wire transmission.
+ * Endianness is dealt with here. The caller needs to allocate a
+ * buffer of sizeof(NBDRequest) bytes.
+ */
+void nbd_request_to_buf(NBDRequest *request, uint8_t *buf)
 {
-    uint8_t buf[4 + 4 + 8 + 8 + 4];
+    request->magic = NBD_REQUEST_MAGIC;
+    cpu_to_be32w((uint32_t *)(buf +  0), request->magic);
+    cpu_to_be32w((uint32_t *)(buf +  4), request->type);
+    cpu_to_be64w((uint64_t *)(buf +  8), request->handle);
+    cpu_to_be64w((uint64_t *)(buf + 16), request->from);
+    cpu_to_be32w((uint32_t *)(buf + 24), request->len);
+}
+
+void nbd_buf_to_reply(const uint8_t *buf, NBDReply *reply)
+{
+    reply->magic  = be32_to_cpup((uint32_t *)buf);
+    reply->error  = be32_to_cpup((uint32_t *)(buf + 4));
+    reply->handle = be64_to_cpup((uint64_t *)(buf + 8));
+}
 
-    cpu_to_be32w((uint32_t*)buf, NBD_REQUEST_MAGIC);
-    cpu_to_be32w((uint32_t*)(buf + 4), request->type);
-    cpu_to_be64w((uint64_t*)(buf + 8), request->handle);
-    cpu_to_be64w((uint64_t*)(buf + 16), request->from);
-    cpu_to_be32w((uint32_t*)(buf + 24), request->len);
+int nbd_send_request(int csock, NBDRequest *request)
+{
+    uint8_t buf[sizeof(NBDRequest)];
+    nbd_request_to_buf(request, buf);
 
     TRACE("Sending request to client: "
           "{ .from = %" PRIu64", .len = %u, .handle = %" PRIu64", .type=%i}",
@@ -490,8 +526,7 @@ static int nbd_receive_request(int csock, struct nbd_request *request)
 
 int nbd_receive_reply(int csock, struct nbd_reply *reply)
 {
-    uint8_t buf[NBD_REPLY_SIZE];
-    uint32_t magic;
+    uint8_t buf[sizeof(NBDReply)];
 
     memset(buf, 0xAA, sizeof(buf));
 
@@ -506,17 +541,14 @@ int nbd_receive_reply(int csock, struct nbd_reply *reply)
        [ 4 ..  7]    error   (0 == no error)
        [ 7 .. 15]    handle
      */
-
-    magic = be32_to_cpup((uint32_t*)buf);
-    reply->error  = be32_to_cpup((uint32_t*)(buf + 4));
-    reply->handle = be64_to_cpup((uint64_t*)(buf + 8));
+    nbd_buf_to_reply(buf, reply);
 
     TRACE("Got reply: "
           "{ magic = 0x%x, .error = %d, handle = %" PRIu64" }",
-          magic, reply->error, reply->handle);
+          reply->magic, reply->error, reply->handle);
 
-    if (magic != NBD_REPLY_MAGIC) {
-        LOG("invalid magic (got 0x%x)", magic);
+    if (reply->magic != NBD_REPLY_MAGIC) {
+        LOG("invalid magic (got 0x%x)", reply->magic);
         errno = EINVAL;
         return -1;
     }
@@ -558,7 +590,7 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset,
         return -1;
 
     if (request.len + NBD_REPLY_SIZE > data_size) {
-        LOG("len (%u) is larger than max len (%u)",
+        LOG("len (%lu) is larger than max len (%u)",
             request.len + NBD_REPLY_SIZE, data_size);
         errno = EINVAL;
         return -1;
diff --git a/nbd.h b/nbd.h
index b38d0d0..8eea104 100644
--- a/nbd.h
+++ b/nbd.h
@@ -39,15 +39,23 @@ struct nbd_reply {
     uint64_t handle;
 } __attribute__ ((__packed__));
 
+typedef struct nbd_request NBDRequest;
+typedef struct nbd_reply NBDReply;
+
 enum {
     NBD_CMD_READ = 0,
     NBD_CMD_WRITE = 1,
     NBD_CMD_DISC = 2
 };
 
-#define NBD_DEFAULT_PORT       10809
+#define NBD_DEFAULT_PORT     10809
+#define NBD_REPLY_SIZE       sizeof(NBDReply)
+#define NBD_REQUEST_MAGIC    0x25609513
+#define NBD_REPLY_MAGIC      0x67446698
 
 size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read);
+ssize_t nbd_qiov_wr(int fd, QEMUIOVector *qiov, size_t len, off_t offset,
+                    bool do_read);
 int tcp_socket_outgoing(const char *address, uint16_t port);
 int tcp_socket_incoming(const char *address, uint16_t port);
 int tcp_socket_outgoing_spec(const char *address_and_port);
@@ -55,6 +63,9 @@ int tcp_socket_incoming_spec(const char *address_and_port);
 int unix_socket_outgoing(const char *path);
 int unix_socket_incoming(const char *path);
 
+void nbd_request_to_buf(NBDRequest *request, uint8_t *buf);
+void nbd_buf_to_reply(const uint8_t *buf, NBDReply *reply);
+
 int nbd_negotiate(int csock, off_t size);
 int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
                           off_t *size, size_t *blocksize);
-- 
1.7.0.4



