qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH 1/3] sheepdog: make send/recv operations non-blocking


From: MORITA Kazutaka
Subject: [Qemu-devel] [PATCH 1/3] sheepdog: make send/recv operations non-blocking
Date: Tue, 29 Mar 2011 21:13:06 +0900

This patch avoids retrying send/recv in the AIO path when the sheepdog
connection is not ready for the operation.

Signed-off-by: MORITA Kazutaka <address@hidden>
---
 block/sheepdog.c |  417 +++++++++++++++++++++++++++++++++++++-----------------
 1 files changed, 289 insertions(+), 128 deletions(-)

diff --git a/block/sheepdog.c b/block/sheepdog.c
index a54e0de..cedf806 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -242,6 +242,19 @@ static inline int is_snapshot(struct SheepdogInode *inode)
 
 typedef struct SheepdogAIOCB SheepdogAIOCB;
 
+enum ConnectionState {
+    C_IO_HEADER,
+    C_IO_DATA,
+    C_IO_END,
+    C_IO_CLOSED,
+};
+
+enum AIOReqState {
+    AIO_PENDING,            /* not ready for sending this request */
+    AIO_SEND_OBJREQ,        /* send this request */
+    AIO_RECV_OBJREQ,        /* receive a result of this request */
+};
+
 typedef struct AIOReq {
     SheepdogAIOCB *aiocb;
     unsigned int iov_offset;
@@ -253,6 +266,9 @@ typedef struct AIOReq {
     uint8_t flags;
     uint32_t id;
 
+    enum AIOReqState state;
+    struct SheepdogObjReq hdr;
+
     QLIST_ENTRY(AIOReq) outstanding_aio_siblings;
     QLIST_ENTRY(AIOReq) aioreq_siblings;
 } AIOReq;
@@ -348,12 +364,14 @@ static const char * sd_strerror(int err)
  * 1. In the sd_aio_readv/writev, read/write requests are added to the
  *    QEMU Bottom Halves.
  *
- * 2. In sd_readv_writev_bh_cb, the callbacks of BHs, we send the I/O
- *    requests to the server and link the requests to the
- *    outstanding_list in the BDRVSheepdogState.  we exits the
- *    function without waiting for receiving the response.
+ * 2. In sd_readv_writev_bh_cb, the callbacks of BHs, we set up the
+ *    I/O requests to the server and link the requests to the
+ *    outstanding_list in the BDRVSheepdogState.
+ *
+ * 3. We send the request in aio_send_request, the fd handler to the
+ *    sheepdog connection.
  *
- * 3. We receive the response in aio_read_response, the fd handler to
+ * 4. We receive the response in aio_read_response, the fd handler to
  *    the sheepdog connection.  If metadata update is needed, we send
  *    the write request to the vdi object in sd_write_done, the write
  *    completion function.  The AIOCB callback is not called until all
@@ -377,8 +395,6 @@ static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
     aio_req->flags = flags;
     aio_req->id = s->aioreq_seq_num++;
 
-    QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req,
-                      outstanding_aio_siblings);
     QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings);
 
     return aio_req;
@@ -640,20 +656,17 @@ static int do_readv_writev(int sockfd, struct iovec *iov, int len,
 again:
     ret = do_send_recv(sockfd, iov, len, iov_offset, write);
     if (ret < 0) {
-        if (errno == EINTR || errno == EAGAIN) {
+        if (errno == EINTR) {
             goto again;
         }
+        if (errno == EAGAIN) {
+            return 0;
+        }
         error_report("failed to recv a rsp, %s\n", strerror(errno));
-        return 1;
-    }
-
-    iov_offset += ret;
-    len -= ret;
-    if (len) {
-        goto again;
+        return -errno;
     }
 
-    return 0;
+    return ret;
 }
 
 static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset)
@@ -666,30 +679,30 @@ static int do_writev(int sockfd, struct iovec *iov, int len, int iov_offset)
     return do_readv_writev(sockfd, iov, len, iov_offset, 1);
 }
 
-static int do_read_write(int sockfd, void *buf, int len, int write)
+static int do_read_write(int sockfd, void *buf, int len, int skip, int write)
 {
     struct iovec iov;
 
     iov.iov_base = buf;
-    iov.iov_len = len;
+    iov.iov_len = len + skip;
 
-    return do_readv_writev(sockfd, &iov, len, 0, write);
+    return do_readv_writev(sockfd, &iov, len, skip, write);
 }
 
-static int do_read(int sockfd, void *buf, int len)
+static int do_read(int sockfd, void *buf, int len, int skip)
 {
-    return do_read_write(sockfd, buf, len, 0);
+    return do_read_write(sockfd, buf, len, skip, 0);
 }
 
-static int do_write(int sockfd, void *buf, int len)
+static int do_write(int sockfd, void *buf, int len, int skip)
 {
-    return do_read_write(sockfd, buf, len, 1);
+    return do_read_write(sockfd, buf, len, skip, 1);
 }
 
 static int send_req(int sockfd, SheepdogReq *hdr, void *data,
                     unsigned int *wlen)
 {
-    int ret;
+    int ret, done = 0;
     struct iovec iov[2];
 
     iov[0].iov_base = hdr;
@@ -700,19 +713,23 @@ static int send_req(int sockfd, SheepdogReq *hdr, void *data,
         iov[1].iov_len = *wlen;
     }
 
-    ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0);
-    if (ret) {
-        error_report("failed to send a req, %s\n", strerror(errno));
-        ret = -1;
+    while (done < sizeof(*hdr) + *wlen) {
+        ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen - done, done);
+        if (ret < 0) {
+            error_report("failed to send a req, %s\n", strerror(errno));
+            ret = -1;
+        }
+        done += ret;
     }
 
-    return ret;
+    return 0;
 }
 
+/* This function shouldn't be used for asynchronous I/O */
 static int do_req(int sockfd, SheepdogReq *hdr, void *data,
                   unsigned int *wlen, unsigned int *rlen)
 {
-    int ret;
+    int ret, done;
 
     ret = send_req(sockfd, hdr, data, wlen);
     if (ret) {
@@ -720,33 +737,39 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data,
         goto out;
     }
 
-    ret = do_read(sockfd, hdr, sizeof(*hdr));
-    if (ret) {
-        error_report("failed to get a rsp, %s\n", strerror(errno));
-        ret = -1;
-        goto out;
+    done = 0;
+    while (done < sizeof(*hdr)) {
+        ret = do_read(sockfd, hdr, sizeof(*hdr) - done, done);
+        if (ret < 0) {
+            error_report("failed to get a rsp, %s\n", strerror(errno));
+            ret = -1;
+            goto out;
+        }
+        done += ret;
     }
 
     if (*rlen > hdr->data_length) {
         *rlen = hdr->data_length;
     }
 
-    if (*rlen) {
-        ret = do_read(sockfd, data, *rlen);
-        if (ret) {
+    done = 0;
+    while (done < *rlen) {
+        ret = do_read(sockfd, data, *rlen - done, done);
+        if (ret < 0) {
             error_report("failed to get the data, %s\n", strerror(errno));
             ret = -1;
             goto out;
         }
+        done += ret;
     }
     ret = 0;
 out:
     return ret;
 }
 
-static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
-                           struct iovec *iov, int niov, int create,
-                           enum AIOCBState aiocb_type);
+static void setup_aio_header(BDRVSheepdogState *s, AIOReq *aio_req, int create,
+                              enum AIOCBState aiocb_type);
+static int sd_update_fd_handler(BDRVSheepdogState *s);
 
 /*
  * This function searchs pending requests to the object `oid', and
@@ -756,10 +779,12 @@ static void send_pending_req(BDRVSheepdogState *s, uint64_t oid, uint32_t id)
 {
     AIOReq *aio_req, *next;
     SheepdogAIOCB *acb;
-    int ret;
 
     QLIST_FOREACH_SAFE(aio_req, &s->outstanding_aio_head,
                        outstanding_aio_siblings, next) {
+        if (aio_req->state != AIO_PENDING) {
+            continue;
+        }
         if (id == aio_req->id) {
             continue;
         }
@@ -768,15 +793,9 @@ static void send_pending_req(BDRVSheepdogState *s, uint64_t oid, uint32_t id)
         }
 
         acb = aio_req->aiocb;
-        ret = add_aio_request(s, aio_req, acb->qiov->iov,
-                              acb->qiov->niov, 0, acb->aiocb_type);
-        if (ret < 0) {
-            error_report("add_aio_request is failed\n");
-            free_aio_req(s, aio_req);
-            if (QLIST_EMPTY(&acb->aioreq_head)) {
-                sd_finish_aiocb(acb);
-            }
-        }
+        aio_req->state = AIO_SEND_OBJREQ;
+        setup_aio_header(s, aio_req, 0, acb->aiocb_type);
+        sd_update_fd_handler(s);
     }
 }
 
@@ -788,38 +807,92 @@ static void send_pending_req(BDRVSheepdogState *s, uint64_t oid, uint32_t id)
  */
 static void aio_read_response(void *opaque)
 {
-    SheepdogObjRsp rsp;
+    static SheepdogObjRsp rsp;
     BDRVSheepdogState *s = opaque;
     int fd = s->fd;
     int ret;
-    AIOReq *aio_req = NULL;
-    SheepdogAIOCB *acb;
+    static AIOReq *aio_req;
+    static SheepdogAIOCB *acb;
     int rest;
     unsigned long idx;
+    static int done;
+    static enum ConnectionState conn_state = C_IO_HEADER;
 
     if (QLIST_EMPTY(&s->outstanding_aio_head)) {
         return;
     }
 
-    /* read a header */
-    ret = do_read(fd, &rsp, sizeof(rsp));
-    if (ret) {
-        error_report("failed to get the header, %s\n", strerror(errno));
-        return;
-    }
+    switch (conn_state) {
+    case C_IO_HEADER:
+        /* read a header */
+        ret = do_read(fd, &rsp, sizeof(rsp) - done, done);
+        if (ret < 0) {
+            error_report("failed to get the header, %s\n", strerror(errno));
+            conn_state = C_IO_CLOSED;
+            break;
+        }
+        done += ret;
+        if (done < sizeof(rsp)) {
+            break;
+        }
+        done = 0;
 
-    /* find the right aio_req from the outstanding_aio list */
-    QLIST_FOREACH(aio_req, &s->outstanding_aio_head, outstanding_aio_siblings) 
{
-        if (aio_req->id == rsp.id) {
+        /* find the right aio_req from the outstanding_aio list */
+        QLIST_FOREACH(aio_req, &s->outstanding_aio_head,
+                      outstanding_aio_siblings) {
+            if (aio_req->state == AIO_RECV_OBJREQ && aio_req->id == rsp.id) {
+                break;
+            }
+        }
+        if (!aio_req) {
+            error_report("bug: cannot find aio_req %x\n", rsp.id);
+            return;
+        }
+        acb = aio_req->aiocb;
+
+        if (rsp.result != SD_RES_SUCCESS) {
+            acb->ret = -EIO;
+            error_report("%s\n", sd_strerror(rsp.result));
+            conn_state = C_IO_END;
             break;
         }
+
+        if (acb->aiocb_type == AIOCB_WRITE_UDATA) {
+            conn_state = C_IO_END;
+            break;
+        }
+        conn_state = C_IO_DATA;
+    case C_IO_DATA:
+        ret = do_readv(fd, acb->qiov->iov, aio_req->data_len - done,
+                       aio_req->iov_offset + done);
+        if (ret < 0) {
+            error_report("failed to get the data, %s\n", strerror(errno));
+            conn_state = C_IO_CLOSED;
+        }
+
+        done += ret;
+        if (done < aio_req->data_len) {
+            break;
+        }
+        done = 0;
+        conn_state = C_IO_END;
+        break;
+    default:
+        error_report("bug: invalid rx state %d", conn_state);
+        break;
     }
-    if (!aio_req) {
-        error_report("cannot find aio_req %x\n", rsp.id);
-        return;
+
+    if (conn_state == C_IO_CLOSED) {
+        acb->ret = -EIO;
+        rest = free_aio_req(s, aio_req);
+        if (!rest) {
+            acb->aio_done_func(acb);
+        }
     }
 
-    acb = aio_req->aiocb;
+    if (conn_state != C_IO_END) {
+        return;
+    }
 
     switch (acb->aiocb_type) {
     case AIOCB_WRITE_UDATA:
@@ -848,19 +921,10 @@ static void aio_read_response(void *opaque)
         }
         break;
     case AIOCB_READ_UDATA:
-        ret = do_readv(fd, acb->qiov->iov, rsp.data_length,
-                       aio_req->iov_offset);
-        if (ret) {
-            error_report("failed to get the data, %s\n", strerror(errno));
-            return;
-        }
         break;
     }
 
-    if (rsp.result != SD_RES_SUCCESS) {
-        acb->ret = -EIO;
-        error_report("%s\n", sd_strerror(rsp.result));
-    }
+    conn_state = C_IO_HEADER;
 
     rest = free_aio_req(s, aio_req);
     if (!rest) {
@@ -870,6 +934,8 @@ static void aio_read_response(void *opaque)
          */
         acb->aio_done_func(acb);
     }
+
+    sd_update_fd_handler(s);
 }
 
 static int aio_flush_request(void *opaque)
@@ -905,6 +971,129 @@ static int set_nodelay(int fd)
 }
 
 /*
+ * Send I/O requests.
+ *
+ * This function is registered as a fd handler, and called from the
+ * main loop when s->fd is ready for sending requests.
+ */
+static void aio_send_request(void *opaque)
+{
+    BDRVSheepdogState *s = opaque;
+    int ret, rest;
+    AIOReq *areq;
+    static AIOReq *aio_req;
+    static SheepdogAIOCB *acb;
+    static int done;
+    static enum ConnectionState conn_state = C_IO_HEADER;
+
+    set_cork(s->fd, 1);
+
+    switch (conn_state) {
+    case C_IO_HEADER:
+        if (!aio_req) {
+            /* find the oldest aio_req from the outstanding_aio list */
+            QLIST_FOREACH(areq, &s->outstanding_aio_head,
+                          outstanding_aio_siblings) {
+                if (areq->state == AIO_SEND_OBJREQ) {
+                    aio_req = areq;
+                }
+            }
+            if (!aio_req) {
+                error_report("bug: cannot find aio_req to be send");
+                return;
+            }
+            acb = aio_req->aiocb;
+        }
+
+        /* send a header */
+        ret = do_write(s->fd, &aio_req->hdr, sizeof(aio_req->hdr) - done, done);
+        if (ret < 0) {
+            error_report("failed to send a req, %s\n", strerror(errno));
+            conn_state = C_IO_CLOSED;
+            break;
+        }
+        done += ret;
+        if (done < sizeof(aio_req->hdr)) {
+            break;
+        }
+        done = 0;
+        if (acb->aiocb_type == AIOCB_READ_UDATA) {
+            conn_state = C_IO_END;
+            break;
+        }
+        conn_state = C_IO_DATA;
+    case C_IO_DATA:
+        if (is_data_obj(aio_req->oid)) {
+            ret = do_writev(s->fd, acb->qiov->iov, aio_req->data_len - done,
+                            aio_req->iov_offset + done);
+        } else {
+            ret = do_write(s->fd, (uint8_t *)&s->inode + aio_req->offset,
+                           aio_req->data_len - done, done);
+        }
+        if (ret < 0) {
+            error_report("failed to send a data, %s\n", strerror(errno));
+            conn_state = C_IO_CLOSED;
+        }
+        done += ret;
+        if (done < aio_req->data_len) {
+            break;
+        }
+        done = 0;
+        conn_state = C_IO_END;
+        break;
+    default:
+        error_report("bug: invalid tx state %d", conn_state);
+        break;
+    }
+
+    set_cork(s->fd, 0);
+
+    if (conn_state == C_IO_CLOSED) {
+        acb->ret = -EIO;
+        rest = free_aio_req(s, aio_req);
+        if (!rest) {
+            /*
+             * We've finished all requests which belong to the AIOCB, so
+             * we can call the callback now.
+             */
+            acb->aio_done_func(acb);
+        }
+    }
+
+    if (conn_state != C_IO_END) {
+        return;
+    }
+
+    aio_req->state = AIO_RECV_OBJREQ;
+
+    conn_state = C_IO_HEADER;
+    aio_req = NULL;
+
+    sd_update_fd_handler(s);
+}
+
+/*
+ * Check outstanding requests and set proper fd handlers
+ */
+static int sd_update_fd_handler(BDRVSheepdogState *s)
+{
+    IOHandler *io_read = NULL, *io_write = NULL;
+    AIOReq *areq;
+
+    QLIST_FOREACH(areq, &s->outstanding_aio_head, outstanding_aio_siblings) {
+        if (areq->state == AIO_SEND_OBJREQ) {
+            io_write = aio_send_request;
+        }
+        if (areq->state == AIO_RECV_OBJREQ) {
+            io_read = aio_read_response;
+        }
+    }
+
+    return qemu_aio_set_fd_handler(s->fd, io_read, io_write,
+                                   aio_flush_request, NULL, s);
+}
+
+/*
  * Return a socket discriptor to read/write objects.
  *
  * We cannot use this discriptor for other operations because
@@ -929,8 +1118,6 @@ static int get_sheep_fd(BDRVSheepdogState *s)
         return -1;
     }
 
-    qemu_aio_set_fd_handler(fd, aio_read_response, NULL, aio_flush_request,
-                            NULL, s);
     return fd;
 }
 
@@ -1053,14 +1240,12 @@ out:
     return ret;
 }
 
-static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
-                           struct iovec *iov, int niov, int create,
-                           enum AIOCBState aiocb_type)
+static void setup_aio_header(BDRVSheepdogState *s, AIOReq *aio_req, int create,
+                             enum AIOCBState aiocb_type)
 {
     int nr_copies = s->inode.nr_copies;
     SheepdogObjReq hdr;
     unsigned int wlen;
-    int ret;
     uint64_t oid = aio_req->oid;
     unsigned int datalen = aio_req->data_len;
     uint64_t offset = aio_req->offset;
@@ -1096,26 +1281,7 @@ static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
 
     hdr.id = aio_req->id;
 
-    set_cork(s->fd, 1);
-
-    /* send a header */
-    ret = do_write(s->fd, &hdr, sizeof(hdr));
-    if (ret) {
-        error_report("failed to send a req, %s\n", strerror(errno));
-        return -EIO;
-    }
-
-    if (wlen) {
-        ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset);
-        if (ret) {
-            error_report("failed to send a data, %s\n", strerror(errno));
-            return -EIO;
-        }
-    }
-
-    set_cork(s->fd, 0);
-
-    return 0;
+    memcpy(&aio_req->hdr, &hdr, sizeof(hdr));
 }
 
 static int read_write_object(int fd, char *buf, uint64_t oid, int copies,
@@ -1440,9 +1606,7 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset)
  */
 static void sd_write_done(SheepdogAIOCB *acb)
 {
-    int ret;
     BDRVSheepdogState *s = acb->common.bs->opaque;
-    struct iovec iov;
     AIOReq *aio_req;
     uint32_t offset, data_len, mn, mx;
 
@@ -1457,22 +1621,20 @@ static void sd_write_done(SheepdogAIOCB *acb)
         s->min_dirty_data_idx = UINT32_MAX;
         s->max_dirty_data_idx = 0;
 
-        iov.iov_base = &s->inode;
-        iov.iov_len = sizeof(s->inode);
         aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
                                 data_len, offset, 0, 0, offset);
-        ret = add_aio_request(s, aio_req, &iov, 1, 0, AIOCB_WRITE_UDATA);
-        if (ret) {
-            free_aio_req(s, aio_req);
-            acb->ret = -EIO;
-            goto out;
-        }
+        aio_req->state = AIO_SEND_OBJREQ;
+        setup_aio_header(s, aio_req, 0, AIOCB_WRITE_UDATA);
 
         acb->aio_done_func = sd_finish_aiocb;
         acb->aiocb_type = AIOCB_WRITE_UDATA;
+
+        QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req,
+                          outstanding_aio_siblings);
+        sd_update_fd_handler(s);
         return;
     }
-out:
+
     sd_finish_aiocb(acb);
 }
 
@@ -1525,13 +1687,12 @@ out:
 }
 
 /*
- * Send I/O requests to the server.
+ * Set up I/O requests
  *
- * This function sends requests to the server, links the requests to
- * the outstanding_list in BDRVSheepdogState, and exits without
- * waiting the response.  The responses are received in the
- * `aio_read_response' function which is called from the main loop as
- * a fd handler.
+ * This function creates asynchronous I/O requests and links them to
+ * the outstanding_list in BDRVSheepdogState.  The requests are sent
+ * in the `aio_send_request' function which is called from the main
+ * loop as a fd handler.
  */
 static void sd_readv_writev_bh_cb(void *p)
 {
@@ -1553,6 +1714,7 @@ static void sd_readv_writev_bh_cb(void *p)
          * In the case we open the snapshot VDI, Sheepdog creates the
          * writable VDI when we do a write operation first.
          */
+        /* FIXME: we shouldn't block here */
         ret = sd_create_branch(s);
         if (ret) {
             acb->ret = -EIO;
@@ -1592,6 +1754,9 @@ static void sd_readv_writev_bh_cb(void *p)
         }
 
         aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, old_oid, done);
+        aio_req->state = AIO_SEND_OBJREQ;
+        QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req,
+                          outstanding_aio_siblings);
 
         if (create) {
             AIOReq *areq;
@@ -1609,19 +1774,13 @@ static void sd_readv_writev_bh_cb(void *p)
                      */
                     aio_req->flags = 0;
                     aio_req->base_oid = 0;
+                    aio_req->state = AIO_PENDING;
                     goto done;
                 }
             }
         }
 
-        ret = add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
-                              create, acb->aiocb_type);
-        if (ret < 0) {
-            error_report("add_aio_request is failed\n");
-            free_aio_req(s, aio_req);
-            acb->ret = -EIO;
-            goto out;
-        }
+        setup_aio_header(s, aio_req, create, acb->aiocb_type);
     done:
         offset = 0;
         idx++;
@@ -1631,6 +1790,8 @@ out:
     if (QLIST_EMPTY(&acb->aioreq_head)) {
         sd_finish_aiocb(acb);
     }
+
+    sd_update_fd_handler(s);
 }
 
 static BlockDriverAIOCB *sd_aio_writev(BlockDriverState *bs, int64_t sector_num,
-- 
1.5.6.5




reply via email to

[Prev in Thread] Current Thread [Next in Thread]