[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [PATCH 1/5] threadpool: move globals into struct Thread
From: |
Paolo Bonzini |
Subject: |
Re: [Qemu-devel] [PATCH 1/5] threadpool: move globals into struct ThreadPool |
Date: |
Wed, 06 Mar 2013 18:25:48 +0100 |
User-agent: |
Mozilla/5.0 (X11; Linux x86_64; rv:17.0) Gecko/20130219 Thunderbird/17.0.3 |
Il 06/03/2013 16:45, Stefan Hajnoczi ha scritto:
> Move global variables into a struct so multiple thread pools can be
> supported in the future.
>
> This patch does not change thread-pool.h interfaces. There is still a
> global thread pool and it is not yet possible to create/destroy
> individual thread pools. Moving the variables into a struct first makes
> later patches easier to review.
>
> Signed-off-by: Stefan Hajnoczi <address@hidden>
> ---
> thread-pool.c | 195
> ++++++++++++++++++++++++++++++++++------------------------
> trace-events | 4 +-
> 2 files changed, 117 insertions(+), 82 deletions(-)
>
> diff --git a/thread-pool.c b/thread-pool.c
> index e3ca64d..8a957b9 100644
> --- a/thread-pool.c
> +++ b/thread-pool.c
> @@ -24,7 +24,9 @@
> #include "qemu/event_notifier.h"
> #include "block/thread-pool.h"
>
> -static void do_spawn_thread(void);
> +typedef struct ThreadPool ThreadPool;
> +
> +static void do_spawn_thread(ThreadPool *pool);
>
> typedef struct ThreadPoolElement ThreadPoolElement;
>
> @@ -37,6 +39,7 @@ enum ThreadState {
>
> struct ThreadPoolElement {
> BlockDriverAIOCB common;
> + ThreadPool *pool;
> ThreadPoolFunc *func;
> void *arg;
>
> @@ -54,49 +57,56 @@ struct ThreadPoolElement {
> QLIST_ENTRY(ThreadPoolElement) all;
> };
>
> -static EventNotifier notifier;
> -static QemuMutex lock;
> -static QemuCond check_cancel;
> -static QemuSemaphore sem;
> -static int max_threads = 64;
> -static QEMUBH *new_thread_bh;
> -
> -/* The following variables are protected by the global mutex. */
> -static QLIST_HEAD(, ThreadPoolElement) head;
> -
> -/* The following variables are protected by lock. */
> -static QTAILQ_HEAD(, ThreadPoolElement) request_list;
> -static int cur_threads;
> -static int idle_threads;
> -static int new_threads; /* backlog of threads we need to create */
> -static int pending_threads; /* threads created but not running yet */
> -static int pending_cancellations; /* whether we need a cond_broadcast */
> -
> -static void *worker_thread(void *unused)
> +struct ThreadPool {
> + EventNotifier notifier;
> + QemuMutex lock;
> + QemuCond check_cancel;
> + QemuSemaphore sem;
> + int max_threads;
> + QEMUBH *new_thread_bh;
> +
> + /* The following variables are only accessed from one AioContext. */
> + QLIST_HEAD(, ThreadPoolElement) head;
> +
> + /* The following variables are protected by lock. */
> + QTAILQ_HEAD(, ThreadPoolElement) request_list;
> + int cur_threads;
> + int idle_threads;
> + int new_threads; /* backlog of threads we need to create */
> + int pending_threads; /* threads created but not running yet */
> + int pending_cancellations; /* whether we need a cond_broadcast */
> +};
> +
> +/* Currently there is only one thread pool instance. */
> +static ThreadPool global_pool;
> +
> +static void *worker_thread(void *opaque)
> {
> - qemu_mutex_lock(&lock);
> - pending_threads--;
> - do_spawn_thread();
> + ThreadPool *pool = opaque;
> +
> + qemu_mutex_lock(&pool->lock);
> + pool->pending_threads--;
> + do_spawn_thread(pool);
>
> while (1) {
> ThreadPoolElement *req;
> int ret;
>
> do {
> - idle_threads++;
> - qemu_mutex_unlock(&lock);
> - ret = qemu_sem_timedwait(&sem, 10000);
> - qemu_mutex_lock(&lock);
> - idle_threads--;
> - } while (ret == -1 && !QTAILQ_EMPTY(&request_list));
> + pool->idle_threads++;
> + qemu_mutex_unlock(&pool->lock);
> + ret = qemu_sem_timedwait(&pool->sem, 10000);
> + qemu_mutex_lock(&pool->lock);
> + pool->idle_threads--;
> + } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
> if (ret == -1) {
> break;
> }
>
> - req = QTAILQ_FIRST(&request_list);
> - QTAILQ_REMOVE(&request_list, req, reqs);
> + req = QTAILQ_FIRST(&pool->request_list);
> + QTAILQ_REMOVE(&pool->request_list, req, reqs);
> req->state = THREAD_ACTIVE;
> - qemu_mutex_unlock(&lock);
> + qemu_mutex_unlock(&pool->lock);
>
> ret = req->func(req->arg);
>
> @@ -105,45 +115,47 @@ static void *worker_thread(void *unused)
> smp_wmb();
> req->state = THREAD_DONE;
>
> - qemu_mutex_lock(&lock);
> - if (pending_cancellations) {
> - qemu_cond_broadcast(&check_cancel);
> + qemu_mutex_lock(&pool->lock);
> + if (pool->pending_cancellations) {
> + qemu_cond_broadcast(&pool->check_cancel);
> }
>
> - event_notifier_set(¬ifier);
> + event_notifier_set(&pool->notifier);
> }
>
> - cur_threads--;
> - qemu_mutex_unlock(&lock);
> + pool->cur_threads--;
> + qemu_mutex_unlock(&pool->lock);
> return NULL;
> }
>
> -static void do_spawn_thread(void)
> +static void do_spawn_thread(ThreadPool *pool)
> {
> QemuThread t;
>
> /* Runs with lock taken. */
> - if (!new_threads) {
> + if (!pool->new_threads) {
> return;
> }
>
> - new_threads--;
> - pending_threads++;
> + pool->new_threads--;
> + pool->pending_threads++;
>
> - qemu_thread_create(&t, worker_thread, NULL, QEMU_THREAD_DETACHED);
> + qemu_thread_create(&t, worker_thread, pool, QEMU_THREAD_DETACHED);
> }
>
> static void spawn_thread_bh_fn(void *opaque)
> {
> - qemu_mutex_lock(&lock);
> - do_spawn_thread();
> - qemu_mutex_unlock(&lock);
> + ThreadPool *pool = opaque;
> +
> + qemu_mutex_lock(&pool->lock);
> + do_spawn_thread(pool);
> + qemu_mutex_unlock(&pool->lock);
> }
>
> -static void spawn_thread(void)
> +static void spawn_thread(ThreadPool *pool)
> {
> - cur_threads++;
> - new_threads++;
> + pool->cur_threads++;
> + pool->new_threads++;
> /* If there are threads being created, they will spawn new workers, so
> * we don't spend time creating many threads in a loop holding a mutex or
> * starving the current vcpu.
> @@ -151,23 +163,25 @@ static void spawn_thread(void)
> * If there are no idle threads, ask the main thread to create one, so we
> * inherit the correct affinity instead of the vcpu affinity.
> */
> - if (!pending_threads) {
> - qemu_bh_schedule(new_thread_bh);
> + if (!pool->pending_threads) {
> + qemu_bh_schedule(pool->new_thread_bh);
> }
> }
>
> static void event_notifier_ready(EventNotifier *notifier)
> {
> + ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
> ThreadPoolElement *elem, *next;
>
> event_notifier_test_and_clear(notifier);
> restart:
> - QLIST_FOREACH_SAFE(elem, &head, all, next) {
> + QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
> if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
> continue;
> }
> if (elem->state == THREAD_DONE) {
> - trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
> + trace_thread_pool_complete(pool, elem, elem->common.opaque,
> + elem->ret);
> }
> if (elem->state == THREAD_DONE && elem->common.cb) {
> QLIST_REMOVE(elem, all);
> @@ -186,34 +200,36 @@ restart:
>
> static int thread_pool_active(EventNotifier *notifier)
> {
> - return !QLIST_EMPTY(&head);
> + ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
> + return !QLIST_EMPTY(&pool->head);
> }
>
> static void thread_pool_cancel(BlockDriverAIOCB *acb)
> {
> ThreadPoolElement *elem = (ThreadPoolElement *)acb;
> + ThreadPool *pool = elem->pool;
>
> trace_thread_pool_cancel(elem, elem->common.opaque);
>
> - qemu_mutex_lock(&lock);
> + qemu_mutex_lock(&pool->lock);
> if (elem->state == THREAD_QUEUED &&
> /* No thread has yet started working on elem. we can try to "steal"
> * the item from the worker if we can get a signal from the
> * semaphore. Because this is non-blocking, we can do it with
> * the lock taken and ensure that elem will remain THREAD_QUEUED.
> */
> - qemu_sem_timedwait(&sem, 0) == 0) {
> - QTAILQ_REMOVE(&request_list, elem, reqs);
> + qemu_sem_timedwait(&pool->sem, 0) == 0) {
> + QTAILQ_REMOVE(&pool->request_list, elem, reqs);
> elem->state = THREAD_CANCELED;
> - event_notifier_set(¬ifier);
> + event_notifier_set(&pool->notifier);
> } else {
> - pending_cancellations++;
> + pool->pending_cancellations++;
> while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE)
> {
> - qemu_cond_wait(&check_cancel, &lock);
> + qemu_cond_wait(&pool->check_cancel, &pool->lock);
> }
> - pending_cancellations--;
> + pool->pending_cancellations--;
> }
> - qemu_mutex_unlock(&lock);
> + qemu_mutex_unlock(&pool->lock);
> }
>
> static const AIOCBInfo thread_pool_aiocb_info = {
> @@ -224,24 +240,26 @@ static const AIOCBInfo thread_pool_aiocb_info = {
> BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
> BlockDriverCompletionFunc *cb, void *opaque)
> {
> + ThreadPool *pool = &global_pool;
> ThreadPoolElement *req;
>
> req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
> req->func = func;
> req->arg = arg;
> req->state = THREAD_QUEUED;
> + req->pool = pool;
>
> - QLIST_INSERT_HEAD(&head, req, all);
> + QLIST_INSERT_HEAD(&pool->head, req, all);
>
> - trace_thread_pool_submit(req, arg);
> + trace_thread_pool_submit(pool, req, arg);
>
> - qemu_mutex_lock(&lock);
> - if (idle_threads == 0 && cur_threads < max_threads) {
> - spawn_thread();
> + qemu_mutex_lock(&pool->lock);
> + if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
> + spawn_thread(pool);
> }
> - QTAILQ_INSERT_TAIL(&request_list, req, reqs);
> - qemu_mutex_unlock(&lock);
> - qemu_sem_post(&sem);
> + QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
> + qemu_mutex_unlock(&pool->lock);
> + qemu_sem_post(&pool->sem);
> return &req->common;
> }
>
> @@ -272,18 +290,35 @@ void thread_pool_submit(ThreadPoolFunc *func, void *arg)
> thread_pool_submit_aio(func, arg, NULL, NULL);
> }
>
> +static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
> +{
> + memset(pool, 0, sizeof(*pool));
> + event_notifier_init(&pool->notifier, false);
> + qemu_mutex_init(&pool->lock);
> + qemu_cond_init(&pool->check_cancel);
> + qemu_sem_init(&pool->sem, 0);
> + pool->max_threads = 64;
> + if (ctx) {
> + pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
> + } else {
> + pool->new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, pool);
> + }
> +
> + QLIST_INIT(&pool->head);
> + QTAILQ_INIT(&pool->request_list);
> +
> + if (ctx) {
> + aio_set_event_notifier(ctx, &pool->notifier, event_notifier_ready,
> + thread_pool_active);
> + } else {
> + qemu_aio_set_event_notifier(&pool->notifier, event_notifier_ready,
> + thread_pool_active);
> + }
Usual comment here about if (ctx). Otherwise looks good.
> +}
> +
> static void thread_pool_init(void)
> {
> - QLIST_INIT(&head);
> - event_notifier_init(¬ifier, false);
> - qemu_mutex_init(&lock);
> - qemu_cond_init(&check_cancel);
> - qemu_sem_init(&sem, 0);
> - qemu_aio_set_event_notifier(¬ifier, event_notifier_ready,
> - thread_pool_active);
> -
> - QTAILQ_INIT(&request_list);
> - new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
> + thread_pool_init_one(&global_pool, NULL);
> }
>
> block_init(thread_pool_init)
> diff --git a/trace-events b/trace-events
> index a27ae43..f16c021 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -115,8 +115,8 @@ virtio_blk_data_plane_complete_request(void *s, unsigned
> int head, int ret) "dat
> vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring
> physical %#"PRIx64" desc %p avail %p used %p"
>
> # thread-pool.c
> -thread_pool_submit(void *req, void *opaque) "req %p opaque %p"
> -thread_pool_complete(void *req, void *opaque, int ret) "req %p opaque %p ret
> %d"
> +thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p
> opaque %p"
> +thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p
> req %p opaque %p ret %d"
> thread_pool_cancel(void *req, void *opaque) "req %p opaque %p"
>
> # posix-aio-compat.c
>
Reviewed-by: Paolo Bnzini
- [Qemu-devel] [PATCH 0/5] threadpool: support multiple ThreadPools, Stefan Hajnoczi, 2013/03/06
- [Qemu-devel] [PATCH 3/5] aio: add a ThreadPool instance to AioContext, Stefan Hajnoczi, 2013/03/06
- [Qemu-devel] [PATCH 1/5] threadpool: move globals into struct ThreadPool, Stefan Hajnoczi, 2013/03/06
- [Qemu-devel] [PATCH 4/5] main-loop: add qemu_get_aio_context(), Stefan Hajnoczi, 2013/03/06
- [Qemu-devel] [PATCH 5/5] threadpool: drop global thread pool, Stefan Hajnoczi, 2013/03/06
- [Qemu-devel] [PATCH 2/5] threadpool: add thread_pool_new() and thread_pool_free(), Stefan Hajnoczi, 2013/03/06