[Qemu-devel] [PATCH] Support NBD client under win32/MinGW


From: Or Goshen
Subject: [Qemu-devel] [PATCH] Support NBD client under win32/MinGW
Date: Mon, 24 Feb 2014 11:13:21 +0200

From: Or Goshen <address@hidden>
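
Implement aio_set_fd_handler() for sockets on win32 so that the NBD client
can be built and used under win32/MinGW:

 * aio-win32.c gains a socket-based aio_set_fd_handler() that registers the
   requested network events with WSAEventSelect(), collects readiness with a
   non-blocking select(), and queries the signalled events with
   WSAEnumNetworkEvents().  The callback dispatch code is factored out into
   aio_dispatch().
 * block/nbd.o is built on all hosts instead of only under CONFIG_POSIX, and
   the CONFIG_POSIX guards around aio_set_fd_handler() and
   qemu_aio_set_fd_handler() are dropped.
 * The negotiated export size uses uint64_t instead of off_t.
 * The socket I/O paths use socket_error() and also treat EWOULDBLOCK as a
   recoverable error.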

---
 aio-win32.c         |  244 +++++++++++++++++++++++++++++++++++++++++----------
 block/Makefile.objs |    4 +-
 block/nbd-client.h  |    2 +-
 include/block/aio.h |    2 -
 include/block/nbd.h |    2 +-
 main-loop.c         |    2 -
 nbd.c               |    4 +-
 qemu-coroutine-io.c |    4 +-
 8 files changed, 208 insertions(+), 56 deletions(-)

diff --git a/aio-win32.c b/aio-win32.c
index 23f4e5b..7f716b1 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -22,12 +22,76 @@
 
 struct AioHandler {
     EventNotifier *e;
+    IOHandler *io_read;
+    IOHandler *io_write;
     EventNotifierHandler *io_notify;
     GPollFD pfd;
     int deleted;
+    void *opaque;
     QLIST_ENTRY(AioHandler) node;
 };
 
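+/* Register read/write callbacks for a socket with the AioContext.  On win32
+ * the fd is expected to be a SOCKET; passing NULL for both io_read and
+ * io_write unregisters it.
+ */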
+void aio_set_fd_handler(AioContext *ctx,
+                        int fd,
+                        IOHandler *io_read,
+                        IOHandler *io_write,
+                        void *opaque)
+{
+    /* fd is a SOCKET in our case */
+    AioHandler *node;
+
+    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+        if (node->pfd.fd == fd && !node->deleted) {
+            break;
+        }
+    }
+
+    /* Are we deleting the fd handler? */
+    if (!io_read && !io_write) {
+        if (node) {
+            /* If the lock is held, just mark the node as deleted */
+            if (ctx->walking_handlers) {
+                node->deleted = 1;
+                node->pfd.revents = 0;
+            } else {
+                /* Otherwise, delete it for real.  We can't just mark it as
+                 * deleted because deleted nodes are only cleaned up after
+                 * releasing the walking_handlers lock.
+                 */
+                QLIST_REMOVE(node, node);
+                CloseHandle((HANDLE)node->e);
+                g_free(node);
+            }
+        }
+    } else {
+        HANDLE event;
+        long lNetworkEvents = 0;
+
+        if (node == NULL) {
+            /* Alloc and insert if it's not already there */
+            node = g_malloc0(sizeof(AioHandler));
+            node->pfd.fd = fd;
+            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+        }
+
+        /* Create the event and select the network events that match the
+         * callbacks being registered now.
+         */
+        event = WSACreateEvent();
+        if (io_read) {
+            lNetworkEvents |= FD_READ;
+        }
+        if (io_write) {
+            lNetworkEvents |= FD_WRITE;
+        }
+
+        WSAEventSelect(node->pfd.fd, event, lNetworkEvents);
+        node->e = (EventNotifier *)event;
+
+        /* Update handler with latest information */
+        node->pfd.events = (io_read != NULL ? G_IO_IN : 0);
+        node->pfd.events |= (io_write != NULL ? G_IO_OUT : 0);
+        node->opaque = opaque;
+        node->io_read = io_read;
+        node->io_write = io_write;
+    }
+
+    aio_notify(ctx);
+}
+
 void aio_set_event_notifier(AioContext *ctx,
                             EventNotifier *e,
                             EventNotifierHandler *io_notify)
@@ -81,14 +145,88 @@ bool aio_pending(AioContext *ctx)
     AioHandler *node;
 
     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+        /* HANDLE? */
         if (node->pfd.revents && node->io_notify) {
             return true;
         }
+
+        /* SOCKET? */
+        int revents = node->pfd.revents & node->pfd.events;
+
+        if ((revents & G_IO_IN) && node->io_read) {
+            return true;
+        }
+        if ((revents & G_IO_OUT) && node->io_write) {
+            return true;
+        }
     }
 
     return false;
 }
 
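+/* Dispatch any pending EventNotifier (HANDLE) and socket callbacks, then run
+ * the timers for this context.  Returns true if a callback or timer made
+ * progress.
+ */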
+static bool aio_dispatch(AioContext *ctx)
+{
+    AioHandler *node;
+    bool progress = false;
+
+    /*
+     * We have to walk very carefully in case qemu_aio_set_fd_handler is
+     * called while we're walking.
+     */
+    node = QLIST_FIRST(&ctx->aio_handlers);
+    while (node) {
+        AioHandler *tmp = node;
+        int revents;
+
+        ctx->walking_handlers++;
+
+        if (!node->deleted) {
+            /* HANDLE? */
+            if (node->pfd.revents && node->io_notify) {
+                node->pfd.revents = 0;
+                node->io_notify(node->e);
+
+                /* aio_notify() does not count as progress */
+                if (node->e != &ctx->notifier) {
+                    progress = true;
+                }
+            }
+
+            /* SOCKET? */
+            revents = node->pfd.revents & node->pfd.events;
+            node->pfd.revents = 0;
+
+            if ((revents & G_IO_IN) && node->io_read) {
+                node->io_read(node->opaque);
+
+                /* aio_notify() does not count as progress */
+                if (node->opaque != &ctx->notifier) {
+                    progress = true;
+                }
+            }
+            if ((revents & G_IO_OUT) && node->io_write) {
+                node->io_write(node->opaque);
+                progress = true;
+            }
+        }
+
+        node = QLIST_NEXT(node, node);
+
+        ctx->walking_handlers--;
+
+        if (!ctx->walking_handlers && tmp->deleted) {
+            QLIST_REMOVE(tmp, node);
+            g_free(tmp);
+        }
+    }
+
+    /* Run our timers */
+    progress |= timerlistgroup_run_timers(&ctx->tlg);
+
+    return progress;
+}
+
 bool aio_poll(AioContext *ctx, bool blocking)
 {
     AioHandler *node;
@@ -96,6 +234,8 @@ bool aio_poll(AioContext *ctx, bool blocking)
     bool progress;
     int count;
     int timeout;
+    fd_set rfds, wfds;
+    struct timeval tv0 = { .tv_sec = 0, .tv_usec = 0};
 
     progress = false;
 
@@ -109,41 +249,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
         progress = true;
     }
 
-    /* Run timers */
-    progress |= timerlistgroup_run_timers(&ctx->tlg);
-
-    /*
-     * Then dispatch any pending callbacks from the GSource.
-     *
-     * We have to walk very carefully in case qemu_aio_set_fd_handler is
-     * called while we're walking.
-     */
-    node = QLIST_FIRST(&ctx->aio_handlers);
-    while (node) {
-        AioHandler *tmp;
-
-        ctx->walking_handlers++;
-
-        if (node->pfd.revents && node->io_notify) {
-            node->pfd.revents = 0;
-            node->io_notify(node->e);
-
-            /* aio_notify() does not count as progress */
-            if (node->e != &ctx->notifier) {
-                progress = true;
-            }
-        }
-
-        tmp = node;
-        node = QLIST_NEXT(node, node);
-
-        ctx->walking_handlers--;
-
-        if (!ctx->walking_handlers && tmp->deleted) {
-            QLIST_REMOVE(tmp, node);
-            g_free(tmp);
-        }
-    }
+    /* Dispatch any pending callbacks and run timers */
+    progress |= aio_dispatch(ctx);
 
     if (progress && !blocking) {
         return true;
@@ -151,12 +257,42 @@ bool aio_poll(AioContext *ctx, bool blocking)
 
     ctx->walking_handlers++;
 
-    /* fill fd sets */
+    FD_ZERO(&rfds);
+    FD_ZERO(&wfds);
     count = 0;
     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
-        if (!node->deleted && node->io_notify) {
+        if (node->deleted) {
+            continue;
+        }
+
+        /* HANDLE? */
+        if (node->io_notify) {
             events[count++] = event_notifier_get_handle(node->e);
         }
+
+        /* SOCKET? */
+        else if (node->io_read || node->io_write) {
+            if (node->io_read) {
+                FD_SET((SOCKET)node->pfd.fd, &rfds);
+            }
+            if (node->io_write) {
+                FD_SET((SOCKET)node->pfd.fd, &wfds);
+            }
+
+            events[count++] = (HANDLE)node->e;
+        }
+    }
+
+    if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
+        QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+            node->pfd.revents = 0;
+            if (FD_ISSET(node->pfd.fd, &rfds)) {
+                node->pfd.revents |= G_IO_IN;
+                blocking = false;
+            }
+
+            if (FD_ISSET(node->pfd.fd, &wfds)) {
+                node->pfd.revents |= G_IO_OUT;
+                blocking = false;
+            }
+        }
     }
 
     ctx->walking_handlers--;
@@ -184,6 +320,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
 
             ctx->walking_handlers++;
 
+            /* HANDLE? */
             if (!node->deleted &&
                 event_notifier_get_handle(node->e) == events[ret - WAIT_OBJECT_0] &&
                 node->io_notify) {
@@ -195,6 +332,27 @@ bool aio_poll(AioContext *ctx, bool blocking)
                 }
             }
 
+            /* SOCKET? */
+            if (!node->deleted &&
+                (HANDLE)node->e == events[ret - WAIT_OBJECT_0]) {
+                /* Which network events were signalled on this socket? */
+                WSANETWORKEVENTS ev;
+                ev.lNetworkEvents = 0;
+
+                WSAEnumNetworkEvents(node->pfd.fd, (HANDLE)node->e, &ev);
+
+                if ((ev.lNetworkEvents & FD_READ) != 0 && node->io_read) {
+                    node->io_read(node->opaque);
+                    progress = true;
+                }
+
+                if ((ev.lNetworkEvents & FD_WRITE) != 0 && node->io_write) {
+                    node->io_write(node->opaque);
+                    progress = true;
+                }
+            }
+
             tmp = node;
             node = QLIST_NEXT(node, node);
 
@@ -210,14 +368,10 @@ bool aio_poll(AioContext *ctx, bool blocking)
         events[ret - WAIT_OBJECT_0] = events[--count];
     }
 
-    if (blocking) {
-        /* Run the timers a second time. We do this because otherwise aio_wait
-         * will not note progress - and will stop a drain early - if we have
-         * a timer that was not ready to run entering g_poll but is ready
-         * after g_poll. This will only do anything if a timer has expired.
-         */
-        progress |= timerlistgroup_run_timers(&ctx->tlg);
-    }
+    /* Run dispatch even if there were no ready fds, so that timers still run */
+    if (aio_dispatch(ctx)) {
+        progress = true;
+    }
 
     return progress;
 }
diff --git a/block/Makefile.objs b/block/Makefile.objs
index 4e8c91e..e28f916 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -1,4 +1,4 @@
-block-obj-y += raw_bsd.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o
+block-obj-y += raw_bsd.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o nbd.o
 block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-cache.o
 block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
 block-obj-y += qed-check.o
@@ -10,7 +10,7 @@ block-obj-$(CONFIG_POSIX) += raw-posix.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 
 ifeq ($(CONFIG_POSIX),y)
-block-obj-y += nbd.o nbd-client.o sheepdog.o
+block-obj-y += nbd-client.o sheepdog.o
 block-obj-$(CONFIG_LIBISCSI) += iscsi.o
 block-obj-$(CONFIG_CURL) += curl.o
 block-obj-$(CONFIG_RBD) += rbd.o
diff --git a/block/nbd-client.h b/block/nbd-client.h
index f2a6337..d02acc1 100644
--- a/block/nbd-client.h
+++ b/block/nbd-client.h
@@ -19,7 +19,7 @@
 typedef struct NbdClientSession {
     int sock;
     uint32_t nbdflags;
-    off_t size;
+    uint64_t size;
     size_t blocksize;
 
     CoMutex send_mutex;
diff --git a/include/block/aio.h b/include/block/aio.h
index 2efdf41..effc8c2 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -199,7 +199,6 @@ bool aio_pending(AioContext *ctx);
  */
 bool aio_poll(AioContext *ctx, bool blocking);
 
-#ifdef CONFIG_POSIX
 /* Register a file descriptor and associated callbacks.  Behaves very similarly
  * to qemu_set_fd_handler2.  Unlike qemu_set_fd_handler2, these callbacks will
  * be invoked when using qemu_aio_wait().
@@ -212,7 +211,6 @@ void aio_set_fd_handler(AioContext *ctx,
                         IOHandler *io_read,
                         IOHandler *io_write,
                         void *opaque);
-#endif
 
 /* Register an event notifier and associated callbacks.  Behaves very similarly
  * to event_notifier_set_handler.  Unlike event_notifier_set_handler, these callbacks
diff --git a/include/block/nbd.h b/include/block/nbd.h
index c90f5e4..7a84882 100644
--- a/include/block/nbd.h
+++ b/include/block/nbd.h
@@ -69,7 +69,7 @@ int unix_socket_outgoing(const char *path);
 int unix_socket_incoming(const char *path);
 
 int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
-                          off_t *size, size_t *blocksize);
+                          uint64_t *size, size_t *blocksize);
 int nbd_init(int fd, int csock, uint32_t flags, off_t size, size_t blocksize);
 ssize_t nbd_send_request(int csock, struct nbd_request *request);
 ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply);
diff --git a/main-loop.c b/main-loop.c
index c3c9c28..0c82193 100644
--- a/main-loop.c
+++ b/main-loop.c
@@ -503,7 +503,6 @@ bool qemu_aio_wait(void)
     return aio_poll(qemu_aio_context, true);
 }
 
-#ifdef CONFIG_POSIX
 void qemu_aio_set_fd_handler(int fd,
                              IOHandler *io_read,
                              IOHandler *io_write,
@@ -511,7 +510,6 @@ void qemu_aio_set_fd_handler(int fd,
 {
     aio_set_fd_handler(qemu_aio_context, fd, io_read, io_write, opaque);
 }
-#endif
 
 void qemu_aio_set_event_notifier(EventNotifier *notifier,
                                  EventNotifierHandler *io_read)
diff --git a/nbd.c b/nbd.c
index 030f56b..475503d 100644
--- a/nbd.c
+++ b/nbd.c
@@ -149,7 +149,7 @@ ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
             err = socket_error();
 
             /* recoverable error */
-            if (err == EINTR || (offset > 0 && err == EAGAIN)) {
+            if (err == EINTR || (offset > 0 && (err == EAGAIN || err == EWOULDBLOCK))) {
                 continue;
             }
 
@@ -434,7 +434,7 @@ fail:
 }
 
 int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
-                          off_t *size, size_t *blocksize)
+                          uint64_t *size, size_t *blocksize)
 {
     char buf[256];
     uint64_t magic, s;
diff --git a/qemu-coroutine-io.c b/qemu-coroutine-io.c
index 054ca70..eb89817 100644
--- a/qemu-coroutine-io.c
+++ b/qemu-coroutine-io.c
@@ -34,13 +34,15 @@ qemu_co_sendv_recvv(int sockfd, struct iovec *iov, unsigned iov_cnt,
 {
     size_t done = 0;
     ssize_t ret;
+    int err;
     while (done < bytes) {
         ret = iov_send_recv(sockfd, iov, iov_cnt,
                             offset + done, bytes - done, do_send);
         if (ret > 0) {
             done += ret;
         } else if (ret < 0) {
-            if (errno == EAGAIN) {
+            err = socket_error();
+            if (err == EAGAIN || err == EWOULDBLOCK) {
                 qemu_coroutine_yield();
             } else if (done == 0) {
                 return -1;
-- 
1.7.9
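
For context, here is a minimal sketch (not part of the patch) of how a
socket-based client would use the registration path this patch implements
for win32.  The MyClient type and the handler names below are illustrative
only, not taken from block/nbd-client.c:

#include "block/aio.h"

typedef struct MyClient {
    int sock;                   /* a connected, non-blocking socket */
} MyClient;

/* Called from aio_poll()/aio_dispatch() whenever the socket is readable */
static void my_client_can_read(void *opaque)
{
    MyClient *client = opaque;
    /* read from client->sock; if the read would block, just return and
     * wait to be called again */
    (void)client;
}

static void my_client_attach(AioContext *ctx, MyClient *client)
{
    /* Register a read handler; the opaque pointer is passed back to it */
    aio_set_fd_handler(ctx, client->sock, my_client_can_read, NULL, client);
}

static void my_client_detach(AioContext *ctx, MyClient *client)
{
    /* Passing NULL for both callbacks unregisters the socket */
    aio_set_fd_handler(ctx, client->sock, NULL, NULL, client);
}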



