qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH] qemu-thread: add a fast path to the Win32 QemuEvent


From: Paolo Bonzini
Subject: [Qemu-devel] [PATCH] qemu-thread: add a fast path to the Win32 QemuEvent
Date: Wed, 12 Aug 2015 15:38:18 +0200

QemuEvents are used heavily by call_rcu.  We do not want them to be slow,
but the current implementation does a kernel call on every invocation
of qemu_event_* and won't cut it.

So, wrap a Win32 manual-reset event with a fast userspace path.  The
states and transitions are the same as for the futex and mutex/condvar
implementations, but the slow path is different of course.  The idea
is to reset the Win32 event lazily, as part of a test-reset-test-wait
sequence.  Such a sequence is, indeed, how QemuEvents are used by
RCU and other subsystems!

The patch includes a formal model of the algorithm.

Signed-off-by: Paolo Bonzini <address@hidden>
---
 docs/win32-qemu-event.promela | 98 +++++++++++++++++++++++++++++++++++++++++++
 include/qemu/thread-win32.h   |  1 +
 util/qemu-thread-win32.c      | 66 +++++++++++++++++++++++++++--
 3 files changed, 161 insertions(+), 4 deletions(-)
 create mode 100644 docs/win32-qemu-event.promela

diff --git a/docs/win32-qemu-event.promela b/docs/win32-qemu-event.promela
new file mode 100644
index 0000000..c446a71
--- /dev/null
+++ b/docs/win32-qemu-event.promela
@@ -0,0 +1,98 @@
+/*
+ * This model describes the implementation of QemuEvent in
+ * util/qemu-thread-win32.c.
+ *
+ * Author: Paolo Bonzini <address@hidden>
+ *
+ * This file is in the public domain.  If you really want a license,
+ * the WTFPL will do.
+ *
+ * To verify it:
+ *     spin -a docs/win32-qemu-event.promela
+ *     gcc -O2 pan.c -DSAFETY
+ *     ./a.out
+ */
+
+bool event;
+int value;
+
+/* Primitives for a Win32 event */
+#define RAW_RESET event = false
+#define RAW_SET   event = true
+#define RAW_WAIT  do :: event -> break; od
+
+#if 0
+/* Basic sanity checking: test the Win32 event primitives */
+#define RESET     RAW_RESET
+#define SET       RAW_SET
+#define WAIT      RAW_WAIT
+#else
+/* Full model: layer a userspace-only fast path on top of the RAW_*
+ * primitives.  SET/RESET/WAIT have exactly the same semantics as
+ * RAW_SET/RAW_RESET/RAW_WAIT, but try to avoid invoking them.
+ */
+#define EV_SET 0
+#define EV_FREE 1
+#define EV_BUSY -1
+
+int state = EV_FREE;
+
+int xchg_result;
+#define SET   if :: state != EV_SET ->                                      \
+                    atomic { /* xchg_result=xchg(state, EV_SET) */          \
+                        xchg_result = state;                                \
+                        state = EV_SET;                                     \
+                    }                                                       \
+                    if :: xchg_result == EV_BUSY -> RAW_SET;                \
+                       :: else -> skip;                                     \
+                    fi;                                                     \
+                 :: else -> skip;                                           \
+              fi
+
+#define RESET if :: state == EV_SET -> atomic { state = state | EV_FREE; }  \
+                 :: else            -> skip;                                \
+              fi
+
+int tmp1, tmp2;
+#define WAIT  tmp1 = state;                                                 \
+              if :: tmp1 != EV_SET ->                                       \
+                    if :: tmp1 == EV_FREE ->                                \
+                          RAW_RESET;                                        \
+                          atomic { /* tmp2=cas(state, EV_FREE, EV_BUSY) */  \
+                              tmp2 = state;                                 \
+                              if :: tmp2 == EV_FREE -> state = EV_BUSY;     \
+                                 :: else            -> skip;                \
+                              fi;                                           \
+                          }                                                 \
+                          if :: tmp2 == EV_SET -> tmp1 = EV_SET;            \
+                             :: else           -> tmp1 = EV_BUSY;           \
+                          fi;                                               \
+                       :: else -> skip;                                     \
+                    fi;                                                     \
+                    assert(tmp1 != EV_FREE);                                \
+                    if :: tmp1 == EV_BUSY -> RAW_WAIT;                      \
+                       :: else -> skip;                                     \
+                    fi;                                                     \
+                 :: else -> skip;                                           \
+              fi
+#endif
+
+active proctype waiter()
+{
+     if
+         :: !value ->
+             RESET;
+             if
+                 :: !value -> WAIT;
+                 :: else   -> skip;
+             fi;
+        :: else -> skip;
+    fi;
+    assert(value);
+}
+
+active proctype notifier()
+{
+    value = true;
+    SET;
+}
diff --git a/include/qemu/thread-win32.h b/include/qemu/thread-win32.h
index 3d58081..385ff5f 100644
--- a/include/qemu/thread-win32.h
+++ b/include/qemu/thread-win32.h
@@ -18,6 +18,7 @@ struct QemuSemaphore {
 };
 
 struct QemuEvent {
+    int value;
     HANDLE event;
 };
 
diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
index 406b52f..6cdd553 100644
--- a/util/qemu-thread-win32.c
+++ b/util/qemu-thread-win32.c
@@ -238,10 +238,34 @@ void qemu_sem_wait(QemuSemaphore *sem)
     }
 }
 
+/* Wrap a Win32 manual-reset event with a fast userspace path.  The idea
+ * is to reset the Win32 event lazily, as part of a test-reset-test-wait
+ * sequence.  Such a sequence is, indeed, how QemuEvents are used by
+ * RCU and other subsystems!
+ *
+ * Valid transitions:
+ * - free->set, when setting the event
+ * - busy->set, when setting the event, followed by SetEvent
+ * - set->free, when resetting the event
+ * - free->busy, when waiting
+ *
+ * set->busy does not happen (it can be observed from the outside but
+ * it really is set->free->busy).
+ *
+ * busy->free provably cannot happen; to enforce it, the set->free transition
+ * is done with an OR, which becomes a no-op if the event has concurrently
+ * transitioned to free or busy (and is faster than cmpxchg).
+ */
+
+#define EV_SET         0
+#define EV_FREE        1
+#define EV_BUSY       -1
+
 void qemu_event_init(QemuEvent *ev, bool init)
 {
     /* Manual reset.  */
-    ev->event = CreateEvent(NULL, TRUE, init, NULL);
+    ev->event = CreateEvent(NULL, TRUE, TRUE, NULL);
+    ev->value = (init ? EV_SET : EV_FREE);
 }
 
 void qemu_event_destroy(QemuEvent *ev)
@@ -251,17 +275,51 @@ void qemu_event_destroy(QemuEvent *ev)
 
 void qemu_event_set(QemuEvent *ev)
 {
-    SetEvent(ev->event);
+    if (atomic_mb_read(&ev->value) != EV_SET) {
+        if (atomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
+            /* There were waiters, wake them up.  */
+            SetEvent(ev->event);
+        }
+    }
 }
 
 void qemu_event_reset(QemuEvent *ev)
 {
-    ResetEvent(ev->event);
+    if (atomic_mb_read(&ev->value) == EV_SET) {
+        /* If there was a concurrent reset (or even reset+wait),
+         * do nothing.  Otherwise change EV_SET->EV_FREE.
+         */
+        atomic_or(&ev->value, EV_FREE);
+    }
 }
 
 void qemu_event_wait(QemuEvent *ev)
 {
-    WaitForSingleObject(ev->event, INFINITE);
+    unsigned value;
+
+    value = atomic_mb_read(&ev->value);
+    if (value != EV_SET) {
+        if (value == EV_FREE) {
+            /* qemu_event_set is not yet going to call SetEvent, but we are
+             * going to do another check for EV_SET below when setting EV_BUSY.
+             * At that point it is safe to call WaitForSingleObject.
+             */
+            ResetEvent(ev->event);
+
+            /* Tell qemu_event_set that there are waiters.  No need to retry
+             * because there cannot be a concurrent busy->free transition.
+             * After the CAS, the event will be either set or busy.
+             */
+            if (atomic_cmpxchg(&ev->value, EV_FREE, EV_BUSY) == EV_SET) {
+                value = EV_SET;
+            } else {
+                value = EV_BUSY;
+            }
+        }
+        if (value == EV_BUSY) {
+            WaitForSingleObject(ev->event, INFINITE);
+        }
+    }
 }
 
 struct QemuThreadData {
-- 
2.4.3




reply via email to

[Prev in Thread] Current Thread [Next in Thread]