[Qemu-devel] [RFC PATCH 02/14] rcu: add rcu library


From: Mike Day
Subject: [Qemu-devel] [RFC PATCH 02/14] rcu: add rcu library
Date: Wed, 14 Aug 2013 11:50:38 -0400

From: Paolo Bonzini <address@hidden>

This includes a (mangled) copy of the urcu-qsbr code from liburcu.
The main changes are: 1) removing dependencies on many other header files
in liburcu; 2) removing for simplicity the tentative busy waiting in
synchronize_rcu, which has limited performance effects; 3) replacing
futexes in synchronize_rcu with QemuEvents for Win32 portability.
The API is the same as liburcu, so it should be possible in the future
to require liburcu on POSIX systems for example and use our copy only
on Windows.

Among the various versions available I chose urcu-qsbr, which has the
fastest rcu_read_{lock,unlock} but requires the program to manually
annotate quiescent points or intervals.  QEMU threads usually have easily
identified quiescent periods, so this should not be a problem.

Signed-off-by: Paolo Bonzini <address@hidden>
Reviewed-by: Mike Day <address@hidden>
---
 docs/rcu.txt               | 301 +++++++++++++++++++++++++++++++++++++++++++++
 hw/9pfs/virtio-9p-synth.c  |   1 +
 include/qemu/queue.h       |  13 ++
 include/qemu/rcu-pointer.h | 110 +++++++++++++++++
 include/qemu/rcu.h         | 168 +++++++++++++++++++++++++
 include/qemu/thread.h      |   3 -
 libcacard/Makefile         |   3 +-
 util/Makefile.objs         |   1 +
 util/rcu.c                 | 203 ++++++++++++++++++++++++++++++
 9 files changed, 799 insertions(+), 4 deletions(-)
 create mode 100644 docs/rcu.txt
 create mode 100644 include/qemu/rcu-pointer.h
 create mode 100644 include/qemu/rcu.h
 create mode 100644 util/rcu.c

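A note for readers of the archive, not part of the patch: the grace-period
test used by rcu_gp_ongoing() and synchronize_rcu() in util/rcu.c can be
sketched stand-alone as below. This is only an illustration under stated
assumptions: gp_ongoing(), start_grace_period() and gp_demo() are
hypothetical names that do not appear in the patch, there are no threads,
atomics or memory barriers, and only the 64-bit "increment" branch of
synchronize_rcu() is modelled.

```c
#include <assert.h>
#include <stdbool.h>

/* Same bit layout as the patch: bit 0 means "online", the grace-period
 * count lives in bits 1 and above. */
#define RCU_GP_ONLINE (1UL << 0)
#define RCU_GP_CTR    (1UL << 1)

/* Hypothetical pure-function version of rcu_gp_ongoing(): a reader must
 * still be waited for only if it is online (nonzero snapshot) and its
 * snapshot is not the current value of the global counter. */
static bool gp_ongoing(unsigned long reader_ctr, unsigned long gp_ctr)
{
    return reader_ctr != 0 && reader_ctr != gp_ctr;
}

/* Hypothetical helper: what the 64-bit branch of synchronize_rcu() does
 * to the global counter before waiting for readers. */
static unsigned long start_grace_period(unsigned long gp_ctr)
{
    return gp_ctr + RCU_GP_CTR;
}

/* Walk one reader through the states an updater can observe. */
static void gp_demo(void)
{
    unsigned long gp = RCU_GP_ONLINE;   /* initial value of rcu_gp_ctr */
    unsigned long reader = gp;          /* reader reported a quiescent state */

    assert(!gp_ongoing(reader, gp));    /* nothing to wait for */

    gp = start_grace_period(gp);        /* updater finished removal */
    assert(gp_ongoing(reader, gp));     /* stale snapshot: must wait */

    reader = gp;                        /* like rcu_quiescent_state() */
    assert(!gp_ongoing(reader, gp));    /* grace period can complete */

    reader = 0;                         /* like rcu_thread_offline() */
    gp = start_grace_period(gp);
    assert(!gp_ongoing(reader, gp));    /* offline readers are never waited for */
}
```

Calling gp_demo() from a main() runs the walk-through. A reader is waited
for only while its snapshot is nonzero and older than the global counter,
which is why both rcu_quiescent_state() and rcu_thread_offline() let a
pending synchronize_rcu() make progress.
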
diff --git a/docs/rcu.txt b/docs/rcu.txt
new file mode 100644
index 0000000..19e4840
--- /dev/null
+++ b/docs/rcu.txt
@@ -0,0 +1,301 @@
+Using RCU (Read-Copy-Update) for synchronization
+================================================
+
+Read-copy update (RCU) is a synchronization mechanism that is used to
+protect read-mostly data structures.  RCU is very efficient and scalable
+on the read side (it is wait-free), and thus can make the read paths
+extremely fast.
+
+RCU supports concurrency between a single writer and multiple readers,
+thus it is not used alone.  Typically, the write-side will use a lock to
+serialize multiple updates, but other approaches are possible (e.g.,
+restricting updates to a single task).  In QEMU, when a lock is used,
+this will often be the "iothread mutex", also known as the "big QEMU
+lock" (BQL).  Also, restricting updates to a single task is done in
+QEMU using the "bottom half" API.
+
+RCU is fundamentally a "wait-to-finish" mechanism.  The read side marks
+sections of code with "critical sections", and the update side will wait
+for the execution of all *currently running* critical sections before
+proceeding, or before asynchronously executing a callback.
+
+The key point here is that only the currently running critical sections
+are waited for; critical sections that are started _after_ the beginning
+of the wait do not extend the wait, despite running concurrently with
+the updater.  This is the reason why RCU is more scalable than,
+for example, reader-writer locks.  It is so much more scalable that
+the system will have a single instance of the RCU mechanism; a single
+mechanism can be used for an arbitrary number of "things", without
+having to worry about things such as contention or deadlocks.
+
+How is this possible?  The basic idea is to split updates into two phases,
+"removal" and "reclamation".  During removal, we ensure that subsequent
+readers will not be able to get a reference to the old data.  After
+removal has completed, a critical section will not be able to access
+the old data.  Therefore, critical sections that begin after removal
+do not matter; as soon as all previous critical sections have finished,
+there cannot be any readers who hold references to the data structure,
+which may now be safely reclaimed (e.g., freed or unref'ed).
+
+Here is a picture:
+
+        thread 1                  thread 2                  thread 3
+    -------------------    ------------------------    -------------------
+    enter RCU crit.sec.
+           |                finish removal phase
+           |                begin wait
+           |                      |                    enter RCU crit.sec.
+    exit RCU crit.sec.            |                           |
+                            complete wait                     |
+                            begin reclamation phase           |
+                                                       exit RCU crit.sec.
+
+
+Note how thread 3 is still executing its critical section when thread 2
+starts reclaiming data.  This is possible, because the old version of the
+data structure was not accessible at the time thread 3 began executing
+that critical section.
+
+
+RCU API
+=======
+
+The core RCU API is small:
+
+     void rcu_read_lock(void);
+
+        Used by a reader to inform the reclaimer that the reader is
+        entering an RCU read-side critical section.
+
+     void rcu_read_unlock(void);
+
+        Used by a reader to inform the reclaimer that the reader is
+        exiting an RCU read-side critical section.  Note that RCU
+        read-side critical sections may be nested and/or overlapping.
+
+     void synchronize_rcu(void);
+
+        Blocks until all pre-existing RCU read-side critical sections
+        on all threads have completed.  This marks the end of the removal
+        phase and the beginning of the reclamation phase.
+
+        Note that it would be valid for another update to come while
+        synchronize_rcu is running.  Because of this, it is better that
+        the updater releases any locks it may hold before calling
+        synchronize_rcu.
+
+     typeof(*p) rcu_dereference(p);
+     typeof(p) rcu_assign_pointer(p, typeof(p) v);
+
+        These macros are similar to atomic_mb_read() and atomic_mb_set()
+        respectively.  However, they make some assumptions on the code
+        that calls them, which allows a more optimized implementation.
+
+        rcu_assign_pointer assumes that the update side is not going
+        to read from the data structure after "publishing" the new
+        values; that is, it assumes that all assignments happen at
+        the very end of the removal phase.
+
+        rcu_dereference assumes that whenever a single RCU critical
+        section reads multiple shared data, these reads are either
+        data-dependent or need no ordering.  This is almost always the
+        case when using RCU.  If this is not the case, you can use
+        atomic_mb_read() or smp_rmb().
+
+        If you are going to be fetching multiple fields from the
+        RCU-protected structure, repeated rcu_dereference() calls
+        would look ugly and incur unnecessary overhead on Alpha CPUs.
+        You can then do this:
+
+        p = rcu_dereference(&head);
+        foo = p->foo;
+        bar = p->bar;
+
+
+RCU QUIESCENT STATES
+====================
+
+An efficient implementation of rcu_read_lock() and rcu_read_unlock()
+relies on the availability of fast thread-local storage.  Unfortunately,
+this is not possible on all the systems supported by QEMU (in particular
+on many POSIX systems other than Linux and Solaris).
+
+For this reason, QEMU's RCU implementation resorts to manual annotation
+of "quiescent states", i.e. points where no RCU read-side critical
+section can be active.  All threads that participate in the RCU mechanism
+need to annotate such points.
+
+Marking quiescent states is done with the following three APIs:
+
+     void rcu_quiescent_state(void);
+
+        Marks a point in the execution of the current thread where no
+        RCU read-side critical section can be active.
+
+     void rcu_thread_offline(void);
+
+        Marks the beginning of an "extended quiescent state" for the
+        current thread, i.e. an interval of time during which no
+        RCU read-side critical section can be active.
+
+     void rcu_thread_online(void);
+
+        Marks the end of an extended quiescent state for the current
+        thread.
+
+
+Furthermore, threads that participate in the RCU mechanism must communicate
+this fact using the following APIs:
+
+     void rcu_register_thread(void);
+
+        Mark a thread as taking part in the RCU mechanism.  Such a thread
+        will have to report quiescent points regularly, either manually
+        or through the QemuCond/QemuSemaphore/QemuEvent APIs.
+
+     void rcu_unregister_thread(void);
+
+        Mark a thread as not taking part anymore in the RCU mechanism.
+        It is not a problem if such a thread reports quiescent points,
+        either manually or by using the QemuCond/QemuSemaphore/QemuEvent
+        APIs.
+
+Note that these APIs are relatively heavyweight, and should _not_ be
+nested.
+
+
+DIFFERENCES WITH LINUX
+======================
+
+- Sleeping is possible, though discouraged, within an RCU critical section.
+
+- rcu_dereference takes a _pointer_ to the variable being accessed.
+  Wrong usage will be detected by the compiler.
+
+- Quiescent points must be marked explicitly unless the thread uses
+  condvars/semaphores/events for synchronization.
+
+
+RCU PATTERNS
+============
+
+Many patterns using reader-writer locks translate directly to RCU, with
+the advantages of higher scalability and deadlock immunity.
+
+In general, RCU can be used whenever it is possible to create a new
+"version" of a data structure every time the updater runs.  This may
+sound like a very strict restriction, however:
+
+- the updater does not mean "everything that writes to a data structure",
+  but rather "everything that involves a reclamation step".  See the
+  array example below.
+
+- in some cases, creating a new version of a data structure may actually
+  be very cheap.  For example, modifying the "next" pointer of a singly
+  linked list is effectively creating a new version of the list.
+
+
+Some of these patterns, however, are worth noting.
+
+RCU list processing
+-------------------
+
+TBD (not yet used in QEMU)
+
+
+RCU reference counting
+----------------------
+
+Because grace periods are not allowed to complete while there is an RCU
+read-side critical section in progress, the RCU read-side primitives
+may be used as a restricted reference-counting mechanism.  For example,
+consider the following code fragment:
+
+    rcu_read_lock();
+    p = rcu_dereference(&foo);
+    /* do something with p. */
+    rcu_read_unlock();
+
+The RCU read-side critical section ensures that the value of "p" remains
+valid until after the rcu_read_unlock().  In some sense, it is acquiring
+a reference to p that is later released when the critical section ends.
+The write side looks simply like this (with appropriate locking):
+
+    qemu_mutex_lock(&foo_mutex);
+    old = foo;
+    rcu_assign_pointer(foo, new);
+    qemu_mutex_unlock(&foo_mutex);
+    synchronize_rcu();
+    free(old);
+
+Note that the same idiom would be possible with reader/writer
+locks:
+
+    read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
+    p = foo;                        p = foo;
+    /* do something with p. */      foo = new;
+    read_unlock(&foo_rwlock);       write_mutex_unlock(&foo_rwlock);
+                                    /* no reader can still hold p */
+                                    free(p);
+
+
+RCU resizable arrays
+--------------------
+
+Resizable arrays can be used with RCU.  The expensive RCU synchronization
+only needs to take place when the array is resized.  The two items to
+take care of are:
+
+- ensuring that the old version of the array is available between removal
+  and reclamation;
+
+- avoiding mismatches in the read side between the array data and the
+  array size.
+
+The first problem is avoided simply by not using realloc.  Instead,
+each resize will allocate a new array and copy the old data into it.
+The second problem would arise if the size and the data pointers were
+two members of a larger struct:
+
+    struct mystuff {
+        ...
+        int data_size;
+        int data_alloc;
+        T   *data;
+        ...
+    };
+
+Instead, we store the size of the array with the array itself:
+
+    struct arr {
+        int size;
+        int alloc;
+        T   data[];
+    };
+    struct arr *global_array;
+
+    read side:
+        rcu_read_lock();
+        struct arr *array = rcu_dereference(&global_array);
+        x = i < array->size ? array->data[i] : -1;
+        rcu_read_unlock();
+        return x;
+
+    write side (running under a lock):
+        if (global_array->size == global_array->alloc) {
+            /* Creating a new version.  */
+            new_array = g_malloc(sizeof(struct arr) +
+                                 global_array->alloc * 2 * sizeof(T));
+            new_array->size = global_array->size;
+            new_array->alloc = global_array->alloc * 2;
+            memcpy(new_array->data, global_array->data,
+                   global_array->alloc * sizeof(T));
+
+            /* Removal phase.  */
+            old_array = global_array;
+            rcu_assign_pointer(global_array, new_array);
+            synchronize_rcu();
+
+            /* Reclamation phase.  */
+            g_free(old_array);
+        }
diff --git a/hw/9pfs/virtio-9p-synth.c b/hw/9pfs/virtio-9p-synth.c
index 840e4eb..d5f5842 100644
--- a/hw/9pfs/virtio-9p-synth.c
+++ b/hw/9pfs/virtio-9p-synth.c
@@ -17,6 +17,7 @@
 #include "virtio-9p-xattr.h"
 #include "fsdev/qemu-fsdev.h"
 #include "virtio-9p-synth.h"
+#include "qemu/rcu.h"
 
 #include <sys/stat.h>
 
diff --git a/include/qemu/queue.h b/include/qemu/queue.h
index d433b90..847ddd1 100644
--- a/include/qemu/queue.h
+++ b/include/qemu/queue.h
@@ -104,6 +104,19 @@ struct {                                                                \
         (head)->lh_first = NULL;                                        \
 } while (/*CONSTCOND*/0)
 
+#define QLIST_SWAP(dstlist, srclist, field) do {                        \
+        void *tmplist;                                                  \
+        tmplist = (srclist)->lh_first;                                  \
+        (srclist)->lh_first = (dstlist)->lh_first;                      \
+        if ((srclist)->lh_first != NULL) {                              \
+            (srclist)->lh_first->field.le_prev = &(srclist)->lh_first;  \
+        }                                                               \
+        (dstlist)->lh_first = tmplist;                                  \
+        if ((dstlist)->lh_first != NULL) {                              \
+            (dstlist)->lh_first->field.le_prev = &(dstlist)->lh_first;  \
+        }                                                               \
+} while (/*CONSTCOND*/0)
+
 #define QLIST_INSERT_AFTER(listelm, elm, field) do {                    \
         if (((elm)->field.le_next = (listelm)->field.le_next) != NULL)  \
                 (listelm)->field.le_next->field.le_prev =               \
diff --git a/include/qemu/rcu-pointer.h b/include/qemu/rcu-pointer.h
new file mode 100644
index 0000000..0e6417c
--- /dev/null
+++ b/include/qemu/rcu-pointer.h
@@ -0,0 +1,110 @@
+#ifndef _URCU_POINTER_STATIC_H
+#define _URCU_POINTER_STATIC_H
+
+/*
+ * urcu-pointer-static.h
+ *
+ * Userspace RCU header. Operations on pointers.
+ *
+ * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See urcu-pointer.h for
+ * linking dynamically with the userspace rcu library.
+ *
+ * Copyright (c) 2009 Mathieu Desnoyers <address@hidden>
+ * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * IBM's contributions to this file may be relicensed under LGPLv2 or later.
+ */
+
+#include "compiler.h"
+#include "qemu/atomic.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * rcu_dereference - reads (copy) a RCU-protected pointer to a local variable
+ * into a RCU read-side critical section. The pointer can later be safely
+ * dereferenced within the critical section.
+ *
+ * This ensures that the pointer copy is invariant throughout the whole critical
+ * section.
+ *
+ * Inserts memory barriers on architectures that require them (currently only
+ * Alpha) and documents which pointers are protected by RCU.
+ *
+ * The compiler memory barrier in atomic_read() ensures that value-speculative
+ * optimizations (e.g. VSS: Value Speculation Scheduling) do not perform the
+ * data read before the pointer read by speculating the value of the pointer.
+ * Correct ordering is ensured because the pointer is read as a volatile access.
+ * This acts as a global side-effect operation, which forbids reordering of
+ * dependent memory operations. Note that such concern about dependency-breaking
+ * optimizations will eventually be taken care of by the "memory_order_consume"
+ * addition to the forthcoming C++ standard.
+ *
+ * Should match rcu_assign_pointer() or rcu_xchg_pointer().
+ */
+
+#define rcu_dereference(p)                      \
+        ({                                      \
+            typeof(*(p)) _p1 = atomic_read(p);  \
+            smp_read_barrier_depends();         \
+            _p1;                                \
+        })
+
+/**
+ * rcu_cmpxchg_pointer - same as rcu_set_pointer, but tests if the pointer
+ * is as expected by "old". If succeeds, returns the previous pointer to the
+ * data structure, which can be safely freed after waiting for a quiescent state
+ * using synchronize_rcu(). If fails (unexpected value), returns old (which
+ * should not be freed !).
+ */
+
+#define rcu_cmpxchg_pointer(p, old, _new)       \
+        ({                                      \
+            typeof(*p) _pold = (old);           \
+            typeof(*p) _pnew = (_new);          \
+            atomic_cmpxchg(p, _pold, _pnew);    \
+        })
+
+#define rcu_set_pointer(p, v)                   \
+        ({                                      \
+             typeof(*p) _pv = (v);              \
+             smp_wmb();                         \
+             atomic_set(p, _pv);                \
+        })
+
+/**
+ * rcu_assign_pointer - assign (publicize) a pointer to a new data structure
+ * meant to be read by RCU read-side critical sections. Returns the assigned
+ * value.
+ *
+ * Documents which pointers will be dereferenced by RCU read-side critical
+ * sections and adds the required memory barriers on architectures requiring
+ * them. It also makes sure the compiler does not reorder code initializing the
+ * data structure before its publication.
+ *
+ * Should match rcu_dereference().
+ */
+
+#define rcu_assign_pointer(p, v)    rcu_set_pointer(&(p), v)
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _URCU_POINTER_STATIC_H */
diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
new file mode 100644
index 0000000..b875593
--- /dev/null
+++ b/include/qemu/rcu.h
@@ -0,0 +1,168 @@
+#ifndef _URCU_QSBR_H
+#define _URCU_QSBR_H
+
+/*
+ * urcu-qsbr.h
+ *
+ * Userspace RCU QSBR header.
+ *
+ * LGPL-compatible code should include this header with :
+ *
+ * #define _LGPL_SOURCE
+ * #include <urcu.h>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * IBM's contributions to this file may be relicensed under LGPLv2 or later.
+ */
+
+#include <stdlib.h>
+#include <assert.h>
+#include <limits.h>
+#include <unistd.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <glib.h>
+
+#include "qemu/compiler.h"
+#include "qemu/rcu-pointer.h"
+#include "qemu/thread.h"
+#include "qemu/queue.h"
+#include "qemu/atomic.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/*
+ * Important !
+ *
+ * Each thread containing read-side critical sections must be registered
+ * with rcu_register_thread() before calling rcu_read_lock().
+ * rcu_unregister_thread() should be called before the thread exits.
+ */
+
+#ifdef DEBUG_RCU
+#define rcu_assert(args...)    assert(args)
+#else
+#define rcu_assert(args...)
+#endif
+
+#define RCU_GP_ONLINE     (1UL << 0)
+#define RCU_GP_CTR        (1UL << 1)
+
+/*
+ * Global quiescent period counter with low-order bits unused.
+ * Using an unsigned long rather than a char to eliminate false register
+ * dependencies causing stalls on some architectures.
+ */
+extern unsigned long rcu_gp_ctr;
+
+extern QemuEvent rcu_gp_event;
+
+struct rcu_reader_data {
+    /* Data used by both reader and synchronize_rcu() */
+    unsigned long ctr;
+    bool waiting;
+
+    /* Data used for registry, protected by rcu_gp_lock */
+    QLIST_ENTRY(rcu_reader_data) node;
+};
+
+#ifdef __linux__
+extern __thread struct rcu_reader_data rcu_reader;
+#define DEFINE_RCU_READER() \
+    __thread struct rcu_reader_data rcu_reader
+
+static inline struct rcu_reader_data *get_rcu_reader(void)
+{
+    return &rcu_reader;
+}
+
+static inline void alloc_rcu_reader(void)
+{
+}
+#else
+extern GPrivate rcu_reader_key;
+#define DEFINE_RCU_READER() \
+     GPrivate rcu_reader_key = G_PRIVATE_INIT(g_free)
+
+static inline struct rcu_reader_data *get_rcu_reader(void)
+{
+    return g_private_get(&rcu_reader_key);
+}
+
+static inline void alloc_rcu_reader(void)
+{
+     g_private_replace(&rcu_reader_key,
+                       g_malloc0(sizeof(struct rcu_reader_data)));
+}
+#endif
+
+static inline void rcu_read_lock(void)
+{
+    rcu_assert(get_rcu_reader()->ctr);
+}
+
+static inline void rcu_read_unlock(void)
+{
+    /* Ensure that the previous reads complete before starting those
+     * in another critical section.
+     */
+    smp_rmb();
+}
+
+static inline void rcu_quiescent_state(void)
+{
+    struct rcu_reader_data *p_rcu_reader = get_rcu_reader();
+
+    /* Reuses smp_rmb() in the last rcu_read_unlock().  */
+    unsigned long ctr = atomic_read(&rcu_gp_ctr);
+    atomic_xchg(&p_rcu_reader->ctr, ctr);
+    if (atomic_read(&p_rcu_reader->waiting)) {
+        atomic_set(&p_rcu_reader->waiting, false);
+        qemu_event_set(&rcu_gp_event);
+    }
+}
+
+static inline void rcu_thread_offline(void)
+{
+    struct rcu_reader_data *p_rcu_reader = get_rcu_reader();
+
+    atomic_xchg(&p_rcu_reader->ctr, 0);
+    if (atomic_read(&p_rcu_reader->waiting)) {
+        atomic_set(&p_rcu_reader->waiting, false);
+        qemu_event_set(&rcu_gp_event);
+    }
+}
+
+static inline void rcu_thread_online(void)
+{
+    rcu_quiescent_state();
+}
+
+extern void synchronize_rcu(void);
+
+/*
+ * Reader thread registration.
+ */
+extern void rcu_register_thread(void);
+extern void rcu_unregister_thread(void);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _URCU_QSBR_H */
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index 3e32c65..5d64a20 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -25,9 +25,6 @@ void qemu_mutex_lock(QemuMutex *mutex);
 int qemu_mutex_trylock(QemuMutex *mutex);
 void qemu_mutex_unlock(QemuMutex *mutex);
 
-#define rcu_read_lock() do { } while (0)
-#define rcu_read_unlock() do { } while (0)
-
 void qemu_cond_init(QemuCond *cond);
 void qemu_cond_destroy(QemuCond *cond);
 
diff --git a/libcacard/Makefile b/libcacard/Makefile
index 47827a0..f7a3b07 100644
--- a/libcacard/Makefile
+++ b/libcacard/Makefile
@@ -4,7 +4,8 @@ TOOLS += vscclient$(EXESUF)
 
 # objects linked into a shared library, built with libtool with -fPIC if required
 libcacard-obj-y = $(stub-obj-y) $(libcacard-y)
-libcacard-obj-y += util/osdep.o util/cutils.o util/qemu-timer-common.o util/error.o
+libcacard-obj-y += util/osdep.o util/cutils.o util/qemu-timer-common.o
+libcacard-obj-y += util/rcu.o util/error.o
 libcacard-obj-$(CONFIG_WIN32) += util/oslib-win32.o util/qemu-thread-win32.o
 libcacard-obj-$(CONFIG_POSIX) += util/oslib-posix.o util/qemu-thread-posix.o
 libcacard-obj-y += $(filter trace/%, $(util-obj-y))
diff --git a/util/Makefile.objs b/util/Makefile.objs
index dc72ab0..c34e5ee 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -11,3 +11,4 @@ util-obj-y += iov.o aes.o qemu-config.o qemu-sockets.o uri.o notify.o
 util-obj-y += qemu-option.o qemu-progress.o
 util-obj-y += hexdump.o
 util-obj-y += crc32c.o
+util-obj-y += rcu.o
diff --git a/util/rcu.c b/util/rcu.c
new file mode 100644
index 0000000..48686a3
--- /dev/null
+++ b/util/rcu.c
@@ -0,0 +1,203 @@
+/*
+ * urcu-qsbr.c
+ *
+ * Userspace RCU QSBR library
+ *
+ * Copyright (c) 2009 Mathieu Desnoyers <address@hidden>
+ * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
+ * Copyright 2013 Red Hat, Inc.
+ *
+ * Ported to QEMU by Paolo Bonzini  <address@hidden>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * IBM's contributions to this file may be relicensed under LGPLv2 or later.
+ */
+
+#include <stdio.h>
+#include <assert.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <errno.h>
+#include "qemu/rcu.h"
+#include "qemu/atomic.h"
+
+/*
+ * Global grace period counter.  Bit 0 is one if the thread is online.
+ * Bits 1 and above are defined in synchronize_rcu/update_counter_and_wait.
+ */
+#define RCU_GP_ONLINE           (1UL << 0)
+#define RCU_GP_CTR              (1UL << 1)
+
+unsigned long rcu_gp_ctr = RCU_GP_ONLINE;
+
+QemuEvent rcu_gp_event;
+static QemuMutex rcu_gp_lock;
+
+/*
+ * Check whether a quiescent state was crossed between the beginning of
+ * update_counter_and_wait and now.
+ */
+static inline int rcu_gp_ongoing(unsigned long *ctr)
+{
+    unsigned long v;
+
+    /* See update_counter_and_wait for the discussion of memory barriers.  */
+    v = atomic_read(ctr);
+    return v && (v != rcu_gp_ctr);
+}
+
+/* Written to only by each individual reader. Read by both the reader and the
+ * writers.
+ */
+DEFINE_RCU_READER();
+
+/* Protected by rcu_gp_lock.  */
+typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
+static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);
+
+/* Wait for previous parity/grace period to be empty of readers.  */
+static void wait_for_readers(void)
+{
+    ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
+    struct rcu_reader_data *index, *tmp;
+
+    for (;;) {
+        /* We want to be notified of changes made to rcu_gp_ongoing
+         * while we walk the list.
+         */
+        qemu_event_reset(&rcu_gp_event);
+
+        /* Instead of using atomic_mb_set for index->waiting, and
+         * atomic_mb_read for index->ctr, memory barriers are placed
+         * manually since writes to different threads are independent.
+         * atomic_mb_set has a smp_wmb before...
+         */
+        smp_wmb();
+        QLIST_FOREACH(index, &registry, node) {
+            atomic_set(&index->waiting, true);
+        }
+
+        /* ... and a smp_mb after.
+         *
+         * This barrier also blocks stores that free old RCU-protected
+         * pointers.
+         */
+        smp_mb();
+
+        QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
+            if (!rcu_gp_ongoing(&index->ctr)) {
+                QLIST_REMOVE(index, node);
+                QLIST_INSERT_HEAD(&qsreaders, index, node);
+
+                /* No need for mb_set here, worst of all we
+                 * get some extra futex wakeups.
+                 */
+                atomic_set(&index->waiting, false);
+            }
+        }
+
+        /* atomic_mb_read has smp_rmb after.  */
+        smp_rmb();
+
+        if (QLIST_EMPTY(&registry)) {
+            break;
+        }
+
+        /* Wait for one thread to report a quiescent state and
+         * try again.
+         */
+        qemu_event_wait(&rcu_gp_event);
+    }
+
+    /* put back the reader list in the registry */
+    QLIST_SWAP(&registry, &qsreaders, node);
+}
+
+void synchronize_rcu(void)
+{
+    unsigned long was_online;
+
+    was_online = get_rcu_reader()->ctr;
+
+    /* Mark the writer thread offline to make sure we don't wait for
+     * our own quiescent state. This allows using synchronize_rcu()
+     * in threads registered as readers.
+     *
+     * rcu_thread_offline() and rcu_thread_online() include a
+     * memory barrier.
+     */
+    if (was_online) {
+        rcu_thread_offline();
+    } else {
+        smp_mb();
+    }
+
+    qemu_mutex_lock(&rcu_gp_lock);
+
+    if (!QLIST_EMPTY(&registry)) {
+        if (sizeof(rcu_gp_ctr) < 8) {
+            /* For architectures with 32-bit longs, a two-subphases algorithm
+             * ensures we do not encounter overflow bugs.
+             *
+             * Switch parity: 0 -> 1, 1 -> 0.
+             */
+            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+            wait_for_readers();
+            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+        } else {
+            /* Increment current grace period.  */
+            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
+        }
+
+        wait_for_readers();
+    }
+
+    qemu_mutex_unlock(&rcu_gp_lock);
+
+    if (was_online) {
+        rcu_thread_online();
+    } else {
+        smp_mb();
+    }
+}
+
+void rcu_register_thread(void)
+{
+    if (!get_rcu_reader()) {
+        alloc_rcu_reader();
+    }
+
+    assert(get_rcu_reader()->ctr == 0);
+    qemu_mutex_lock(&rcu_gp_lock);
+    QLIST_INSERT_HEAD(&registry, get_rcu_reader(), node);
+    qemu_mutex_unlock(&rcu_gp_lock);
+    rcu_quiescent_state();
+}
+
+void rcu_unregister_thread(void)
+{
+    rcu_thread_offline();
+    qemu_mutex_lock(&rcu_gp_lock);
+    QLIST_REMOVE(get_rcu_reader(), node);
+    qemu_mutex_unlock(&rcu_gp_lock);
+}
+
+static void __attribute__((__constructor__)) rcu_init(void)
+{
+    qemu_mutex_init(&rcu_gp_lock);
+    qemu_event_init(&rcu_gp_event, true);
+    rcu_register_thread();
+}
-- 
1.8.3.1



