qemu-devel

Re: [Qemu-devel] [PATCH 03/12] rcu: add rcu library


From: liu ping fan
Subject: Re: [Qemu-devel] [PATCH 03/12] rcu: add rcu library
Date: Fri, 17 May 2013 12:36:21 +0800

On Wed, May 15, 2013 at 11:48 PM, Paolo Bonzini <address@hidden> wrote:
> This includes a (mangled) copy of the urcu-qsbr code from liburcu.
> The main changes are: 1) removing dependencies on many other header files
> in liburcu; 2) removing for simplicity the tentative busy waiting in
> synchronize_rcu, which has limited performance effects; 3) replacing
> futexes in synchronize_rcu with QemuEvents for Win32 portability.
> The API is the same as liburcu, so it should be possible in the future
> to require liburcu on POSIX systems for example and use our copy only
> on Windows.
>
> Among the various versions available I chose urcu-qsbr, which has the
> fastest rcu_read_{lock,unlock} but requires the program to manually
> annotate quiescent points or intervals.  QEMU threads usually have easily
> identified quiescent periods, so this should not be a problem.
>
> Signed-off-by: Paolo Bonzini <address@hidden>
> ---
>  docs/rcu.txt               | 301 +++++++++++++++++++++++++++++++++++++++++++++
>  hw/9pfs/virtio-9p-synth.c  |   1 +
>  include/qemu/queue.h       |  13 ++
>  include/qemu/rcu-pointer.h | 110 +++++++++++++++++
>  include/qemu/rcu.h         | 168 +++++++++++++++++++++++++
>  include/qemu/thread.h      |   3 -
>  libcacard/Makefile         |   3 +-
>  util/Makefile.objs         |   1 +
>  util/rcu.c                 | 203 ++++++++++++++++++++++++++++++
>  9 files changed, 799 insertions(+), 4 deletions(-)
>  create mode 100644 docs/rcu.txt
>  create mode 100644 include/qemu/rcu-pointer.h
>  create mode 100644 include/qemu/rcu.h
>  create mode 100644 util/rcu.c
>
> diff --git a/docs/rcu.txt b/docs/rcu.txt
> new file mode 100644
> index 0000000..19e4840
> --- /dev/null
> +++ b/docs/rcu.txt
> @@ -0,0 +1,301 @@
> +Using RCU (Read-Copy-Update) for synchronization
> +================================================
> +
> +Read-copy update (RCU) is a synchronization mechanism that is used to
> +protect read-mostly data structures.  RCU is very efficient and scalable
> +on the read side (it is wait-free), and thus can make the read paths
> +extremely fast.
> +
> +RCU supports concurrency between a single writer and multiple readers,
> +and is therefore not used alone.  Typically, the write-side will use a lock to
> +serialize multiple updates, but other approaches are possible (e.g.,
> +restricting updates to a single task).  In QEMU, when a lock is used,
> +this will often be the "iothread mutex", also known as the "big QEMU
> +lock" (BQL).  Also, restricting updates to a single task is done in
> +QEMU using the "bottom half" API.
> +
> +RCU is fundamentally a "wait-to-finish" mechanism.  The read side marks
> +sections of code as "critical sections", and the update side will wait
> +for the execution of all *currently running* critical sections before
> +proceeding, or before asynchronously executing a callback.
> +
> +The key point here is that only the currently running critical sections
> +are waited for; critical sections that are started _after_ the beginning
> +of the wait do not extend the wait, despite running concurrently with
> +the updater.  This is the reason why RCU is more scalable than,
> +for example, reader-writer locks.  It is so much more scalable that
> +the system will have a single instance of the RCU mechanism; a single
> +mechanism can be used for an arbitrary number of "things", without
> +having to worry about things such as contention or deadlocks.
> +
> +How is this possible?  The basic idea is to split updates into two phases,
> +"removal" and "reclamation".  During removal, we ensure that subsequent
> +readers will not be able to get a reference to the old data.  After
> +removal has completed, a critical section will not be able to access
> +the old data.  Therefore, critical sections that begin after removal
> +do not matter; as soon as all previous critical sections have finished,
> +there cannot be any readers who hold references to the data structure,
> +which may now be safely reclaimed (e.g., freed or unref'ed).
> +
> +Here is a picture:
> +
> +        thread 1                  thread 2                  thread 3
> +    -------------------    ------------------------    -------------------
> +    enter RCU crit.sec.
> +           |                finish removal phase
> +           |                begin wait
> +           |                      |                    enter RCU crit.sec.
> +    exit RCU crit.sec             |                           |
> +                            complete wait                     |
> +                            begin reclamation phase           |
> +                                                       exit RCU crit.sec.
> +
> +
> +Note how thread 3 is still executing its critical section when thread 2
> +starts reclaiming data.  This is possible, because the old version of the
> +data structure was not accessible at the time thread 3 began executing
> +that critical section.
> +
> +
> +RCU API
> +=======
> +
> +The core RCU API is small:
> +
> +     void rcu_read_lock(void);
> +
> +        Used by a reader to inform the reclaimer that the reader is
> +        entering an RCU read-side critical section.
> +
> +     void rcu_read_unlock(void);
> +
> +        Used by a reader to inform the reclaimer that the reader is
> +        exiting an RCU read-side critical section.  Note that RCU
> +        read-side critical sections may be nested and/or overlapping.
> +
> +     void synchronize_rcu(void);
> +
> +        Blocks until all pre-existing RCU read-side critical sections
> +        on all threads have completed.  This marks the end of the removal
> +        phase and the beginning of the reclamation phase.
> +
> +        Note that it would be valid for another update to come while
> +        synchronize_rcu is running.  Because of this, it is better that
> +        the updater releases any locks it may hold before calling
> +        synchronize_rcu.
> +
> +     typeof(*p) rcu_dereference(p);
> +     typeof(p) rcu_assign_pointer(p, typeof(p) v);
> +
> +        These macros are similar to atomic_mb_read() and atomic_mb_set()
> +        respectively.  However, they make some assumptions on the code
> +        that calls them, which allows a more optimized implementation.
> +
> +        rcu_assign_pointer assumes that the update side is not going
> +        to read from the data structure after "publishing" the new
> +        values; that is, it assumes that all assignments happen at
> +        the very end of the removal phase.
> +
> +        rcu_dereference assumes that whenever a single RCU critical
> +        section reads multiple shared data items, these reads are either
> +        data-dependent or need no ordering.  This is almost always the
> +        case when using RCU.  If this is not the case, you can use
> +        atomic_mb_read() or smp_rmb().
> +
> +        If you are going to be fetching multiple fields from the
> +        RCU-protected structure, repeated rcu_dereference() calls
> +        would look ugly and incur unnecessary overhead on Alpha CPUs.
> +        You can then do this:
> +
> +        p = rcu_dereference(&head);
> +        foo = p->foo;
> +        bar = p->bar;
> +
> +
> +RCU QUIESCENT STATES
> +====================
> +
> +An efficient implementation of rcu_read_lock() and rcu_read_unlock()
> +relies on the availability of fast thread-local storage.  Unfortunately,
> +this is not possible on all the systems supported by QEMU (in particular
> +on many POSIX systems other than Linux and Solaris).
> +
> +For this reason, QEMU's RCU implementation resorts to manual annotation
> +of "quiescent states", i.e. points where no RCU read-side critical
> +section can be active.  All threads that participate in the RCU mechanism
> +need to annotate such points.
> +
> +Marking quiescent states is done with the following three APIs:
> +
> +     void rcu_quiescent_state(void);
> +
> +        Marks a point in the execution of the current thread where no
> +        RCU read-side critical section can be active.
> +
> +     void rcu_thread_offline(void);
> +
> +        Marks the beginning of an "extended quiescent state" for the
> +        current thread, i.e. an interval of time during which no
> +        RCU read-side critical section can be active.
> +
> +     void rcu_thread_online(void);
> +
> +        Marks the end of an extended quiescent state for the current
> +        thread.
> +
> +
> +Furthermore, threads that participate in the RCU mechanism must communicate
> +this fact using the following APIs:
> +
> +     void rcu_register_thread(void);
> +
> +        Mark a thread as taking part in the RCU mechanism.  Such a thread
> +        will have to report quiescent points regularly, either manually
> +        or through the QemuCond/QemuSemaphore/QemuEvent APIs.
> +
> +     void rcu_unregister_thread(void);
> +
> +        Mark a thread as not taking part anymore in the RCU mechanism.
> +        It is not a problem if such a thread reports quiescent points,
> +        either manually or by using the QemuCond/QemuSemaphore/QemuEvent
> +        APIs.
> +
> +Note that these APIs are relatively heavyweight, and should _not_ be
> +nested.
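To make the expected calling convention concrete, here is a sketch of a worker thread's life cycle under this API. The rcu_* functions are no-op stand-in stubs so the fragment compiles outside QEMU (the real implementations are in util/rcu.c of this patch); only the ordering of the calls is the point, and `worker_iterations` is an illustrative name, not part of the patch.

```c
#include <assert.h>

static int quiescent_reports;

/* --- no-op stubs standing in for the real RCU API --- */
static void rcu_register_thread(void)   { }
static void rcu_unregister_thread(void) { }
static void rcu_read_lock(void)         { }
static void rcu_read_unlock(void)       { }
static void rcu_quiescent_state(void)   { quiescent_reports++; }

/* A worker loop: each iteration runs one read-side critical section,
 * then reports a quiescent point so synchronize_rcu() can make progress.
 * Registration happens once, before the first critical section. */
static int worker_iterations(int n)
{
    rcu_register_thread();
    for (int i = 0; i < n; i++) {
        rcu_read_lock();
        /* ... read RCU-protected data here ... */
        rcu_read_unlock();

        rcu_quiescent_state();   /* no critical section active here */
    }
    rcu_unregister_thread();
    return quiescent_reports;
}
```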
> +
> +
> +DIFFERENCES WITH LINUX
> +======================
> +
> +- Sleeping is possible, though discouraged, within an RCU critical section.
> +
> +- rcu_dereference takes a _pointer_ to the variable being accessed.
> +  Wrong usage will be detected by the compiler.
> +
> +- Quiescent points must be marked explicitly unless the thread uses
> +  condvars/semaphores/events for synchronization.
> +
> +
> +RCU PATTERNS
> +============
> +
> +Many patterns using reader-writer locks translate directly to RCU, with
> +the advantages of higher scalability and deadlock immunity.
> +
> +In general, RCU can be used whenever it is possible to create a new
> +"version" of a data structure every time the updater runs.  This may
> +sound like a very strict restriction, however:
> +
> +- the updater does not mean "everything that writes to a data structure",
> +  but rather "everything that involves a reclamation step".  See the
> +  array example below.
> +
> +- in some cases, creating a new version of a data structure may actually
> +  be very cheap.  For example, modifying the "next" pointer of a singly
> +  linked list is effectively creating a new version of the list.
> +
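The "cheap new version" point about singly linked lists can be shown in a few lines. This sketch approximates rcu_assign_pointer with a GCC/Clang atomic release store; the names `replace_next` and `rcu_assign_pointer_sketch` are illustrative, not part of the patch.

```c
#include <assert.h>
#include <stdlib.h>

struct node {
    int value;
    struct node *next;
};

/* Stand-in for rcu_assign_pointer: publish with release semantics. */
#define rcu_assign_pointer_sketch(p, v) \
    __atomic_store_n(&(p), (v), __ATOMIC_RELEASE)

/* Replace the node after "prev" with a fresh copy holding a new value.
 * A concurrent reader sees either the old node or the new one, never a
 * half-updated node: the single pointer store creates the new "version". */
static struct node *replace_next(struct node *prev, int new_value)
{
    struct node *old = prev->next;
    struct node *copy = malloc(sizeof(*copy));
    if (!copy) {
        abort();
    }
    copy->value = new_value;
    copy->next = old->next;                      /* share the tail */
    rcu_assign_pointer_sketch(prev->next, copy); /* publish */
    return old;   /* caller frees it after synchronize_rcu() */
}
```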
> +
> +A few common patterns, however, are worth noting.
> +
> +RCU list processing
> +-------------------
> +
> +TBD (not yet used in QEMU)
> +
> +
> +RCU reference counting
> +----------------------
> +
> +Because grace periods are not allowed to complete while there is an RCU
> +read-side critical section in progress, the RCU read-side primitives
> +may be used as a restricted reference-counting mechanism.  For example,
> +consider the following code fragment:
> +
> +    rcu_read_lock();
> +    p = rcu_dereference(&foo);
> +    /* do something with p. */
> +    rcu_read_unlock();
> +
> +The RCU read-side critical section ensures that the value of "p" remains
> +valid until after the rcu_read_unlock().  In some sense, it is acquiring
> +a reference to p that is later released when the critical section ends.
> +The write side looks simply like this (with appropriate locking):
> +
> +    qemu_mutex_lock(&foo_mutex);
> +    old = foo;
> +    rcu_assign_pointer(foo, new);
> +    qemu_mutex_unlock(&foo_mutex);
> +    synchronize_rcu();
> +    free(old);
> +
> +Note that the same idiom would be possible with reader/writer
> +locks:
> +
> +    read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
> +    p = foo;                        p = foo;
> +    /* do something with p. */      foo = new;
> +    read_unlock(&foo_rwlock);       free(p);
> +                                    write_mutex_unlock(&foo_rwlock);
> +
> +
> +RCU resizable arrays
> +--------------------
> +
> +Resizable arrays can be used with RCU.  The expensive RCU synchronization
> +only needs to take place when the array is resized.  The two items to
> +take care of are:
> +
> +- ensuring that the old version of the array is available between removal
> +  and reclamation;
> +
> +- avoiding mismatches in the read side between the array data and the
> +  array size.
> +
> +The first problem is avoided simply by not using realloc.  Instead,
> +each resize will allocate a new array and copy the old data into it.
> +The second problem would arise if the size and the data pointers were
> +two members of a larger struct:
> +
> +    struct mystuff {
> +        ...
> +        int data_size;
> +        int data_alloc;
> +        T   *data;
> +        ...
> +    };
> +
> +Instead, we store the size of the array with the array itself:
> +
> +    struct arr {
> +        int size;
> +        int alloc;
> +        T   data[];
> +    };
> +    struct arr *global_array;
> +
> +    read side:
> +        rcu_read_lock();
> +        struct arr *array = rcu_dereference(&global_array);
> +        x = i < array->size ? array->data[i] : -1;
> +        rcu_read_unlock();
> +        return x;
> +
> +    write side (running under a lock):
> +        if (global_array->size == global_array->alloc) {
> +            /* Creating a new version.  */
> +            new_array = g_malloc(sizeof(struct arr) +
> +                                 global_array->alloc * 2 * sizeof(T));
> +            new_array->size = global_array->size;
> +            new_array->alloc = global_array->alloc * 2;
> +            memcpy(new_array->data, global_array->data,
> +                   global_array->alloc * sizeof(T));
> +
> +            /* Removal phase.  */
> +            old_array = global_array;
> +            rcu_assign_pointer(global_array, new_array);
> +            synchronize_rcu();
> +
> +            /* Reclamation phase.  */
> +            free(old_array);
> +        }
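A single-threaded model of why the size belongs inside the array: a reader that has fetched one `struct arr` pointer gets a size and data that always belong together, and growing produces a new version while the old one stays intact. The helper names `arr_new`/`arr_grow` are illustrative, not part of the patch.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

struct arr {
    int size;
    int alloc;
    int data[];   /* size travels with the data it describes */
};

static struct arr *arr_new(int alloc)
{
    struct arr *a = calloc(1, sizeof(*a) + alloc * sizeof(int));
    if (!a) {
        abort();
    }
    a->alloc = alloc;
    return a;
}

/* Grow by doubling, as in the write side above: the old version is left
 * untouched, so readers still holding it remain internally consistent. */
static struct arr *arr_grow(const struct arr *old)
{
    struct arr *new_array = arr_new(old->alloc * 2);
    new_array->size = old->size;
    memcpy(new_array->data, old->data, old->alloc * sizeof(int));
    return new_array;
}
```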
> diff --git a/hw/9pfs/virtio-9p-synth.c b/hw/9pfs/virtio-9p-synth.c
> index 840e4eb..d5f5842 100644
> --- a/hw/9pfs/virtio-9p-synth.c
> +++ b/hw/9pfs/virtio-9p-synth.c
> @@ -17,6 +17,7 @@
>  #include "virtio-9p-xattr.h"
>  #include "fsdev/qemu-fsdev.h"
>  #include "virtio-9p-synth.h"
> +#include "util/rcu.h"
>
>  #include <sys/stat.h>
>
> diff --git a/include/qemu/queue.h b/include/qemu/queue.h
> index d433b90..847ddd1 100644
> --- a/include/qemu/queue.h
> +++ b/include/qemu/queue.h
>  @@ -104,6 +104,19 @@ struct {                                                 \
>          (head)->lh_first = NULL;                                        \
>  } while (/*CONSTCOND*/0)
>
> +#define QLIST_SWAP(dstlist, srclist, field) do {                        \
> +        void *tmplist;                                                  \
> +        tmplist = (srclist)->lh_first;                                  \
> +        (srclist)->lh_first = (dstlist)->lh_first;                      \
> +        if ((srclist)->lh_first != NULL) {                              \
> +            (srclist)->lh_first->field.le_prev = &(srclist)->lh_first;  \
> +        }                                                               \
> +        (dstlist)->lh_first = tmplist;                                  \
> +        if ((dstlist)->lh_first != NULL) {                              \
> +            (dstlist)->lh_first->field.le_prev = &(dstlist)->lh_first;  \
> +        }                                                               \
> +} while (/*CONSTCOND*/0)
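A quick self-contained check of the new QLIST_SWAP semantics: it exchanges the heads of two lists and fixes up the `le_prev` back-pointers of the new first elements. The QLIST_* definitions below are reduced copies of include/qemu/queue.h, just enough to exercise the macro from the patch.

```c
#include <assert.h>
#include <stddef.h>

#define QLIST_HEAD(name, type)                                          \
struct name { struct type *lh_first; }

#define QLIST_ENTRY(type)                                               \
struct { struct type *le_next; struct type **le_prev; }

#define QLIST_INSERT_HEAD(head, elm, field) do {                        \
        if (((elm)->field.le_next = (head)->lh_first) != NULL)          \
            (head)->lh_first->field.le_prev = &(elm)->field.le_next;    \
        (head)->lh_first = (elm);                                       \
        (elm)->field.le_prev = &(head)->lh_first;                       \
} while (0)

/* The macro from the patch, verbatim apart from whitespace. */
#define QLIST_SWAP(dstlist, srclist, field) do {                        \
        void *tmplist;                                                  \
        tmplist = (srclist)->lh_first;                                  \
        (srclist)->lh_first = (dstlist)->lh_first;                      \
        if ((srclist)->lh_first != NULL) {                              \
            (srclist)->lh_first->field.le_prev = &(srclist)->lh_first;  \
        }                                                               \
        (dstlist)->lh_first = tmplist;                                  \
        if ((dstlist)->lh_first != NULL) {                              \
            (dstlist)->lh_first->field.le_prev = &(dstlist)->lh_first;  \
        }                                                               \
} while (0)

struct item { int v; QLIST_ENTRY(item) node; };
QLIST_HEAD(list, item);
```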
> +
>  #define QLIST_INSERT_AFTER(listelm, elm, field) do {                    \
>          if (((elm)->field.le_next = (listelm)->field.le_next) != NULL)  \
>                  (listelm)->field.le_next->field.le_prev =               \
> diff --git a/include/qemu/rcu-pointer.h b/include/qemu/rcu-pointer.h
> new file mode 100644
> index 0000000..0e6417c
> --- /dev/null
> +++ b/include/qemu/rcu-pointer.h
> @@ -0,0 +1,110 @@
> +#ifndef _URCU_POINTER_STATIC_H
> +#define _URCU_POINTER_STATIC_H
> +
> +/*
> + * urcu-pointer-static.h
> + *
> + * Userspace RCU header. Operations on pointers.
> + *
> + * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See urcu-pointer.h for
> + * linking dynamically with the userspace rcu library.
> + *
> + * Copyright (c) 2009 Mathieu Desnoyers <address@hidden>
> + * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + *
> + * IBM's contributions to this file may be relicensed under LGPLv2 or later.
> + */
> +
> +#include "compiler.h"
> +#include "qemu/atomic.h"
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +/**
> + * rcu_dereference - reads (copies) an RCU-protected pointer to a local variable
> + * inside an RCU read-side critical section. The pointer can later be safely
> + * dereferenced within the critical section.
> + *
> + * This ensures that the pointer copy is invariant throughout the whole
> + * critical section.
> + *
> + * Inserts memory barriers on architectures that require them (currently only
> + * Alpha) and documents which pointers are protected by RCU.
> + *
> + * The compiler memory barrier in atomic_read() ensures that value-speculative
> + * optimizations (e.g. VSS: Value Speculation Scheduling) do not perform the
> + * data read before the pointer read by speculating the value of the pointer.
> + * Correct ordering is ensured because the pointer is read as a volatile access.
> + * This acts as a global side-effect operation, which forbids reordering of
> + * dependent memory operations. Note that such concerns about dependency-breaking
> + * optimizations will eventually be taken care of by the "memory_order_consume"
> + * addition to the forthcoming C++ standard.
> + *
> + * Should match rcu_assign_pointer() or rcu_xchg_pointer().
> + */
> +
> +#define rcu_dereference(p)                      \
> +        ({                                      \
> +            typeof(p) _p1 = (p);                \
> +            smp_read_barrier_depends();         \
> +            *(_p1);                             \
> +        })
> +
> +/**
> + * rcu_cmpxchg_pointer - same as rcu_set_pointer, but tests that the pointer
> + * matches "old".  If it succeeds, returns the previous pointer to the data
> + * structure, which can be safely freed after waiting for a quiescent state
> + * using synchronize_rcu().  If it fails (unexpected value), returns old (which
> + * should not be freed!).
> + */
> +
> +#define rcu_cmpxchg_pointer(p, old, _new)       \
> +        ({                                      \
> +            typeof(*p) _pold = (old);           \
> +            typeof(*p) _pnew = (_new);          \
> +            atomic_cmpxchg(p, _pold, _pnew);    \
> +        })
> +
> +#define rcu_set_pointer(p, v)                   \
> +        ({                                      \
> +             typeof(*p) _pv = (v);              \
> +             smp_wmb();                         \
> +             atomic_set(p, _pv);                \
> +        })
> +
> +/**
> + * rcu_assign_pointer - assign (publicize) a pointer to a new data structure
> + * meant to be read by RCU read-side critical sections. Returns the assigned
> + * value.
> + *
> + * Documents which pointers will be dereferenced by RCU read-side critical
> + * sections and adds the required memory barriers on architectures requiring
> + * them. It also makes sure the compiler does not reorder code initializing the
> + * data structure before its publication.
> + *
> + * Should match rcu_dereference().
> + */
> +
> +#define rcu_assign_pointer(p, v)    rcu_set_pointer(&(p), v)
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _URCU_POINTER_STATIC_H */
> diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
> new file mode 100644
> index 0000000..b875593
> --- /dev/null
> +++ b/include/qemu/rcu.h
> @@ -0,0 +1,168 @@
> +#ifndef _URCU_QSBR_H
> +#define _URCU_QSBR_H
> +
> +/*
> + * urcu-qsbr.h
> + *
> + * Userspace RCU QSBR header.
> + *
> + * LGPL-compatible code should include this header with:
> + *
> + * #define _LGPL_SOURCE
> + * #include <urcu.h>
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + *
> + * IBM's contributions to this file may be relicensed under LGPLv2 or later.
> + */
> +
> +#include <stdlib.h>
> +#include <assert.h>
> +#include <limits.h>
> +#include <unistd.h>
> +#include <stdint.h>
> +#include <stdbool.h>
> +#include <glib.h>
> +
> +#include "qemu/compiler.h"
> +#include "qemu/rcu-pointer.h"
> +#include "qemu/thread.h"
> +#include "qemu/queue.h"
> +#include "qemu/atomic.h"
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +/*
> + * Important !
> + *
> + * Each thread containing read-side critical sections must be registered
> + * with rcu_register_thread() before calling rcu_read_lock().
> + * rcu_unregister_thread() should be called before the thread exits.
> + */
> +
> +#ifdef DEBUG_RCU
> +#define rcu_assert(args...)    assert(args)
> +#else
> +#define rcu_assert(args...)
> +#endif
> +
> +#define RCU_GP_ONLINE     (1UL << 0)
> +#define RCU_GP_CTR        (1UL << 1)
> +
> +/*
> + * Global quiescent period counter with low-order bits unused.
> + * Using an int rather than a char to eliminate false register dependencies
> + * causing stalls on some architectures.
> + */
> +extern unsigned long rcu_gp_ctr;
> +
> +extern QemuEvent rcu_gp_event;
> +
> +struct rcu_reader_data {
> +    /* Data used by both reader and synchronize_rcu() */
> +    unsigned long ctr;
> +    bool waiting;
> +
> +    /* Data used for registry, protected by rcu_gp_lock */
> +    QLIST_ENTRY(rcu_reader_data) node;
> +};
> +
> +#ifdef __linux__
> +extern __thread struct rcu_reader_data rcu_reader;
> +#define DEFINE_RCU_READER() \
> +    __thread struct rcu_reader_data rcu_reader
> +
> +static inline struct rcu_reader_data *get_rcu_reader(void)
> +{
> +    return &rcu_reader;
> +}
> +
> +static inline void alloc_rcu_reader(void)
> +{
> +}
> +#else
> +extern GPrivate rcu_reader_key;
> +#define DEFINE_RCU_READER() \
> +     GPrivate rcu_reader_key = G_PRIVATE_INIT(g_free)
> +
> +static inline struct rcu_reader_data *get_rcu_reader(void)
> +{
> +    return g_private_get(&rcu_reader_key);
> +}
> +
> +static inline void alloc_rcu_reader(void)
> +{
> +     g_private_replace(&rcu_reader_key,
> +                       g_malloc0(sizeof(struct rcu_reader_data)));
> +}
> +#endif
> +
> +static inline void rcu_read_lock(void)
> +{
> +    rcu_assert(get_rcu_reader()->ctr);
> +}
> +
> +static inline void rcu_read_unlock(void)
> +{
> +    /* Ensure that the previous reads complete before starting those
> +     * in another critical section.
> +     */
> +    smp_rmb();
> +}
> +
> +static inline void rcu_quiescent_state(void)
> +{
> +    struct rcu_reader_data *p_rcu_reader = get_rcu_reader();
> +
> +    /* Reuses smp_rmb() in the last rcu_read_unlock().  */
> +    unsigned ctr = atomic_read(&rcu_gp_ctr);
> +    atomic_xchg(&p_rcu_reader->ctr, ctr);
> +    if (atomic_read(&p_rcu_reader->waiting)) {
> +        atomic_set(&p_rcu_reader->waiting, false);
> +        qemu_event_set(&rcu_gp_event);
> +    }
> +}
> +
> +static inline void rcu_thread_offline(void)
> +{
> +    struct rcu_reader_data *p_rcu_reader = get_rcu_reader();
> +
> +    atomic_xchg(&p_rcu_reader->ctr, 0);
> +    if (atomic_read(&p_rcu_reader->waiting)) {
> +        atomic_set(&p_rcu_reader->waiting, false);
> +        qemu_event_set(&rcu_gp_event);
> +    }
> +}
> +
> +static inline void rcu_thread_online(void)
> +{
> +    rcu_quiescent_state();
> +}
> +
> +extern void synchronize_rcu(void);
> +
> +/*
> + * Reader thread registration.
> + */
> +extern void rcu_register_thread(void);
> +extern void rcu_unregister_thread(void);
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _URCU_QSBR_H */
> diff --git a/include/qemu/thread.h b/include/qemu/thread.h
> index 3e32c65..5d64a20 100644
> --- a/include/qemu/thread.h
> +++ b/include/qemu/thread.h
> @@ -25,9 +25,6 @@ void qemu_mutex_lock(QemuMutex *mutex);
>  int qemu_mutex_trylock(QemuMutex *mutex);
>  void qemu_mutex_unlock(QemuMutex *mutex);
>
> -#define rcu_read_lock() do { } while (0)
> -#define rcu_read_unlock() do { } while (0)
> -
>  void qemu_cond_init(QemuCond *cond);
>  void qemu_cond_destroy(QemuCond *cond);
>
> diff --git a/libcacard/Makefile b/libcacard/Makefile
> index 47827a0..f7a3b07 100644
> --- a/libcacard/Makefile
> +++ b/libcacard/Makefile
> @@ -4,7 +4,8 @@ TOOLS += vscclient$(EXESUF)
>
>  # objects linked into a shared library, built with libtool with -fPIC if required
>  libcacard-obj-y = $(stub-obj-y) $(libcacard-y)
> -libcacard-obj-y += util/osdep.o util/cutils.o util/qemu-timer-common.o util/error.o
> +libcacard-obj-y += util/osdep.o util/cutils.o util/qemu-timer-common.o
> +libcacard-obj-y += util/rcu.o util/error.o
>  libcacard-obj-$(CONFIG_WIN32) += util/oslib-win32.o util/qemu-thread-win32.o
>  libcacard-obj-$(CONFIG_POSIX) += util/oslib-posix.o util/qemu-thread-posix.o
>  libcacard-obj-y += $(filter trace/%, $(util-obj-y))
> diff --git a/util/Makefile.objs b/util/Makefile.objs
> index 4a1bd4e..f05eba1 100644
> --- a/util/Makefile.objs
> +++ b/util/Makefile.objs
>  @@ -11,3 +11,4 @@ util-obj-y += iov.o aes.o qemu-config.o qemu-sockets.o uri.o notify.o
>  util-obj-y += qemu-option.o qemu-progress.o
>  util-obj-y += hexdump.o
>  util-obj-y += crc32c.o
> +util-obj-y += rcu.o
> diff --git a/util/rcu.c b/util/rcu.c
> new file mode 100644
> index 0000000..48686a3
> --- /dev/null
> +++ b/util/rcu.c
> @@ -0,0 +1,203 @@
> +/*
> + * urcu-qsbr.c
> + *
> + * Userspace RCU QSBR library
> + *
> + * Copyright (c) 2009 Mathieu Desnoyers <address@hidden>
> + * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
> + * Copyright 2013 Red Hat, Inc.
> + *
> + * Ported to QEMU by Paolo Bonzini  <address@hidden>
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + *
> + * IBM's contributions to this file may be relicensed under LGPLv2 or later.
> + */
> +
> +#include <stdio.h>
> +#include <assert.h>
> +#include <stdlib.h>
> +#include <stdint.h>
> +#include <errno.h>
> +#include "qemu/rcu.h"
> +#include "qemu/atomic.h"
> +
> +/*
> + * Global grace period counter.  Bit 0 is one if the thread is online.
> + * Bits 1 and above are defined in synchronize_rcu/wait_for_readers.
> + */
> +#define RCU_GP_ONLINE           (1UL << 0)
> +#define RCU_GP_CTR              (1UL << 1)
> +
> +unsigned long rcu_gp_ctr = RCU_GP_ONLINE;
> +
> +QemuEvent rcu_gp_event;
> +static QemuMutex rcu_gp_lock;
> +
> +/*
> + * Check whether a quiescent state was crossed between the beginning of
> + * update_counter_and_wait and now.
> + */
> +static inline int rcu_gp_ongoing(unsigned long *ctr)
> +{
> +    unsigned long v;
> +
> +    /* See wait_for_readers for the discussion of memory barriers.  */
> +    v = atomic_read(ctr);
> +    return v && (v != rcu_gp_ctr);
> +}
> +
> +/* Written to only by each individual reader. Read by both the reader and the
> + * writers.
> + */
> +DEFINE_RCU_READER();
> +
> +/* Protected by rcu_gp_lock.  */
> +typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
> +static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);
> +
> +/* Wait for previous parity/grace period to be empty of readers.  */
> +static void wait_for_readers(void)
> +{
> +    ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
> +    struct rcu_reader_data *index, *tmp;
> +
> +    for (;;) {
> +        /* We want to be notified of changes made to rcu_gp_ongoing
> +         * while we walk the list.
> +         */
> +        qemu_event_reset(&rcu_gp_event);
> +
> +        /* Instead of using atomic_mb_set for index->waiting, and
> +         * atomic_mb_read for index->ctr, memory barriers are placed
> +         * manually since writes to different threads are independent.
> +         * atomic_mb_set has a smp_wmb before...
> +         */
> +        smp_wmb();
> +        QLIST_FOREACH(index, &registry, node) {
> +            atomic_set(&index->waiting, true);
> +        }
> +
> +        /* ... and a smp_mb after.
> +         *
> +         * This barrier also blocks stores that free old RCU-protected
> +         * pointers.
> +         */
> +        smp_mb();
> +
> +        QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
> +            if (!rcu_gp_ongoing(&index->ctr)) {
> +                QLIST_REMOVE(index, node);
> +                QLIST_INSERT_HEAD(&qsreaders, index, node);
> +
> +                /* No need for mb_set here, worst of all we
> +                 * get some extra futex wakeups.
> +                 */
> +                atomic_set(&index->waiting, false);
> +            }
> +        }
> +
> +        /* atomic_mb_read has smp_rmb after.  */
> +        smp_rmb();
> +
> +        if (QLIST_EMPTY(&registry)) {
> +            break;
> +        }
> +
> +        /* Wait for one thread to report a quiescent state and
> +         * try again.
> +         */
> +        qemu_event_wait(&rcu_gp_event);
> +    }
> +
> +    /* Put the reader list back into the registry.  */
> +    QLIST_SWAP(&registry, &qsreaders, node);
> +}
> +
> +void synchronize_rcu(void)
> +{
> +    unsigned long was_online;
> +
> +    was_online = get_rcu_reader()->ctr;
> +
> +    /* Mark the writer thread offline to make sure we don't wait for
> +     * our own quiescent state. This allows using synchronize_rcu()
> +     * in threads registered as readers.
> +     *
> +     * rcu_thread_offline() and rcu_thread_online() include a
> +     * memory barrier.
> +     */
> +    if (was_online) {
> +        rcu_thread_offline();

Encourage the user to call synchronize_rcu() inside a reader? I think the
caller should ensure it is outside the read-side critical section. Also,
online/offline can be nested, which makes the situation even worse.

Regards,
Pingfan
> +    } else {
> +        smp_mb();
> +    }
> +
> +    qemu_mutex_lock(&rcu_gp_lock);
> +
> +    if (!QLIST_EMPTY(&registry)) {
> +        if (sizeof(rcu_gp_ctr) < 8) {
> +            /* For architectures with 32-bit longs, a two-subphases algorithm
> +             * ensures we do not encounter overflow bugs.
> +             *
> +             * Switch parity: 0 -> 1, 1 -> 0.
> +             */
> +            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
> +            wait_for_readers();
> +            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
> +        } else {
> +            /* Increment current grace period.  */
> +            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
> +        }
> +
> +        wait_for_readers();
> +    }
> +
> +    qemu_mutex_unlock(&rcu_gp_lock);
> +
> +    if (was_online) {
> +        rcu_thread_online();
> +    } else {
> +        smp_mb();
> +    }
> +}
> +
> +void rcu_register_thread(void)
> +{
> +    if (!get_rcu_reader()) {
> +        alloc_rcu_reader();
> +    }
> +
> +    assert(get_rcu_reader()->ctr == 0);
> +    qemu_mutex_lock(&rcu_gp_lock);
> +    QLIST_INSERT_HEAD(&registry, get_rcu_reader(), node);
> +    qemu_mutex_unlock(&rcu_gp_lock);
> +    rcu_quiescent_state();
> +}
> +
> +void rcu_unregister_thread(void)
> +{
> +    rcu_thread_offline();
> +    qemu_mutex_lock(&rcu_gp_lock);
> +    QLIST_REMOVE(get_rcu_reader(), node);
> +    qemu_mutex_unlock(&rcu_gp_lock);
> +}
> +
> +static void __attribute__((__constructor__)) rcu_init(void)
> +{
> +    qemu_mutex_init(&rcu_gp_lock);
> +    qemu_event_init(&rcu_gp_event, true);
> +    rcu_register_thread();
> +}
> --
> 1.8.1.4
>
>


