From: Fam Zheng
Subject: Re: [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux
Date: Thu, 12 Jan 2017 21:34:45 +0800
User-agent: Mutt/1.7.1 (2016-10-04)
On Wed, 01/04 14:26, Paolo Bonzini wrote:
> diff --git a/include/qemu/futex.h b/include/qemu/futex.h
> new file mode 100644
> index 0000000..852d612
> --- /dev/null
> +++ b/include/qemu/futex.h
> @@ -0,0 +1,36 @@
> +/*
> + * Wrappers around Linux futex syscall
> + *
> + * Copyright Red Hat, Inc. 2015
2015 - 2017, too?
> + *
> + * Author:
> + * Paolo Bonzini <address@hidden>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + *
> + */
> +
> +#include <sys/syscall.h>
> +#include <linux/futex.h>
> +
> +#define qemu_futex(...) syscall(__NR_futex, __VA_ARGS__)
> +
> +static inline void qemu_futex_wake(void *f, int n)
> +{
> + qemu_futex(f, FUTEX_WAKE, n, NULL, NULL, 0);
> +}
> +
> +static inline void qemu_futex_wait(void *f, unsigned val)
> +{
> + while (qemu_futex(f, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
> + switch (errno) {
> + case EWOULDBLOCK:
> + return;
> + case EINTR:
> + break; /* get out of switch and retry */
> + default:
> + abort();
> + }
> + }
> +}
> diff --git a/util/lockcnt.c b/util/lockcnt.c
> index 78ed1e4..40cc02a 100644
> --- a/util/lockcnt.c
> +++ b/util/lockcnt.c
> @@ -9,7 +9,288 @@
> #include "qemu/osdep.h"
> #include "qemu/thread.h"
> #include "qemu/atomic.h"
> +#include "trace.h"
>
> +#ifdef CONFIG_LINUX
> +#include "qemu/futex.h"
> +
> +/* On Linux, bits 0-1 are a futex-based lock, bits 2-31 are the counter.
> + * For the mutex algorithm see Ulrich Drepper's "Futexes Are Tricky" (ok,
> + * this is not the most relaxing citation I could make...). It is similar
> + * to mutex2 in the paper.
> + */
> +
> +#define QEMU_LOCKCNT_STATE_MASK 3
> +#define QEMU_LOCKCNT_STATE_FREE 0
> +#define QEMU_LOCKCNT_STATE_LOCKED 1
I find the macro names a bit incomplete in describing the semantics, though
maybe you wanted to limit their length; as it is, the mutex implementation is
harder to understand without reading the paper. How about adding a comment
saying "locked" means "locked but _not waited on_" and "waiting" means
"_locked_ and waited on"? It's up to you, since this is trivial compared to
the real complexity of this patch. :)
> +#define QEMU_LOCKCNT_STATE_WAITING 2
> +
> +#define QEMU_LOCKCNT_COUNT_STEP 4
> +#define QEMU_LOCKCNT_COUNT_SHIFT 2
> +
> +void qemu_lockcnt_init(QemuLockCnt *lockcnt)
> +{
> + lockcnt->count = 0;
> +}
> +
> +void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
> +{
> +}
> +
> +/* *val is the current value of lockcnt->count.
> + *
> + * If the lock is free, try a cmpxchg from *val to new_if_free; return
> + * true and set *val to the old value found by the cmpxchg in
> + * lockcnt->count.
> + *
> + * If the lock is taken, wait for it to be released and return false
> + * *without trying again to take the lock*. Again, set *val to the
> + * new value of lockcnt->count.
> + *
> + * new_if_free's bottom two bits must not be QEMU_LOCKCNT_STATE_LOCKED
> + * if calling this function a second time after it has returned
> + * false.
"and waited"? I think it is possible this function return false with the lock
actually being free, when ...
> + */
> +static bool qemu_lockcnt_cmpxchg_or_wait(QemuLockCnt *lockcnt, int *val,
> + int new_if_free, bool *waited)
> +{
> + /* Fast path for when the lock is free. */
> + if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_FREE) {
> + int expected = *val;
> +
> + trace_lockcnt_fast_path_attempt(lockcnt, expected, new_if_free);
> + *val = atomic_cmpxchg(&lockcnt->count, expected, new_if_free);
> + if (*val == expected) {
> + trace_lockcnt_fast_path_success(lockcnt, expected, new_if_free);
> + *val = new_if_free;
> + return true;
> + }
> + }
> +
> + /* The slow path moves from locked to waiting if necessary, then
> + * does a futex wait. Both steps can be repeated ad nauseam,
> + * only getting out of the loop if we can have another shot at the
> + * fast path. Once we can, get out to compute the new destination
> + * value for the fast path.
> + */
> + while ((*val & QEMU_LOCKCNT_STATE_MASK) != QEMU_LOCKCNT_STATE_FREE) {
> + if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_LOCKED) {
> + int expected = *val;
> +            int new = expected - QEMU_LOCKCNT_STATE_LOCKED +
> +                      QEMU_LOCKCNT_STATE_WAITING;
> +
> + trace_lockcnt_futex_wait_prepare(lockcnt, expected, new);
... the holder thread releases the lock at this point. In that case, a second
call to this function from qemu_lockcnt_dec_and_lock does pass
QEMU_LOCKCNT_STATE_LOCKED in new_if_free, because 'waited' is still false
there. The code is okay, but the comment above is too strict.
> + *val = atomic_cmpxchg(&lockcnt->count, expected, new);
> + if (*val == expected) {
> + *val = new;
> + }
> + continue;
> + }
> +
> + if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING) {
> + *waited = true;
> + trace_lockcnt_futex_wait(lockcnt, *val);
> + qemu_futex_wait(&lockcnt->count, *val);
> + *val = atomic_read(&lockcnt->count);
> + trace_lockcnt_futex_wait_resume(lockcnt, *val);
> + continue;
> + }
> +
> + abort();
> + }
> + return false;
> +}
> +
> +void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
> +{
> + int val = atomic_read(&lockcnt->count);
> + bool waited = false;
> +
> + for (;;) {
> + if (val >= QEMU_LOCKCNT_COUNT_STEP) {
> + int expected = val;
> +            val = atomic_cmpxchg(&lockcnt->count, val,
> +                                 val + QEMU_LOCKCNT_COUNT_STEP);
> + if (val == expected) {
> + break;
> + }
> + } else {
> + /* The fast path is (0, unlocked)->(1, unlocked). */
> +            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val,
> +                                             QEMU_LOCKCNT_COUNT_STEP,
> + &waited)) {
> + break;
> + }
> + }
> + }
> +
> + /* If we were woken by another thread, we should also wake one because
> + * we are effectively releasing the lock that was given to us. This is
> +     * the case where qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING
> + * in the low bits, and qemu_lockcnt_inc_and_unlock would find it and
> + * wake someone.
> + */
> + if (waited) {
> + lockcnt_wake(lockcnt);
> + }
> +}
> +
> +/* Decrement a counter, and return locked if it is decremented to zero.
> + * If the function returns true, it is impossible for the counter to
> + * become nonzero until the next qemu_lockcnt_unlock.
> + */
> +bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
> +{
> + int val = atomic_read(&lockcnt->count);
> + int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
> + bool waited = false;
> +
> + for (;;) {
> + if (val >= 2 * QEMU_LOCKCNT_COUNT_STEP) {
> + int expected = val;
> + int new = val - QEMU_LOCKCNT_COUNT_STEP;
> + val = atomic_cmpxchg(&lockcnt->count, val, new);
> + if (val == expected) {
> + break;
> + }
If (val != expected && val >= 2 * QEMU_LOCKCNT_COUNT_STEP), should this
atomic_cmpxchg be retried before trying qemu_lockcnt_cmpxchg_or_wait?
> + }
> +
> + /* If count is going 1->0, take the lock. The fast path is
> + * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
> + */
> +        if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state,
> +                                         &waited)) {
> + return true;
> + }
> +
> + if (waited) {
> +            /* At this point we do not know if there are more waiters.  Assume
> + * there are.
> + */
> + locked_state = QEMU_LOCKCNT_STATE_WAITING;
> + }
> + }
> +
> + /* If we were woken by another thread, but we're returning in unlocked
> + * state, we should also wake a thread because we are effectively
> + * releasing the lock that was given to us. This is the case where
> + * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
> + * bits, and qemu_lockcnt_unlock would find it and wake someone.
> + */
> + if (waited) {
> + lockcnt_wake(lockcnt);
> + }
> + return false;
> +}
> +
> +/* If the counter is one, decrement it and return locked. Otherwise do
> + * nothing.
> + *
> + * If the function returns true, it is impossible for the counter to
> + * become nonzero until the next qemu_lockcnt_unlock.
> + */
> +bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
> +{
> + int val = atomic_read(&lockcnt->count);
> + int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
> + bool waited = false;
> +
> + while (val < 2 * QEMU_LOCKCNT_COUNT_STEP) {
> + /* If count is going 1->0, take the lock. The fast path is
> + * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
> + */
> +        if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state,
> +                                         &waited)) {
> + return true;
> + }
> +
> + if (waited) {
> +            /* At this point we do not know if there are more waiters.  Assume
> + * there are.
> + */
> + locked_state = QEMU_LOCKCNT_STATE_WAITING;
> + }
> + }
> +
> + /* If we were woken by another thread, but we're returning in unlocked
> + * state, we should also wake a thread because we are effectively
> + * releasing the lock that was given to us. This is the case where
> + * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
> + * bits, and qemu_lockcnt_inc_and_unlock would find it and wake someone.
> + */
> + if (waited) {
> + lockcnt_wake(lockcnt);
> + }
> + return false;
> +}
> +
> +void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
> +{
> + int val = atomic_read(&lockcnt->count);
> + int step = QEMU_LOCKCNT_STATE_LOCKED;
> + bool waited = false;
> +
> + /* The third argument is only used if the low bits of val are 0
> + * (QEMU_LOCKCNT_STATE_FREE), so just blindly mix in the desired
> + * state.
> + */
> +    while (!qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, val + step,
> +                                         &waited)) {
> + if (waited) {
> +            /* At this point we do not know if there are more waiters.  Assume
> + * there are.
> + */
> + step = QEMU_LOCKCNT_STATE_WAITING;
> + }
> + }
> +}
> +
> +void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
> +{
> + int expected, new, val;
> +
> + val = atomic_read(&lockcnt->count);
> + do {
> + expected = val;
> + new = (val + QEMU_LOCKCNT_COUNT_STEP) & ~QEMU_LOCKCNT_STATE_MASK;
> + trace_lockcnt_unlock_attempt(lockcnt, val, new);
> + val = atomic_cmpxchg(&lockcnt->count, val, new);
> + } while (val != expected);
> +
> + trace_lockcnt_unlock_success(lockcnt, val, new);
> + if (val & QEMU_LOCKCNT_STATE_WAITING) {
> + lockcnt_wake(lockcnt);
> + }
> +}
Fam