[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [PATCH v6 3/7] aio: add aio_context_acquire() and aio_conte
From: |
Stefan Hajnoczi |
Subject: |
[Qemu-devel] [PATCH v6 3/7] aio: add aio_context_acquire() and aio_context_release() |
Date: |
Mon, 3 Mar 2014 11:30:04 +0100 |
It can be useful to run an AioContext from a thread which normally does
not "own" the AioContext. For example, request draining can be
implemented by acquiring the AioContext and looping aio_poll() until all
requests have been completed.
The following pattern should work:
/* Event loop thread */
while (running) {
aio_context_acquire(ctx);
aio_poll(ctx, true);
aio_context_release(ctx);
}
/* Another thread */
aio_context_acquire(ctx);
bdrv_read(bs, 0x1000, buf, 1);
aio_context_release(ctx);
This patch implements aio_context_acquire() and aio_context_release().
Note that existing aio_poll() callers do not need to worry about
acquiring and releasing - it is only needed when multiple threads will
call aio_poll() on the same AioContext.
Signed-off-by: Stefan Hajnoczi <address@hidden>
---
async.c | 18 +++++++++++++++++
include/block/aio.h | 18 +++++++++++++++++
tests/test-aio.c | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 94 insertions(+)
diff --git a/async.c b/async.c
index 5fb3fa6..6930185 100644
--- a/async.c
+++ b/async.c
@@ -214,6 +214,7 @@ aio_ctx_finalize(GSource *source)
thread_pool_free(ctx->thread_pool);
aio_set_event_notifier(ctx, &ctx->notifier, NULL);
event_notifier_cleanup(&ctx->notifier);
+ rfifolock_destroy(&ctx->lock);
qemu_mutex_destroy(&ctx->bh_lock);
g_array_free(ctx->pollfds, TRUE);
timerlistgroup_deinit(&ctx->tlg);
@@ -250,6 +251,12 @@ static void aio_timerlist_notify(void *opaque)
aio_notify(opaque);
}
+static void aio_rfifolock_cb(void *opaque)
+{
+ /* Kick owner thread in case they are blocked in aio_poll() */
+ aio_notify(opaque);
+}
+
AioContext *aio_context_new(void)
{
AioContext *ctx;
@@ -257,6 +264,7 @@ AioContext *aio_context_new(void)
ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
ctx->thread_pool = NULL;
qemu_mutex_init(&ctx->bh_lock);
+ rfifolock_init(&ctx->lock, aio_rfifolock_cb, ctx);
event_notifier_init(&ctx->notifier, false);
aio_set_event_notifier(ctx, &ctx->notifier,
(EventNotifierHandler *)
@@ -275,3 +283,13 @@ void aio_context_unref(AioContext *ctx)
{
g_source_unref(&ctx->source);
}
+
+void aio_context_acquire(AioContext *ctx)
+{
+ rfifolock_lock(&ctx->lock);
+}
+
+void aio_context_release(AioContext *ctx)
+{
+ rfifolock_unlock(&ctx->lock);
+}
diff --git a/include/block/aio.h b/include/block/aio.h
index 2efdf41..a92511b 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -19,6 +19,7 @@
#include "qemu/queue.h"
#include "qemu/event_notifier.h"
#include "qemu/thread.h"
+#include "qemu/rfifolock.h"
#include "qemu/timer.h"
typedef struct BlockDriverAIOCB BlockDriverAIOCB;
@@ -47,6 +48,9 @@ typedef void IOHandler(void *opaque);
struct AioContext {
GSource source;
+ /* Protects all fields from multi-threaded access */
+ RFifoLock lock;
+
/* The list of registered AIO handlers */
QLIST_HEAD(, AioHandler) aio_handlers;
@@ -104,6 +108,20 @@ void aio_context_ref(AioContext *ctx);
*/
void aio_context_unref(AioContext *ctx);
+/* Take ownership of the AioContext. If the AioContext will be shared between
+ * threads, a thread must have ownership when calling aio_poll().
+ *
+ * Note that multiple threads calling aio_poll() means timers, BHs, and
+ * callbacks may be invoked from a different thread than they were registered
+ * from. Therefore, code must use AioContext acquire/release or use
+ * fine-grained synchronization to protect shared state if other threads will
+ * be accessing it simultaneously.
+ */
+void aio_context_acquire(AioContext *ctx);
+
+/* Relinquish ownership of the AioContext. */
+void aio_context_release(AioContext *ctx);
+
/**
* aio_bh_new: Allocate a new bottom half structure.
*
diff --git a/tests/test-aio.c b/tests/test-aio.c
index 592721e..d384b0b 100644
--- a/tests/test-aio.c
+++ b/tests/test-aio.c
@@ -112,6 +112,63 @@ static void test_notify(void)
g_assert(!aio_poll(ctx, false));
}
+typedef struct {
+ QemuMutex start_lock;
+ bool thread_acquired;
+} AcquireTestData;
+
+static void *test_acquire_thread(void *opaque)
+{
+ AcquireTestData *data = opaque;
+
+ /* Wait for other thread to let us start */
+ qemu_mutex_lock(&data->start_lock);
+ qemu_mutex_unlock(&data->start_lock);
+
+ aio_context_acquire(ctx);
+ aio_context_release(ctx);
+
+ data->thread_acquired = true; /* success, we got here */
+
+ return NULL;
+}
+
+static void dummy_notifier_read(EventNotifier *unused)
+{
+ g_assert(false); /* should never be invoked */
+}
+
+static void test_acquire(void)
+{
+ QemuThread thread;
+ EventNotifier notifier;
+ AcquireTestData data;
+
+ /* Dummy event notifier ensures aio_poll() will block */
+ event_notifier_init(¬ifier, false);
+ aio_set_event_notifier(ctx, ¬ifier, dummy_notifier_read);
+ g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */
+
+ qemu_mutex_init(&data.start_lock);
+ qemu_mutex_lock(&data.start_lock);
+ data.thread_acquired = false;
+
+ qemu_thread_create(&thread, test_acquire_thread,
+ &data, QEMU_THREAD_JOINABLE);
+
+ /* Block in aio_poll(), let other thread kick us and acquire context */
+ aio_context_acquire(ctx);
+ qemu_mutex_unlock(&data.start_lock); /* let the thread run */
+ g_assert(!aio_poll(ctx, true));
+ aio_context_release(ctx);
+
+ qemu_thread_join(&thread);
+ aio_set_event_notifier(ctx, ¬ifier, NULL);
+ event_notifier_cleanup(¬ifier);
+
+ g_assert(data.thread_acquired);
+}
+
static void test_bh_schedule(void)
{
BHTestData data = { .n = 0 };
@@ -775,6 +832,7 @@ int main(int argc, char **argv)
g_test_init(&argc, &argv, NULL);
g_test_add_func("/aio/notify", test_notify);
+ g_test_add_func("/aio/acquire", test_acquire);
g_test_add_func("/aio/bh/schedule", test_bh_schedule);
g_test_add_func("/aio/bh/schedule10", test_bh_schedule10);
g_test_add_func("/aio/bh/cancel", test_bh_cancel);
--
1.8.5.3
- [Qemu-devel] [PATCH v6 0/7] dataplane: switch to N:M devices-per-thread model, Stefan Hajnoczi, 2014/03/03
- [Qemu-devel] [PATCH v6 1/7] object: add object_get_canonical_path_component(), Stefan Hajnoczi, 2014/03/03
- [Qemu-devel] [PATCH v6 2/7] rfifolock: add recursive FIFO lock, Stefan Hajnoczi, 2014/03/03
- [Qemu-devel] [PATCH v6 4/7] iothread: add I/O thread object, Stefan Hajnoczi, 2014/03/03
- [Qemu-devel] [PATCH v6 3/7] aio: add aio_context_acquire() and aio_context_release(),
Stefan Hajnoczi <=
- [Qemu-devel] [PATCH v6 5/7] qdev: make get_pointer() handle temporary strings, Stefan Hajnoczi, 2014/03/03
- [Qemu-devel] [PATCH v6 6/7] iothread: add "iothread" qdev property type, Stefan Hajnoczi, 2014/03/03
- [Qemu-devel] [PATCH v6 7/7] dataplane: replace internal thread with IOThread, Stefan Hajnoczi, 2014/03/03
- Re: [Qemu-devel] [PATCH v6 0/7] dataplane: switch to N:M devices-per-thread model, Stefan Hajnoczi, 2014/03/11