From: Nicholas Thomas
Subject: Re: [Qemu-devel] [PATCH 3/3 v2] block/nbd: Make the NBD block device use the AIO interface
Date: Fri, 18 Feb 2011 12:16:31 +0000

On Thu, 2011-02-17 at 19:28 +0000, Nicholas Thomas wrote:

> Additional testing has revealed that this code breaks the stock
> nbd-server (the one on sourceforge) when large (well, 1.3MiB) write
> requests are sent to it.

...it turns out NBD has a 1 MiB limit on the size of write requests
(NBD_BUFFER_SIZE in qemu-nbd.c), and I'm sure that's what's knocking
out the standard NBD server too.
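
For reference, here's where that limit comes from. Below is a sketch of
the on-wire request header (illustrative only - qemu's NBD code
serialises each field by hand rather than sending a struct, and the
magic/command constants are defined there). The server's 1 MiB buffer
has to hold this header plus the write payload, which is why the patch
below defines MAX_NBD_WRITE as 1 MiB minus 28 bytes:

/* NBD request header as it appears on the wire (sketch) */
struct nbd_wire_request {
    uint32_t magic;   /* 4 bytes: NBD_REQUEST_MAGIC */
    uint32_t type;    /* 4 bytes: NBD_CMD_READ, NBD_CMD_WRITE, ... */
    uint64_t handle;  /* 8 bytes: opaque id echoed back in the reply */
    uint64_t from;    /* 8 bytes: byte offset into the export */
    uint32_t len;     /* 4 bytes: payload length */
};                    /* total: 4 + 4 + 8 + 8 + 4 = 28 bytes */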

I didn't see any option to tell QEMU to split up writes to a certain
size before handing them off to the block driver, so I split the writes
up into multiple requests (AIOReqs) inside the driver instead. Reworked
patch:


This preserves the previous behaviour when the NBD server is
unavailable or goes away during guest execution, but switches the
NBD backend to present the AIO interface instead of the synchronous
IO interface.

We also split write requests into blocks of 1 MiB minus the request
header size. This is a hard limit in existing NBD servers (including
qemu-nbd), but it never seemed to be hit with the previous,
synchronous backend code.
---
 block/nbd.c |  555 ++++++++++++++++++++++++++++++++++++++++++++++++++---------
 1 files changed, 470 insertions(+), 85 deletions(-)

diff --git a/block/nbd.c b/block/nbd.c
index c8dc763..59de69d 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -1,11 +1,12 @@
 /*
- * QEMU Block driver for  NBD
+ * QEMU Block driver for  NBD - asynchronous IO
  *
  * Copyright (C) 2008 Bull S.A.S.
  *     Author: Laurent Vivier <address@hidden>
  *
  * Some parts:
  *    Copyright (C) 2007 Anthony Liguori <address@hidden>
+ *    Copyright (C) 2011 Nick Thomas <address@hidden>
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -27,66 +28,135 @@
  */
 
 #include "qemu-common.h"
+#include "qemu_socket.h"
 #include "nbd.h"
 #include "module.h"
 
 #include <sys/types.h>
 #include <unistd.h>
 
-#define EN_OPTSTR ":exportname="
+#define EN_OPTSTR      ":exportname="
+#define SECTOR_SIZE    512
+
+/* 1MiB minus request header size */
+#define MAX_NBD_WRITE      ((1024*1024) - (4 + 4 + 8 + 8 + 4))
+
+#define DEBUG_NBD
+
+#if defined(DEBUG_NBD)
+#define logout(fmt, ...) \
+                fprintf(stderr, "nbd\t%-24s" fmt, __func__, ##__VA_ARGS__)
+#else
+#define logout(fmt, ...) ((void)0)
+#endif
+
+
+typedef struct NBDAIOCB NBDAIOCB;
+
+typedef struct AIOReq {
+    NBDAIOCB *aiocb;
+    off_t iov_offset; /* Where on the iov does this req start? */
+    off_t offset;     /* Starting point of the read */
+
+    size_t data_len;
+    uint8_t flags;
+    uint64_t handle;
+
+    QLIST_ENTRY(AIOReq) outstanding_aio_siblings;
+    QLIST_ENTRY(AIOReq) aioreq_siblings;
+} AIOReq;
+
 
 typedef struct BDRVNBDState {
     int sock;
     off_t size;
     size_t blocksize;
+
+    /* Filled in by nbd_config. Store host_spec because DNS may change */
+    bool tcp_conn;      /* True, we use TCP. False, UNIX domain sockets */
+    char *export_name;  /* An NBD server may export several devices */
+    char *host_spec;    /* Path to socket (UNIX) or hostname/IP (TCP) */
+    uint16_t tcp_port;
+
+    /* We use these for asynchronous I/O */
+    uint64_t aioreq_seq_num;
+    QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head;
 } BDRVNBDState;
 
-static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
+enum AIOCBState {
+    AIOCB_WRITE_UDATA,
+    AIOCB_READ_UDATA,
+};
+
+struct NBDAIOCB {
+    BlockDriverAIOCB common;
+
+    QEMUIOVector *qiov;
+
+    int64_t sector_num;
+    int nb_sectors;
+
+    int ret;
+    enum AIOCBState aiocb_type;
+
+    QEMUBH *bh;
+    void (*aio_done_func)(NBDAIOCB *);
+
+    int canceled;
+
+    QLIST_HEAD(aioreq_head, AIOReq) aioreq_head;
+};
+
+static inline int free_aio_req(BDRVNBDState *s, AIOReq *aio_req)
 {
-    BDRVNBDState *s = bs->opaque;
-    uint32_t nbdflags;
+    NBDAIOCB *acb = aio_req->aiocb;
+    QLIST_REMOVE(aio_req, outstanding_aio_siblings);
+    QLIST_REMOVE(aio_req, aioreq_siblings);
+    qemu_free(aio_req);
 
+    return !QLIST_EMPTY(&acb->aioreq_head);
+}
+
+static int nbd_config(BDRVNBDState *s, const char* filename, int flags)
+{
     char *file;
-    char *name;
-    const char *host;
+    char *export_name;
+    const char *host_spec;
     const char *unixpath;
-    int sock;
-    off_t size;
-    size_t blocksize;
-    int ret;
     int err = -EINVAL;
 
     file = qemu_strdup(filename);
 
-    name = strstr(file, EN_OPTSTR);
-    if (name) {
-        if (name[strlen(EN_OPTSTR)] == 0) {
+    export_name = strstr(file, EN_OPTSTR);
+    if (export_name) {
+        if (export_name[strlen(EN_OPTSTR)] == 0) {
             goto out;
         }
-        name[0] = 0;
-        name += strlen(EN_OPTSTR);
+        export_name[0] = 0; /* truncate 'file' */
+        export_name += strlen(EN_OPTSTR);
+        s->export_name = qemu_strdup(export_name);
     }
 
-    if (!strstart(file, "nbd:", &host)) {
+    /* extract the host_spec - fail if it's not nbd:* */
+    if (!strstart(file, "nbd:", &host_spec)) {
         goto out;
     }
 
-    if (strstart(host, "unix:", &unixpath)) {
-
-        if (unixpath[0] != '/') {
+    /* are we a UNIX or TCP socket? */
+    if (strstart(host_spec, "unix:", &unixpath)) {
+        if (unixpath[0] != '/') { /* We demand an absolute path */
             goto out;
         }
-
-        sock = unix_socket_outgoing(unixpath);
-
+        s->tcp_conn = false;
+        s->host_spec = qemu_strdup(unixpath);
     } else {
+        /* We should have an <IPv4 address>:<port> string to split up */
         uint16_t port = NBD_DEFAULT_PORT;
         char *p, *r;
         char hostname[128];
 
-        pstrcpy(hostname, 128, host);
-
-        p = strchr(hostname, ':');
+        pstrcpy(hostname, 128, host_spec);
+        p = strchr(hostname, ':'); /* FIXME: IPv6 */
         if (p != NULL) {
             *p = '\0';
             p++;
@@ -96,121 +166,436 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
                 goto out;
             }
         }
+        s->tcp_conn = true;
+        s->host_spec = qemu_strdup(hostname);
+        s->tcp_port = port;
+    }
+
+    err = 0;
+
+out:
+    qemu_free(file);
+    if (err != 0) {
+        if (s->export_name != NULL) {
+            qemu_free(s->export_name);
+        }
+        if (s->host_spec != NULL) {
+            qemu_free(s->host_spec);
+        }
+    }
+    return err;
+}
+
+static void aio_read_response(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+    struct nbd_reply reply;
+
+    AIOReq *aio_req = NULL;
+    NBDAIOCB *acb;
+    int rest;
+
+    if (QLIST_EMPTY(&s->outstanding_aio_head)) {
+        return;
+    }
+
+    /* read the header */
+    if (nbd_receive_reply(s->sock, &reply) == -1) {
+        logout("Failed to read response from socket\n");
+        /* Having failed to read the reply header, we can't know which
+         * aio_req this corresponds to - so we can't signal a failure.
+         */
+        return;
+    }
+
+    /* find the right aio_req from the outstanding_aio list */
+    QLIST_FOREACH(aio_req, &s->outstanding_aio_head, outstanding_aio_siblings) {
+        if (aio_req->handle == reply.handle) {
+            break;
+        }
+    }
+
+    if (!aio_req) {
+        logout("cannot find aio_req for handle %lu\n", reply.handle);
+        return;
+    }
+
+    acb = aio_req->aiocb;
+
+    if (acb->aiocb_type == AIOCB_READ_UDATA) {
+        off_t offset = 0;
+        int ret = 0;
+        size_t total = aio_req->data_len;
+
+        while (offset < total) {
+            ret = nbd_wr_aio(s->sock, acb->qiov->iov, total - offset, offset,
+                             true);
+
+            if (ret == -1) {
+                logout("Error reading from NBD server: %i (%s)\n",
+                       errno, strerror(errno));
+                return;
+            }
+
+            offset += ret;
+        }
+    }
 
-        sock = tcp_socket_outgoing(hostname, port);
+    if (reply.error != 0) {
+        acb->ret = -EIO;
+        logout("NBD request resulted in error %i\n", reply.error);
     }
 
+    rest = free_aio_req(s, aio_req);
+    if (!rest) {
+        acb->aio_done_func(acb);
+    }
+
+    return;
+}
+
+static int aio_flush_request(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+
+    return !QLIST_EMPTY(&s->outstanding_aio_head);
+}
+
+/*
+ * Connect to the NBD server specified in the state object
+ */
+static int nbd_establish_connection(BlockDriverState *bs)
+{
+    BDRVNBDState *s = bs->opaque;
+    int sock;
+    int ret;
+    off_t size;
+    size_t blocksize;
+    uint32_t nbdflags;
+
+    if (s->tcp_conn == true) {
+        sock = tcp_socket_outgoing(s->host_spec, s->tcp_port);
+    } else {
+        sock = unix_socket_outgoing(s->host_spec);
+    }
+
+    /* Failed to establish connection */
     if (sock == -1) {
-        err = -errno;
-        goto out;
+        logout("Failed to establish connection to NBD server\n");
+        return -errno;
     }
 
-    ret = nbd_receive_negotiate(sock, name, &nbdflags, &size, &blocksize);
+    /* NBD handshake */
+    ret = nbd_receive_negotiate(sock, s->export_name, &nbdflags, &size,
+                                &blocksize);
     if (ret == -1) {
-        err = -errno;
-        goto out;
+        logout("Failed to negotiate with the NBD server\n");
+        closesocket(sock);
+        return -errno;
     }
 
+    /* Now that we're connected, set the socket to be non-blocking */
+    socket_set_nonblock(sock);
+
     s->sock = sock;
     s->size = size;
     s->blocksize = blocksize;
-    err = 0;
 
-out:
-    qemu_free(file);
-    return err;
+    /* Response handler. This is called when there is data to read */
+    qemu_aio_set_fd_handler(sock, aio_read_response, NULL, aio_flush_request,
+                            NULL, s);
+    logout("Established connection with NBD server\n");
+    return 0;
 }
 
-static int nbd_read(BlockDriverState *bs, int64_t sector_num,
-                    uint8_t *buf, int nb_sectors)
+static void nbd_teardown_connection(BlockDriverState *bs)
 {
+    /* Send the final packet to the NBD server and close the socket */
     BDRVNBDState *s = bs->opaque;
     struct nbd_request request;
-    struct nbd_reply reply;
 
-    request.type = NBD_CMD_READ;
+    request.type = NBD_CMD_DISC;
     request.handle = (uint64_t)(intptr_t)bs;
-    request.from = sector_num * 512;;
-    request.len = nb_sectors * 512;
+    request.from = 0;
+    request.len = 0;
+    nbd_send_request(s->sock, &request);
 
-    if (nbd_send_request(s->sock, &request) == -1)
-        return -errno;
+    qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL);
+    closesocket(s->sock);
+    logout("Connection to NBD server closed\n");
+    return;
+}
 
-    if (nbd_receive_reply(s->sock, &reply) == -1)
-        return -errno;
-static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
+{
+    BDRVNBDState *s = bs->opaque;
+    int result;
 
-    if (reply.error !=0)
-        return -reply.error;
+    /* Pop the config into our state object. Exit if invalid. */
+    result = nbd_config(s, filename, flags);
 
-    if (reply.handle != request.handle)
-        return -EIO;
+    if (result != 0) {
+        return result;
+    }
 
-    if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len)
-        return -EIO;
+    QLIST_INIT(&s->outstanding_aio_head);
+
+    /* establish the connection, return error if it fails
+     * TODO: Configurable retry-until-timeout behaviour.
+     */
+    result = nbd_establish_connection(bs);
+    if (result != 0) {
+        return result;
+    }
 
     return 0;
 }
 
-static int nbd_write(BlockDriverState *bs, int64_t sector_num,
-                     const uint8_t *buf, int nb_sectors)
+static void nbd_close(BlockDriverState *bs)
 {
+    nbd_teardown_connection(bs);
     BDRVNBDState *s = bs->opaque;
+
+    if (s->export_name != NULL) {
+        qemu_free(s->export_name);
+    }
+    if (s->host_spec != NULL) {
+        qemu_free(s->host_spec);
+    }
+
+    return;
+}
+
+static int add_aio_request(BDRVNBDState *s, AIOReq *aio_req, QEMUIOVector *qiov,
+                           enum AIOCBState aiocb_type)
+{
     struct nbd_request request;
-    struct nbd_reply reply;
 
-    request.type = NBD_CMD_WRITE;
-    request.handle = (uint64_t)(intptr_t)bs;
-    request.from = sector_num * 512;;
-    request.len = nb_sectors * 512;
+    request.from = aio_req->offset;
+    request.len = aio_req->data_len;
+    request.handle = aio_req->handle;
+
+    if (aiocb_type == AIOCB_READ_UDATA) {
+        request.type = NBD_CMD_READ;
+    } else {
+        request.type = NBD_CMD_WRITE;
+    }
 
-    if (nbd_send_request(s->sock, &request) == -1)
+    /* Write the request to the socket. Header first. */
+    if (nbd_send_request(s->sock, &request) == -1) {
+    /* TODO: retry handling. This leads to -EIO and request cancellation */
+        logout("writing request header to server failed\n");
         return -errno;
+    }
 
-    if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len)
-        return -EIO;
+    /* If this is a write, send the data too */
+    if (aiocb_type == AIOCB_WRITE_UDATA) {
+        int ret = 0;
+        off_t offset = 0;
+        size_t total = aio_req->data_len;
+
+        while (offset < total) {
+            ret = nbd_wr_aio(s->sock, qiov->iov, total - offset,
+                             offset + aio_req->iov_offset, false);
+            if (ret == -1) {
+                logout("Error writing request data to NBD server: %i (%
s)\n",
+                       errno, strerror(errno));
+                return -EIO;
+            }
 
-    if (nbd_receive_reply(s->sock, &reply) == -1)
-        return -errno;
+            offset += ret;
+        }
+    }
 
-    if (reply.error !=0)
-        return -reply.error;
+    return 0;
+}
+
+static inline AIOReq *alloc_aio_req(BDRVNBDState *s, NBDAIOCB *acb,
+                                    size_t data_len,
+                                    off_t offset,
+                                    off_t iov_offset)
+{
+    AIOReq *aio_req;
 
-    if (reply.handle != request.handle)
+    aio_req = qemu_malloc(sizeof(*aio_req));
+    aio_req->aiocb = acb;
+    aio_req->iov_offset = iov_offset;
+    aio_req->offset = offset;
+    aio_req->data_len = data_len;
+    aio_req->handle = s->aioreq_seq_num++; /* FIXME: Trivially guessable */
+
+    QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req,
+                      outstanding_aio_siblings);
+    QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings);
+
+    return aio_req;
+}
+
+static void nbd_finish_aiocb(NBDAIOCB *acb)
+{
+    if (!acb->canceled) {
+        acb->common.cb(acb->common.opaque, acb->ret);
+    }
+    qemu_aio_release(acb);
+}
+
+
+static void nbd_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    NBDAIOCB *acb = (NBDAIOCB *)blockacb;
+
+    /*
+     * We cannot cancel requests which have already been sent to
+     * the server, so we just complete the request with -EIO here.
+     */
+    acb->common.cb(acb->common.opaque, -EIO);
+    acb->canceled = 1;
+}
+
+static AIOPool nbd_aio_pool = {
+    .aiocb_size = sizeof(NBDAIOCB),
+    .cancel = nbd_aio_cancel,
+};
+
+static NBDAIOCB *nbd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
+                                   int64_t sector_num, int nb_sectors,
+                                   BlockDriverCompletionFunc *cb, void
*opaque)
+{
+    NBDAIOCB *acb;
+
+    acb = qemu_aio_get(&nbd_aio_pool, bs, cb, opaque);
+
+    acb->qiov = qiov;
+
+    acb->sector_num = sector_num;
+    acb->nb_sectors = nb_sectors;
+
+    acb->aio_done_func = NULL;
+    acb->canceled = 0;
+    acb->bh = NULL;
+    acb->ret = 0;
+    QLIST_INIT(&acb->aioreq_head);
+    return acb;
+}
+
+static int nbd_schedule_bh(QEMUBHFunc *cb, NBDAIOCB *acb)
+{
+    if (acb->bh) {
+        logout("bug: %d %d\n", acb->aiocb_type, acb->aiocb_type);
         return -EIO;
+    }
+
+    acb->bh = qemu_bh_new(cb, acb);
+    if (!acb->bh) {
+        logout("oom: %d %d\n", acb->aiocb_type, acb->aiocb_type);
+        return -EIO;
+    }
+
+    qemu_bh_schedule(acb->bh);
 
     return 0;
 }
 
-static void nbd_close(BlockDriverState *bs)
+/*
+ * Send I/O requests to the server.
+ *
+ * This function sends requests to the server, links the requests to
+ * the outstanding_list in BDRVNBDState, and exits without waiting for
+ * the response.  The responses are received in the `aio_read_response'
+ * function which is called from the main loop as a fd handler.
+ * If this is a write request and it's >1MB, split it into multiple AIOReqs.
+ */
+static void nbd_readv_writev_bh_cb(void *p)
 {
-    BDRVNBDState *s = bs->opaque;
-    struct nbd_request request;
+    NBDAIOCB *acb = p;
+    int ret = 0;
 
-    request.type = NBD_CMD_DISC;
-    request.handle = (uint64_t)(intptr_t)bs;
-    request.from = 0;
-    request.len = 0;
-    nbd_send_request(s->sock, &request);
+    size_t len, done = 0;
+    size_t total = acb->nb_sectors * SECTOR_SIZE;
+
+    /* Where the read/write starts from */
+    size_t offset = acb->sector_num * SECTOR_SIZE;
+    BDRVNBDState *s = acb->common.bs->opaque;
+
+    AIOReq *aio_req;
 
-    close(s->sock);
+    qemu_bh_delete(acb->bh);
+    acb->bh = NULL;
+
+    while (done != total) {
+        len = (total - done);
+
+        /* Split write requests into 1MiB segments */
+        if (acb->aiocb_type == AIOCB_WRITE_UDATA && len > MAX_NBD_WRITE) {
+            len = MAX_NBD_WRITE;
+        }
+
+        aio_req = alloc_aio_req(s, acb, len, offset + done, done);
+        ret = add_aio_request(s, aio_req, acb->qiov, acb->aiocb_type);
+
+        if (ret < 0) {
+            free_aio_req(s, aio_req);
+            acb->ret = -EIO;
+            goto out;
+        }
+
+        done += len;
+    }
+out:
+    if (QLIST_EMPTY(&acb->aioreq_head)) {
+        nbd_finish_aiocb(acb);
+    }
 }
 
+static BlockDriverAIOCB *nbd_aio_readv(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    NBDAIOCB *acb;
+    int i;
+
+    acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+    acb->aiocb_type = AIOCB_READ_UDATA;
+    acb->aio_done_func = nbd_finish_aiocb;
+
+    for (i = 0; i < qiov->niov; i++) {
+        memset(qiov->iov[i].iov_base, 0, qiov->iov[i].iov_len);
+    }
+
+    nbd_schedule_bh(nbd_readv_writev_bh_cb, acb);
+    return &acb->common;
+}
+
+static BlockDriverAIOCB *nbd_aio_writev(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    NBDAIOCB *acb;
+
+    acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+    acb->aiocb_type = AIOCB_WRITE_UDATA;
+    acb->aio_done_func = nbd_finish_aiocb;
+
+    nbd_schedule_bh(nbd_readv_writev_bh_cb, acb);
+    return &acb->common;
+}
+
+
 static int64_t nbd_getlength(BlockDriverState *bs)
 {
     BDRVNBDState *s = bs->opaque;
-
     return s->size;
 }
 
 static BlockDriver bdrv_nbd = {
-    .format_name       = "nbd",
-    .instance_size     = sizeof(BDRVNBDState),
-    .bdrv_file_open    = nbd_open,
-    .bdrv_read         = nbd_read,
-    .bdrv_write                = nbd_write,
-    .bdrv_close                = nbd_close,
-    .bdrv_getlength    = nbd_getlength,
-    .protocol_name     = "nbd",
+    .format_name     = "nbd",
+    .instance_size   = sizeof(BDRVNBDState),
+    .bdrv_file_open  = nbd_open,
+    .bdrv_aio_readv  = nbd_aio_readv,
+    .bdrv_aio_writev = nbd_aio_writev,
+    .bdrv_close      = nbd_close,
+    .bdrv_getlength  = nbd_getlength,
+    .protocol_name   = "nbd"
 };
 
 static void bdrv_nbd_init(void)
-- 
1.7.0.4
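
As a standalone illustration (not part of the patch), the splitting
arithmetic can be checked in isolation; this mirrors the loop in
nbd_readv_writev_bh_cb for the ~1.3 MiB case that broke the stock
server:

#include <stddef.h>
#include <stdio.h>

/* 1MiB minus request header size, as in the patch */
#define MAX_NBD_WRITE ((1024 * 1024) - (4 + 4 + 8 + 8 + 4))

/* Print the (offset, length) pairs a large request is split into,
 * using the same arithmetic as nbd_readv_writev_bh_cb. */
static void show_split(size_t total)
{
    size_t done = 0;
    while (done != total) {
        size_t len = total - done;
        if (len > MAX_NBD_WRITE) {
            len = MAX_NBD_WRITE;
        }
        printf("request: offset=%zu len=%zu\n", done, len);
        done += len;
    }
}

int main(void)
{
    show_split(1363148); /* a ~1.3 MiB write */
    return 0;
}

For 1363148 bytes this prints two requests - 1048548 bytes at offset 0
and 314600 bytes at offset 1048548 - each of which, together with its
28-byte header, fits the server's 1 MiB buffer.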