qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH RFC] iothread: Add "spawns" property


From: Fam Zheng
Subject: [Qemu-devel] [PATCH RFC] iothread: Add "spawns" property
Date: Wed, 19 Oct 2016 16:04:50 +0800

The new "spawns" option specifies how many threads to spawn under the
iothread object. All threads share the same AioContext, so they can
safely run (and contend) together.

With AioContext going away, the spawns will naturally enable the block
multi-queue work.

Signed-off-by: Fam Zheng <address@hidden>

---

Based on v2 of Paolo's RFifoLock removal series, with which the
symmetric contention on the single AioContext is no longer a busy
preempt loop.
---
 include/sysemu/iothread.h |  19 ++++--
 iothread.c                | 148 ++++++++++++++++++++++++++++++++++------------
 2 files changed, 125 insertions(+), 42 deletions(-)

diff --git a/include/sysemu/iothread.h b/include/sysemu/iothread.h
index 68ac2de..b50e76c 100644
--- a/include/sysemu/iothread.h
+++ b/include/sysemu/iothread.h
@@ -19,16 +19,25 @@
 
 #define TYPE_IOTHREAD "iothread"
 
-typedef struct {
-    Object parent_obj;
+typedef struct IOThread IOThread;
 
+typedef struct {
     QemuThread thread;
-    AioContext *ctx;
+    int thread_id;
+    IOThread *iothread;
     QemuMutex init_done_lock;
     QemuCond init_done_cond;    /* is thread initialization done? */
+    bool running;
+} IOThreadSpawn;
+
+struct IOThread {
+    Object parent_obj;
+
+    uint32_t nspawns;
+    IOThreadSpawn *spawns;
+    AioContext *ctx;
     bool stopping;
-    int thread_id;
-} IOThread;
+};
 
 #define IOTHREAD(obj) \
    OBJECT_CHECK(IOThread, obj, TYPE_IOTHREAD)
diff --git a/iothread.c b/iothread.c
index bd70344..bad00fb 100644
--- a/iothread.c
+++ b/iothread.c
@@ -39,49 +39,66 @@ AioContext *qemu_get_current_aio_context(void)
 
 static void *iothread_run(void *opaque)
 {
-    IOThread *iothread = opaque;
+    IOThreadSpawn *s = opaque;
+
+    s->running = true;
 
     rcu_register_thread();
 
-    my_iothread = iothread;
-    qemu_mutex_lock(&iothread->init_done_lock);
-    iothread->thread_id = qemu_get_thread_id();
-    qemu_cond_signal(&iothread->init_done_cond);
-    qemu_mutex_unlock(&iothread->init_done_lock);
+    my_iothread = s->iothread;
+    qemu_mutex_lock(&s->init_done_lock);
+    s->thread_id = qemu_get_thread_id();
+    qemu_cond_signal(&s->init_done_cond);
+    qemu_mutex_unlock(&s->init_done_lock);
 
-    while (!atomic_read(&iothread->stopping)) {
-        aio_poll(iothread->ctx, true);
+    while (!atomic_read(&s->iothread->stopping)) {
+        aio_poll(s->iothread->ctx, true);
     }
 
     rcu_unregister_thread();
+    s->running = false;
     return NULL;
 }
 
 static int iothread_stop(Object *object, void *opaque)
 {
+    int i;
     IOThread *iothread;
+    bool has_running = true;
 
     iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
     if (!iothread || !iothread->ctx) {
         return 0;
     }
     iothread->stopping = true;
-    aio_notify(iothread->ctx);
-    qemu_thread_join(&iothread->thread);
+    while (has_running) {
+        has_running = false;
+        aio_notify(iothread->ctx);
+        for (i = 0; i < iothread->nspawns; ++i) {
+            has_running |= iothread->spawns[i].running;
+        }
+    }
+    for (i = 0; i < iothread->nspawns; i++) {
+        qemu_thread_join(&iothread->spawns[i].thread);
+    }
     return 0;
 }
 
 static void iothread_instance_finalize(Object *obj)
 {
+    int i;
     IOThread *iothread = IOTHREAD(obj);
 
     iothread_stop(obj, NULL);
-    qemu_cond_destroy(&iothread->init_done_cond);
-    qemu_mutex_destroy(&iothread->init_done_lock);
-    if (!iothread->ctx) {
-        return;
+    for (i = 0; i < iothread->nspawns; ++i) {
+        IOThreadSpawn *s = &iothread->spawns[i];
+        qemu_cond_destroy(&s->init_done_cond);
+        qemu_mutex_destroy(&s->init_done_lock);
+        if (iothread->ctx) {
+            aio_context_unref(s->iothread->ctx);
+        }
     }
-    aio_context_unref(iothread->ctx);
+    g_free(iothread->spawns);
 }
 
 static void iothread_complete(UserCreatable *obj, Error **errp)
@@ -89,35 +106,81 @@ static void iothread_complete(UserCreatable *obj, Error **errp)
     Error *local_error = NULL;
     IOThread *iothread = IOTHREAD(obj);
     char *name, *thread_name;
+    int i;
 
     iothread->stopping = false;
-    iothread->thread_id = -1;
     iothread->ctx = aio_context_new(&local_error);
     if (!iothread->ctx) {
         error_propagate(errp, local_error);
         return;
     }
-
-    qemu_mutex_init(&iothread->init_done_lock);
-    qemu_cond_init(&iothread->init_done_cond);
+    if (!iothread->nspawns) {
+        iothread->nspawns = 1;
+    }
+    iothread->spawns = g_new0(IOThreadSpawn, iothread->nspawns);
 
     /* This assumes we are called from a thread with useful CPU affinity for us
      * to inherit.
      */
     name = object_get_canonical_path_component(OBJECT(obj));
-    thread_name = g_strdup_printf("IO %s", name);
-    qemu_thread_create(&iothread->thread, thread_name, iothread_run,
-                       iothread, QEMU_THREAD_JOINABLE);
-    g_free(thread_name);
+    for (i = 0; i < iothread->nspawns; ++i) {
+        IOThreadSpawn *s = &iothread->spawns[i];
+        s->thread_id = -1;
+        s->iothread = iothread;
+        qemu_mutex_init(&s->init_done_lock);
+        qemu_cond_init(&s->init_done_cond);
+        thread_name = g_strdup_printf("IO %s[%d]", name, i);
+        qemu_thread_create(&s->thread, thread_name, iothread_run,
+                           s, QEMU_THREAD_JOINABLE);
+        g_free(thread_name);
+    }
     g_free(name);
 
-    /* Wait for initialization to complete */
-    qemu_mutex_lock(&iothread->init_done_lock);
-    while (iothread->thread_id == -1) {
-        qemu_cond_wait(&iothread->init_done_cond,
-                       &iothread->init_done_lock);
+    for (i = 0; i < iothread->nspawns; ++i) {
+        IOThreadSpawn *s = &iothread->spawns[i];
+        /* Wait for initialization to complete */
+        qemu_mutex_lock(&s->init_done_lock);
+        while (s->thread_id == -1) {
+            qemu_cond_wait(&s->init_done_cond,
+                           &s->init_done_lock);
+        }
+        qemu_mutex_unlock(&s->init_done_lock);
     }
-    qemu_mutex_unlock(&iothread->init_done_lock);
+}
+
+static void iothread_set_spawns(Object *obj, Visitor *v,
+                                const char *name, void *opaque,
+                                Error **errp)
+{
+    IOThread *iothread = IOTHREAD(obj);
+
+    if (iothread->nspawns) {
+        error_setg(errp,
+                  "Modifying iothread spawns is not supported");
+        return;
+    }
+    visit_type_uint32(v, name, &iothread->nspawns, errp);
+    if (!iothread->nspawns) {
+        error_setg(errp, "Invalid iothread spawn number: %u",
+                   iothread->nspawns);
+    }
+}
+
+static void iothread_get_spawns(Object *obj, Visitor *v,
+                                const char *name, void *opaque,
+                                Error **errp)
+{
+    IOThread *iothread = IOTHREAD(obj);
+    visit_type_uint32(v, name, &iothread->nspawns, errp);
+}
+
+
+static void iothread_instance_init(Object *obj)
+{
+    object_property_add(obj, "spawns", "uint32",
+                        iothread_get_spawns,
+                        iothread_set_spawns,
+                        NULL, NULL, &error_abort);
 }
 
 static void iothread_class_init(ObjectClass *klass, void *class_data)
@@ -131,6 +194,7 @@ static const TypeInfo iothread_info = {
     .parent = TYPE_OBJECT,
     .class_init = iothread_class_init,
     .instance_size = sizeof(IOThread),
+    .instance_init = iothread_instance_init,
     .instance_finalize = iothread_instance_finalize,
     .interfaces = (InterfaceInfo[]) {
         {TYPE_USER_CREATABLE},
@@ -161,22 +225,32 @@ static int query_one_iothread(Object *object, void *opaque)
     IOThreadInfoList *elem;
     IOThreadInfo *info;
     IOThread *iothread;
+    char *id;
+    int i;
 
     iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
     if (!iothread) {
         return 0;
     }
 
-    info = g_new0(IOThreadInfo, 1);
-    info->id = iothread_get_id(iothread);
-    info->thread_id = iothread->thread_id;
+    id = iothread_get_id(iothread);
+    for (i = 0; i < iothread->nspawns; ++i) {
+        info = g_new0(IOThreadInfo, 1);
+        info->thread_id = iothread->spawns[i].thread_id;
+        if (iothread->nspawns > 1) {
+            info->id = g_strdup_printf("%s[%d]", id, i);
+        } else {
+            info->id = g_strdup(id);
+        }
 
-    elem = g_new0(IOThreadInfoList, 1);
-    elem->value = info;
-    elem->next = NULL;
+        elem = g_new0(IOThreadInfoList, 1);
+        elem->value = info;
+        elem->next = NULL;
 
-    **prev = elem;
-    *prev = &elem->next;
+        **prev = elem;
+        *prev = &elem->next;
+    }
+    g_free(id);
     return 0;
 }
 
-- 
2.7.4




reply via email to

[Prev in Thread] Current Thread [Next in Thread]