[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-block] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexe
From: |
Paolo Bonzini |
Subject: |
[Qemu-block] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux |
Date: |
Wed, 21 Dec 2016 15:03:45 +0100 |
This is complex, but I think it is reasonably documented in the source.
Signed-off-by: Paolo Bonzini <address@hidden>
---
docs/lockcnt.txt | 9 +-
include/qemu/futex.h | 36 ++++++
include/qemu/thread.h | 2 +
util/lockcnt.c | 282 +++++++++++++++++++++++++++++++++++++++++++++++
util/qemu-thread-posix.c | 35 +-----
util/qemu-thread-win32.c | 2 +-
util/trace-events | 10 ++
7 files changed, 341 insertions(+), 35 deletions(-)
create mode 100644 include/qemu/futex.h
diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt
index 25a8091..2a79b32 100644
--- a/docs/lockcnt.txt
+++ b/docs/lockcnt.txt
@@ -142,12 +142,11 @@ can also be more efficient in two ways:
- it avoids taking the lock for many operations (for example
incrementing the counter while it is non-zero);
-- on some platforms, one could implement QemuLockCnt to hold the
- lock and the mutex in a single word, making it no more expensive
+- on some platforms, one can implement QemuLockCnt to hold the lock
+ and the mutex in a single word, making the fast path no more expensive
than simply managing a counter using atomic operations (see
- docs/atomics.txt). This is not implemented yet, but can be
- very helpful if concurrent access to the data structure is
- expected to be rare.
+ docs/atomics.txt). This can be very helpful if concurrent access to
+ the data structure is expected to be rare.
Using the same mutex for frees and writes can still incur some small
diff --git a/include/qemu/futex.h b/include/qemu/futex.h
new file mode 100644
index 0000000..852d612
--- /dev/null
+++ b/include/qemu/futex.h
@@ -0,0 +1,36 @@
+/*
+ * Wrappers around Linux futex syscall
+ *
+ * Copyright Red Hat, Inc. 2015
+ *
+ * Author:
+ * Paolo Bonzini <address@hidden>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include <sys/syscall.h>
+#include <linux/futex.h>
+
+#define qemu_futex(...) syscall(__NR_futex, __VA_ARGS__)
+
+static inline void qemu_futex_wake(void *f, int n)
+{
+ qemu_futex(f, FUTEX_WAKE, n, NULL, NULL, 0);
+}
+
+static inline void qemu_futex_wait(void *f, unsigned val)
+{
+ while (qemu_futex(f, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
+ switch (errno) {
+ case EWOULDBLOCK:
+ return;
+ case EINTR:
+ break; /* get out of switch and retry */
+ default:
+ abort();
+ }
+ }
+}
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index 7944f79..93337c4 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -100,7 +100,9 @@ static inline void qemu_spin_unlock(QemuSpin *spin)
}
struct QemuLockCnt {
+#ifndef CONFIG_LINUX
QemuMutex mutex;
+#endif
unsigned count;
};
diff --git a/util/lockcnt.c b/util/lockcnt.c
index 78ed1e4..40cc02a 100644
--- a/util/lockcnt.c
+++ b/util/lockcnt.c
@@ -9,7 +9,288 @@
#include "qemu/osdep.h"
#include "qemu/thread.h"
#include "qemu/atomic.h"
+#include "trace.h"
+#ifdef CONFIG_LINUX
+#include "qemu/futex.h"
+
+/* On Linux, bits 0-1 are a futex-based lock, bits 2-31 are the counter.
+ * For the mutex algorithm see Ulrich Drepper's "Futexes Are Tricky" (ok,
+ * this is not the most relaxing citation I could make...). It is similar
+ * to mutex2 in the paper.
+ */
+
+#define QEMU_LOCKCNT_STATE_MASK 3
+#define QEMU_LOCKCNT_STATE_FREE 0
+#define QEMU_LOCKCNT_STATE_LOCKED 1
+#define QEMU_LOCKCNT_STATE_WAITING 2
+
+#define QEMU_LOCKCNT_COUNT_STEP 4
+#define QEMU_LOCKCNT_COUNT_SHIFT 2
+
+void qemu_lockcnt_init(QemuLockCnt *lockcnt)
+{
+ lockcnt->count = 0;
+}
+
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
+{
+}
+
+/* *val is the current value of lockcnt->count.
+ *
+ * If the lock is free, try a cmpxchg from *val to new_if_free; return
+ * true and set *val to the old value found by the cmpxchg in
+ * lockcnt->count.
+ *
+ * If the lock is taken, wait for it to be released and return false
+ * *without trying again to take the lock*. Again, set *val to the
+ * new value of lockcnt->count.
+ *
+ * new_if_free's bottom two bits must not be QEMU_LOCKCNT_STATE_LOCKED
+ * if calling this function a second time after it has returned
+ * false.
+ */
+static bool qemu_lockcnt_cmpxchg_or_wait(QemuLockCnt *lockcnt, int *val,
+ int new_if_free, bool *waited)
+{
+ /* Fast path for when the lock is free. */
+ if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_FREE) {
+ int expected = *val;
+
+ trace_lockcnt_fast_path_attempt(lockcnt, expected, new_if_free);
+ *val = atomic_cmpxchg(&lockcnt->count, expected, new_if_free);
+ if (*val == expected) {
+ trace_lockcnt_fast_path_success(lockcnt, expected, new_if_free);
+ *val = new_if_free;
+ return true;
+ }
+ }
+
+ /* The slow path moves from locked to waiting if necessary, then
+ * does a futex wait. Both steps can be repeated ad nauseam,
+ * only getting out of the loop if we can have another shot at the
+ * fast path. Once we can, get out to compute the new destination
+ * value for the fast path.
+ */
+ while ((*val & QEMU_LOCKCNT_STATE_MASK) != QEMU_LOCKCNT_STATE_FREE) {
+ if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_LOCKED) {
+ int expected = *val;
+ int new = expected - QEMU_LOCKCNT_STATE_LOCKED +
QEMU_LOCKCNT_STATE_WAITING;
+
+ trace_lockcnt_futex_wait_prepare(lockcnt, expected, new);
+ *val = atomic_cmpxchg(&lockcnt->count, expected, new);
+ if (*val == expected) {
+ *val = new;
+ }
+ continue;
+ }
+
+ if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING) {
+ *waited = true;
+ trace_lockcnt_futex_wait(lockcnt, *val);
+ qemu_futex_wait(&lockcnt->count, *val);
+ *val = atomic_read(&lockcnt->count);
+ trace_lockcnt_futex_wait_resume(lockcnt, *val);
+ continue;
+ }
+
+ abort();
+ }
+ return false;
+}
+
+static void lockcnt_wake(QemuLockCnt *lockcnt)
+{
+ trace_lockcnt_futex_wake(lockcnt);
+ qemu_futex_wake(&lockcnt->count, 1);
+}
+
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
+{
+ int val = atomic_read(&lockcnt->count);
+ bool waited = false;
+
+ for (;;) {
+ if (val >= QEMU_LOCKCNT_COUNT_STEP) {
+ int expected = val;
+ val = atomic_cmpxchg(&lockcnt->count, val, val +
QEMU_LOCKCNT_COUNT_STEP);
+ if (val == expected) {
+ break;
+ }
+ } else {
+ /* The fast path is (0, unlocked)->(1, unlocked). */
+ if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val,
QEMU_LOCKCNT_COUNT_STEP,
+ &waited)) {
+ break;
+ }
+ }
+ }
+
+ /* If we were woken by another thread, we should also wake one because
+ * we are effectively releasing the lock that was given to us. This is
+ * the case where qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING
+ * in the low bits, and qemu_lockcnt_inc_and_unlock would find it and
+ * wake someone.
+ */
+ if (waited) {
+ lockcnt_wake(lockcnt);
+ }
+}
+
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt)
+{
+ atomic_sub(&lockcnt->count, QEMU_LOCKCNT_COUNT_STEP);
+}
+
+/* Decrement a counter, and return locked if it is decremented to zero.
+ * If the function returns true, it is impossible for the counter to
+ * become nonzero until the next qemu_lockcnt_unlock.
+ */
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
+{
+ int val = atomic_read(&lockcnt->count);
+ int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
+ bool waited = false;
+
+ for (;;) {
+ if (val >= 2 * QEMU_LOCKCNT_COUNT_STEP) {
+ int expected = val;
+ int new = val - QEMU_LOCKCNT_COUNT_STEP;
+ val = atomic_cmpxchg(&lockcnt->count, val, new);
+ if (val == expected) {
+ break;
+ }
+ }
+
+ /* If count is going 1->0, take the lock. The fast path is
+ * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
+ */
+ if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state,
&waited)) {
+ return true;
+ }
+
+ if (waited) {
+ /* At this point we do not know if there are more waiters. Assume
+ * there are.
+ */
+ locked_state = QEMU_LOCKCNT_STATE_WAITING;
+ }
+ }
+
+ /* If we were woken by another thread, but we're returning in unlocked
+ * state, we should also wake a thread because we are effectively
+ * releasing the lock that was given to us. This is the case where
+ * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
+ * bits, and qemu_lockcnt_unlock would find it and wake someone.
+ */
+ if (waited) {
+ lockcnt_wake(lockcnt);
+ }
+ return false;
+}
+
+/* If the counter is one, decrement it and return locked. Otherwise do
+ * nothing.
+ *
+ * If the function returns true, it is impossible for the counter to
+ * become nonzero until the next qemu_lockcnt_unlock.
+ */
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
+{
+ int val = atomic_read(&lockcnt->count);
+ int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
+ bool waited = false;
+
+ while (val < 2 * QEMU_LOCKCNT_COUNT_STEP) {
+ /* If count is going 1->0, take the lock. The fast path is
+ * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
+ */
+ if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state,
&waited)) {
+ return true;
+ }
+
+ if (waited) {
+ /* At this point we do not know if there are more waiters. Assume
+ * there are.
+ */
+ locked_state = QEMU_LOCKCNT_STATE_WAITING;
+ }
+ }
+
+ /* If we were woken by another thread, but we're returning in unlocked
+ * state, we should also wake a thread because we are effectively
+ * releasing the lock that was given to us. This is the case where
+ * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
+ * bits, and qemu_lockcnt_inc_and_unlock would find it and wake someone.
+ */
+ if (waited) {
+ lockcnt_wake(lockcnt);
+ }
+ return false;
+}
+
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
+{
+ int val = atomic_read(&lockcnt->count);
+ int step = QEMU_LOCKCNT_STATE_LOCKED;
+ bool waited = false;
+
+ /* The third argument is only used if the low bits of val are 0
+ * (QEMU_LOCKCNT_STATE_FREE), so just blindly mix in the desired
+ * state.
+ */
+ while (!qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, val + step, &waited)) {
+ if (waited) {
+ /* At this point we do not know if there are more waiters. Assume
+ * there are.
+ */
+ step = QEMU_LOCKCNT_STATE_WAITING;
+ }
+ }
+}
+
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
+{
+ int expected, new, val;
+
+ val = atomic_read(&lockcnt->count);
+ do {
+ expected = val;
+ new = (val + QEMU_LOCKCNT_COUNT_STEP) & ~QEMU_LOCKCNT_STATE_MASK;
+ trace_lockcnt_unlock_attempt(lockcnt, val, new);
+ val = atomic_cmpxchg(&lockcnt->count, val, new);
+ } while (val != expected);
+
+ trace_lockcnt_unlock_success(lockcnt, val, new);
+ if (val & QEMU_LOCKCNT_STATE_WAITING) {
+ lockcnt_wake(lockcnt);
+ }
+}
+
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt)
+{
+ int expected, new, val;
+
+ val = atomic_read(&lockcnt->count);
+ do {
+ expected = val;
+ new = val & ~QEMU_LOCKCNT_STATE_MASK;
+ trace_lockcnt_unlock_attempt(lockcnt, val, new);
+ val = atomic_cmpxchg(&lockcnt->count, val, new);
+ } while (val != expected);
+
+ trace_lockcnt_unlock_success(lockcnt, val, new);
+ if (val & QEMU_LOCKCNT_STATE_WAITING) {
+ lockcnt_wake(lockcnt);
+ }
+}
+
+unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
+{
+ return lockcnt->count >> QEMU_LOCKCNT_COUNT_SHIFT;
+}
+#else
void qemu_lockcnt_init(QemuLockCnt *lockcnt)
{
qemu_mutex_init(&lockcnt->mutex);
@@ -111,3 +392,4 @@ unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
{
return lockcnt->count;
}
+#endif
diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c
index d20cdde..37cd8ba 100644
--- a/util/qemu-thread-posix.c
+++ b/util/qemu-thread-posix.c
@@ -11,10 +11,6 @@
*
*/
#include "qemu/osdep.h"
-#ifdef __linux__
-#include <sys/syscall.h>
-#include <linux/futex.h>
-#endif
#include "qemu/thread.h"
#include "qemu/atomic.h"
#include "qemu/notify.h"
@@ -294,28 +290,9 @@ void qemu_sem_wait(QemuSemaphore *sem)
}
#ifdef __linux__
-#define futex(...) syscall(__NR_futex, __VA_ARGS__)
-
-static inline void futex_wake(QemuEvent *ev, int n)
-{
- futex(ev, FUTEX_WAKE, n, NULL, NULL, 0);
-}
-
-static inline void futex_wait(QemuEvent *ev, unsigned val)
-{
- while (futex(ev, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
- switch (errno) {
- case EWOULDBLOCK:
- return;
- case EINTR:
- break; /* get out of switch and retry */
- default:
- abort();
- }
- }
-}
+#include "qemu/futex.h"
#else
-static inline void futex_wake(QemuEvent *ev, int n)
+static inline void qemu_futex_wake(QemuEvent *ev, int n)
{
pthread_mutex_lock(&ev->lock);
if (n == 1) {
@@ -326,7 +303,7 @@ static inline void futex_wake(QemuEvent *ev, int n)
pthread_mutex_unlock(&ev->lock);
}
-static inline void futex_wait(QemuEvent *ev, unsigned val)
+static inline void qemu_futex_wait(QemuEvent *ev, unsigned val)
{
pthread_mutex_lock(&ev->lock);
if (ev->value == val) {
@@ -338,7 +315,7 @@ static inline void futex_wait(QemuEvent *ev, unsigned val)
/* Valid transitions:
* - free->set, when setting the event
- * - busy->set, when setting the event, followed by futex_wake
+ * - busy->set, when setting the event, followed by qemu_futex_wake
* - set->free, when resetting the event
* - free->busy, when waiting
*
@@ -381,7 +358,7 @@ void qemu_event_set(QemuEvent *ev)
if (atomic_read(&ev->value) != EV_SET) {
if (atomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
/* There were waiters, wake them up. */
- futex_wake(ev, INT_MAX);
+ qemu_futex_wake(ev, INT_MAX);
}
}
}
@@ -419,7 +396,7 @@ void qemu_event_wait(QemuEvent *ev)
return;
}
}
- futex_wait(ev, EV_BUSY);
+ qemu_futex_wait(ev, EV_BUSY);
}
}
diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
index 728e76b..178e016 100644
--- a/util/qemu-thread-win32.c
+++ b/util/qemu-thread-win32.c
@@ -269,7 +269,7 @@ void qemu_sem_wait(QemuSemaphore *sem)
*
* Valid transitions:
* - free->set, when setting the event
- * - busy->set, when setting the event, followed by futex_wake
+ * - busy->set, when setting the event, followed by SetEvent
* - set->free, when resetting the event
* - free->busy, when waiting
*
diff --git a/util/trace-events b/util/trace-events
index ed06aee..2b8aa30 100644
--- a/util/trace-events
+++ b/util/trace-events
@@ -30,3 +30,13 @@ qemu_anon_ram_free(void *ptr, size_t size) "ptr %p size %zu"
hbitmap_iter_skip_words(const void *hb, void *hbi, uint64_t pos, unsigned long
cur) "hb %p hbi %p pos %"PRId64" cur 0x%lx"
hbitmap_reset(void *hb, uint64_t start, uint64_t count, uint64_t sbit,
uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
hbitmap_set(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t
ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
+
+# util/lockcnt.c
+lockcnt_fast_path_attempt(const void *lockcnt, int expected, int new) "lockcnt
%p fast path %d->%d"
+lockcnt_fast_path_success(const void *lockcnt, int expected, int new) "lockcnt
%p fast path %d->%d succeeded"
+lockcnt_unlock_attempt(const void *lockcnt, int expected, int new) "lockcnt %p
unlock %d->%d"
+lockcnt_unlock_success(const void *lockcnt, int expected, int new) "lockcnt %p
unlock %d->%d succeeded"
+lockcnt_futex_wait_prepare(const void *lockcnt, int expected, int new)
"lockcnt %p preparing slow path %d->%d"
+lockcnt_futex_wait(const void *lockcnt, int val) "lockcnt %p waiting on %d"
+lockcnt_futex_wait_resume(const void *lockcnt, int new) "lockcnt %p after
wait: %d"
+lockcnt_futex_wake(const void *lockcnt) "lockcnt %p waking up one waiter"
--
2.9.3
- [Qemu-block] [PATCH v2 00/10] aio_context_acquire/release pushdown, part 1, Paolo Bonzini, 2016/12/21
- [Qemu-block] [PATCH 01/10] aio: rename bh_lock to list_lock, Paolo Bonzini, 2016/12/21
- [Qemu-block] [PATCH 03/10] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh, Paolo Bonzini, 2016/12/21
- [Qemu-block] [PATCH 02/10] qemu-thread: introduce QemuLockCnt, Paolo Bonzini, 2016/12/21
- [Qemu-block] [PATCH 05/10] aio-posix: split aio_dispatch_handlers out of aio_dispatch, Paolo Bonzini, 2016/12/21
- [Qemu-block] [PATCH 07/10] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock, Paolo Bonzini, 2016/12/21
- [Qemu-block] [PATCH 09/10] aio: document locking, Paolo Bonzini, 2016/12/21
- [Qemu-block] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux,
Paolo Bonzini <=
- [Qemu-block] [PATCH 06/10] aio: tweak walking in dispatch phase, Paolo Bonzini, 2016/12/21
- [Qemu-block] [PATCH 10/10] async: optimize aio_bh_poll, Paolo Bonzini, 2016/12/21
- [Qemu-block] [PATCH 08/10] aio-win32: remove walking_handlers, protecting AioHandler list with list_lock, Paolo Bonzini, 2016/12/21