qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH 2/3] net: introduce lock to protect NetClientState's send_queue


From: Liu Ping Fan
Subject: [Qemu-devel] [PATCH 2/3] net: introduce lock to protect NetClientState's send_queue
Date: Sun, 3 Mar 2013 21:21:21 +0800

From: Liu Ping Fan <address@hidden>

Use nc->transfer_lock to protect nc->peer->send_queue. The deleter and
all senders synchronize on this lock, so the send path can also survive
an unplug.

Signed-off-by: Liu Ping Fan <address@hidden>
---
 include/net/net.h   |    4 +++
 include/net/queue.h |    1 +
 net/hub.c           |   21 +++++++++++++-
 net/net.c           |   72 ++++++++++++++++++++++++++++++++++++++++++++++++---
 net/queue.c         |   15 +++++++++-
 5 files changed, 105 insertions(+), 8 deletions(-)

diff --git a/include/net/net.h b/include/net/net.h
index 24563ef..3e4b9df 100644
--- a/include/net/net.h
+++ b/include/net/net.h
@@ -63,6 +63,8 @@ typedef struct NetClientInfo {
 } NetClientInfo;
 
 struct NetClientState {
+    /* protect peer's send_queue */
+    QemuMutex transfer_lock;
     NetClientInfo *info;
     int link_down;
     QTAILQ_ENTRY(NetClientState) next;
@@ -78,6 +80,7 @@ struct NetClientState {
 
 typedef struct NICState {
     NetClientState ncs[MAX_QUEUE_NUM];
+    NetClientState *pending_peer[MAX_QUEUE_NUM];
     NICConf *conf;
     void *opaque;
     bool peer_deleted;
@@ -105,6 +108,7 @@ NetClientState *qemu_find_vlan_client_by_name(Monitor *mon, int vlan_id,
                                               const char *client_str);
 typedef void (*qemu_nic_foreach)(NICState *nic, void *opaque);
 void qemu_foreach_nic(qemu_nic_foreach func, void *opaque);
+int qemu_can_send_packet_nolock(NetClientState *sender);
 int qemu_can_send_packet(NetClientState *nc);
 ssize_t qemu_sendv_packet(NetClientState *nc, const struct iovec *iov,
                           int iovcnt);
diff --git a/include/net/queue.h b/include/net/queue.h
index f60e57f..0ecd23b 100644
--- a/include/net/queue.h
+++ b/include/net/queue.h
@@ -67,6 +67,7 @@ ssize_t qemu_net_queue_send_iov(NetQueue *queue,
                                 NetPacketSent *sent_cb);
 
 void qemu_net_queue_purge(NetQueue *queue, NetClientState *from);
+void qemu_net_queue_purge_all(NetQueue *queue);
 bool qemu_net_queue_flush(NetQueue *queue);
 
 #endif /* QEMU_NET_QUEUE_H */
diff --git a/net/hub.c b/net/hub.c
index 81d2a04..97c3ac3 100644
--- a/net/hub.c
+++ b/net/hub.c
@@ -53,9 +53,14 @@ static ssize_t net_hub_receive(NetHub *hub, NetHubPort *source_port,
         if (port == source_port) {
             continue;
         }
-
+        qemu_mutex_lock(&port->nc.transfer_lock);
+        if (!port->nc.peer) {
+            qemu_mutex_unlock(&port->nc.transfer_lock);
+            continue;
+        }
         qemu_net_queue_append(port->nc.peer->send_queue, &port->nc,
                             QEMU_NET_PACKET_FLAG_NONE, buf, len, NULL);
+        qemu_mutex_unlock(&port->nc.transfer_lock);
         event_notifier_set(&port->e);
     }
     return len;
@@ -65,7 +70,13 @@ static void hub_port_deliver_packet(void *opaque)
 {
     NetHubPort *port = (NetHubPort *)opaque;
 
+    qemu_mutex_lock(&port->nc.transfer_lock);
+    if (!port->nc.peer) {
+        qemu_mutex_unlock(&port->nc.transfer_lock);
+        return;
+    }
     qemu_net_queue_flush(port->nc.peer->send_queue);
+    qemu_mutex_unlock(&port->nc.transfer_lock);
 }
 
 static ssize_t net_hub_receive_iov(NetHub *hub, NetHubPort *source_port,
@@ -78,10 +89,16 @@ static ssize_t net_hub_receive_iov(NetHub *hub, NetHubPort *source_port,
         if (port == source_port) {
             continue;
         }
-
+        qemu_mutex_lock(&port->nc.transfer_lock);
+        if (!port->nc.peer) {
+            qemu_mutex_unlock(&port->nc.transfer_lock);
+            continue;
+        }
         qemu_net_queue_append_iov(port->nc.peer->send_queue, &port->nc,
                             QEMU_NET_PACKET_FLAG_NONE, iov, iovcnt, NULL);
+        qemu_mutex_unlock(&port->nc.transfer_lock);
         event_notifier_set(&port->e);
+
     }
     return len;
 }
diff --git a/net/net.c b/net/net.c
index 544542b..0acb933 100644
--- a/net/net.c
+++ b/net/net.c
@@ -207,6 +207,7 @@ static void qemu_net_client_setup(NetClientState *nc,
         nc->peer = peer;
         peer->peer = nc;
     }
+    qemu_mutex_init(&nc->transfer_lock);
     QTAILQ_INSERT_TAIL(&net_clients, nc, next);
 
     nc->send_queue = qemu_new_net_queue(nc);
@@ -285,6 +286,7 @@ void *qemu_get_nic_opaque(NetClientState *nc)
 
 static void qemu_cleanup_net_client(NetClientState *nc)
 {
+    /* This is the place where may be out of big lock, when dev finalized */
     QTAILQ_REMOVE(&net_clients, nc, next);
 
     if (nc->info->cleanup) {
@@ -307,6 +309,28 @@ static void qemu_free_net_client(NetClientState *nc)
     }
 }
 
+/* Unlink nc and its peer under their transfer_locks, purge peer's queue */
+static void qemu_flushout_net_client(NetClientState *nc)
+{
+    NetClientState *peer;
+
+    /* receive path: clear the peer's back-pointer under the peer's lock */
+    peer = nc->peer;
+    if (peer) {
+        qemu_mutex_lock(&peer->transfer_lock);
+        peer->peer = NULL;
+        qemu_mutex_unlock(&peer->transfer_lock);
+    }
+
+    /* send path: clear nc->peer and purge under nc's own lock */
+    qemu_mutex_lock(&nc->transfer_lock);
+    nc->peer = NULL;
+    if (peer) {
+        qemu_net_queue_purge(peer->send_queue, nc);
+    }
+    qemu_mutex_unlock(&nc->transfer_lock);
+}
+
 void qemu_del_net_client(NetClientState *nc)
 {
     NetClientState *ncs[MAX_QUEUE_NUM];
@@ -337,7 +361,9 @@ void qemu_del_net_client(NetClientState *nc)
         }
 
         for (i = 0; i < queues; i++) {
+            qemu_flushout_net_client(ncs[i]);
             qemu_cleanup_net_client(ncs[i]);
+            nic->pending_peer[i] = ncs[i];
         }
 
         return;
@@ -346,6 +372,7 @@ void qemu_del_net_client(NetClientState *nc)
     assert(nc->info->type != NET_CLIENT_OPTIONS_KIND_NIC);
 
     for (i = 0; i < queues; i++) {
+        qemu_flushout_net_client(ncs[i]);
         qemu_cleanup_net_client(ncs[i]);
         qemu_free_net_client(ncs[i]);
     }
@@ -358,16 +385,19 @@ void qemu_del_nic(NICState *nic)
     /* If this is a peer NIC and peer has already been deleted, free it now. */
     if (nic->peer_deleted) {
         for (i = 0; i < queues; i++) {
-            qemu_free_net_client(qemu_get_subqueue(nic, i)->peer);
+            qemu_free_net_client(nic->pending_peer[i]);
         }
     }
 
     for (i = queues - 1; i >= 0; i--) {
+
         NetClientState *nc = qemu_get_subqueue(nic, i);
 
+        qemu_flushout_net_client(nc);
         qemu_cleanup_net_client(nc);
         qemu_free_net_client(nc);
     }
+
 }
 
 void qemu_foreach_nic(qemu_nic_foreach func, void *opaque)
@@ -383,7 +413,7 @@ void qemu_foreach_nic(qemu_nic_foreach func, void *opaque)
     }
 }
 
-int qemu_can_send_packet(NetClientState *sender)
+int qemu_can_send_packet_nolock(NetClientState *sender)
 {
     if (!sender->peer) {
         return 1;
@@ -398,6 +428,29 @@ int qemu_can_send_packet(NetClientState *sender)
     return 1;
 }
 
+int qemu_can_send_packet(NetClientState *sender)
+{
+    int ret = 1;
+
+    qemu_mutex_lock(&sender->transfer_lock);
+    if (!sender->peer) {
+        ret = 1;
+        goto unlock;
+    }
+
+    if (sender->peer->receive_disabled) {
+        ret = 0;
+        goto unlock;
+    } else if (sender->peer->info->can_receive &&
+               !sender->peer->info->can_receive(sender->peer)) {
+        ret = 0;
+        goto unlock;
+    }
+unlock:
+    qemu_mutex_unlock(&sender->transfer_lock);
+    return ret;
+}
+
 ssize_t qemu_deliver_packet(NetClientState *sender,
                             unsigned flags,
                             const uint8_t *data,
@@ -455,19 +508,24 @@ static ssize_t qemu_send_packet_async_with_flags(NetClientState *sender,
                                                  NetPacketSent *sent_cb)
 {
     NetQueue *queue;
+    ssize_t rslt;
 
 #ifdef DEBUG_NET
     printf("qemu_send_packet_async:\n");
     hex_dump(stdout, buf, size);
 #endif
 
+    qemu_mutex_lock(&sender->transfer_lock);
     if (sender->link_down || !sender->peer) {
+        qemu_mutex_unlock(&sender->transfer_lock); /* drop before return */
         return size;
     }
 
     queue = sender->peer->send_queue;
 
-    return qemu_net_queue_send(queue, sender, flags, buf, size, sent_cb);
+    rslt = qemu_net_queue_send(queue, sender, flags, buf, size, sent_cb);
+    qemu_mutex_unlock(&sender->transfer_lock);
+    return rslt;
 }
 
 ssize_t qemu_send_packet_async(NetClientState *sender,
@@ -535,16 +593,21 @@ ssize_t qemu_sendv_packet_async(NetClientState *sender,
                                 NetPacketSent *sent_cb)
 {
     NetQueue *queue;
+    ssize_t rslt;
 
+    qemu_mutex_lock(&sender->transfer_lock);
     if (sender->link_down || !sender->peer) {
+        qemu_mutex_unlock(&sender->transfer_lock);
         return iov_size(iov, iovcnt);
     }
 
     queue = sender->peer->send_queue;
 
-    return qemu_net_queue_send_iov(queue, sender,
+    rslt = qemu_net_queue_send_iov(queue, sender,
                                    QEMU_NET_PACKET_FLAG_NONE,
                                    iov, iovcnt, sent_cb);
+    qemu_mutex_unlock(&sender->transfer_lock);
+    return rslt;
 }
 
 ssize_t
@@ -984,6 +1047,7 @@ void do_info_network(Monitor *mon, const QDict *qdict)
             print_net_client(mon, peer);
         }
     }
+
 }
 
 void qmp_set_link(const char *name, bool up, Error **errp)
diff --git a/net/queue.c b/net/queue.c
index d4fb965..ad65523 100644
--- a/net/queue.c
+++ b/net/queue.c
@@ -24,6 +24,7 @@
 #include "net/queue.h"
 #include "qemu/queue.h"
 #include "net/net.h"
+#include "qom/object.h"
 
 /* The delivery handler may only return zero if it will call
  * qemu_net_queue_flush() when it determines that it is once again able
@@ -172,7 +173,7 @@ ssize_t qemu_net_queue_send(NetQueue *queue,
 {
     ssize_t ret;
 
-    if (queue->delivering || !qemu_can_send_packet(sender)) {
+    if (queue->delivering || !qemu_can_send_packet_nolock(sender)) {
         qemu_net_queue_append(queue, sender, flags, data, size, sent_cb);
         return 0;
     }
@@ -197,7 +198,7 @@ ssize_t qemu_net_queue_send_iov(NetQueue *queue,
 {
     ssize_t ret;
 
-    if (queue->delivering || !qemu_can_send_packet(sender)) {
+    if (queue->delivering || !qemu_can_send_packet_nolock(sender)) {
         qemu_net_queue_append_iov(queue, sender, flags, iov, iovcnt, sent_cb);
         return 0;
     }
@@ -225,6 +226,16 @@ void qemu_net_queue_purge(NetQueue *queue, NetClientState *from)
     }
 }
 
+void qemu_net_queue_purge_all(NetQueue *queue)
+{
+    NetPacket *packet, *next;
+
+    QTAILQ_FOREACH_SAFE(packet, &queue->packets, entry, next) {
+        QTAILQ_REMOVE(&queue->packets, packet, entry);
+            g_free(packet);
+    }
+}
+
 bool qemu_net_queue_flush(NetQueue *queue)
 {
     while (!QTAILQ_EMPTY(&queue->packets)) {
-- 
1.7.4.4




reply via email to

[Prev in Thread] Current Thread [Next in Thread]