qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [RFC PATCH] Introduce RCU-enabled DQs (v2)


From: Paolo Bonzini
Subject: Re: [Qemu-devel] [RFC PATCH] Introduce RCU-enabled DQs (v2)
Date: Sun, 25 Aug 2013 08:32:08 +0200
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:17.0) Gecko/20130805 Thunderbird/17.0.8

Il 24/08/2013 21:06, Mike Day ha scritto:
> Add RCU-enabled variants of the existing BSD DQ facility. Each Q
> operation has the same interface as the existing (non-RCU)
> version. Also, each operation is implemented as a macro for now.
> 
> Using the RCU-enabled DQ, existing DQ users will be able to convert to
> RCU without using a different list interface.
> 
> This version (2) adds a macro to walk a Q in reverse:
> 
> QLIST_FOREACH_REVERSE_RCU(el, head, field)

Just a couple of questions, one of them on the new macro...

> +/* Prior to publication of the elm->prev->next value, some list
> + * readers may still see the removed element when following
> + * the antecedent's next pointer.
> + */
> +
> +#define QLIST_REMOVE_RCU(elm, field) do {                       \
> +    if ((elm)->field.le_next != NULL) {                         \
> +       (elm)->field.le_next->field.le_prev =                    \
> +        (elm)->field.le_prev;                                   \
> +    }                                                           \
> +    atomic_rcu_set((elm)->field.le_prev, (elm)->field.le_next); \
> +} while (/*CONSTCOND*/0)

Why is the barrier needed here, but not in Linux's list_del_rcu?

I think it is not needed because all involved elements have already been
published and just have their pointers shuffled.

> +/* QLIST_ENTRY MUST be the first member of the struct that (var) points to.
> + */
> +#define QLIST_FOREACH_REVERSE_RCU(var, head, field)                     \
> +    for ( ;                                                             \
> +    (var) && (var) != (head)->lh_first;                                 \
> +          (var) = (typeof(var))atomic_rcu_read(&(var)->field.le_prev))

This API has no non-_RCU equivalent, so I don't think it is needed.

Paolo

> +
> +#ifdef __cplusplus
> +}
> +#endif
> +#endif /* QEMU_RCU_SYS_QUEUE.H */
> diff --git a/tests/Makefile b/tests/Makefile
> index 136b7a4..98674f9 100644
> --- a/tests/Makefile
> +++ b/tests/Makefile
> @@ -53,7 +53,8 @@ gcov-files-test-int128-y =
>  check-unit-y += tests/test-bitops$(EXESUF)
>  check-unit-y += tests/rcutorture$(EXESUF)
>  gcov-files-rcutorture-y = util/rcu.c
> -
> +check-unit-y += tests/rcuq_test$(EXESUF)
> +gcov-files-rcuq_test-y = util/rcu.c
>  check-block-$(CONFIG_POSIX) += tests/qemu-iotests-quick.sh
>  
>  # All QTests for now are POSIX-only, but the dependencies are
> @@ -105,7 +106,7 @@ test-obj-y = tests/check-qint.o tests/check-qstring.o 
> tests/check-qdict.o \
>       tests/test-qmp-input-visitor.o tests/test-qmp-input-strict.o \
>       tests/test-qmp-commands.o tests/test-visitor-serialization.o \
>       tests/test-x86-cpuid.o tests/test-mul64.o tests/test-int128.o \
> -     tests/test-tls.o tests/rcutorture.o
> +     tests/test-tls.o tests/rcutorture.o tests/rcuq_test.o
>  
>  test-qapi-obj-y = tests/test-qapi-visit.o tests/test-qapi-types.o
>  
> @@ -131,6 +132,7 @@ tests/test-cutils$(EXESUF): tests/test-cutils.o 
> util/cutils.o
>  tests/test-int128$(EXESUF): tests/test-int128.o
>  tests/test-tls$(EXESUF): tests/test-tls.o libqemuutil.a
>  tests/rcutorture$(EXESUF): tests/rcutorture.o libqemuutil.a
> +tests/rcuq_test$(EXESUF): tests/rcuq_test.o libqemuutil.a
>  
>  tests/test-qapi-types.c tests/test-qapi-types.h :\
>  $(SRC_PATH)/tests/qapi-schema/qapi-schema-test.json 
> $(SRC_PATH)/scripts/qapi-types.py
> diff --git a/tests/rcuq_test.c b/tests/rcuq_test.c
> new file mode 100644
> index 0000000..4f62143
> --- /dev/null
> +++ b/tests/rcuq_test.c
> @@ -0,0 +1,290 @@
> +/*
> + * rcuq_test.c
> + *
> + * usage: rcuq_test <duration> <readers>
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 2 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
> + *
> + * Copyright (c) 2013 Mike D. Day, IBM Corporation.
> + */
> +
> +/*
> + * Test variables.
> + */
> +
> +#include <glib.h>
> +#include <stdlib.h>
> +#include <stdio.h>
> +#include <string.h>
> +#include "qemu/atomic.h"
> +#include "qemu/rcu.h"
> +#include "qemu/compiler.h"
> +#include "qemu/thread.h"
> +#include "qemu/rcu_queue.h"
> +
> +long long n_reads = 0LL;
> +long long n_updates = 0LL;
> +long long n_reclaims = 0LL;
> +long long n_nodes_removed = 0LL;
> +long long n_nodes = 0LL;
> +
> +
> +int nthreadsrunning;
> +
> +char argsbuf[64];
> +
> +#define GOFLAG_INIT 0
> +#define GOFLAG_RUN  1
> +#define GOFLAG_STOP 2
> +
> +static volatile int goflag = GOFLAG_INIT;
> +
> +#define RCU_READ_RUN 1000
> +#define RCU_UPDATE_RUN 10
> +#define NR_THREADS 100
> +#define RCU_Q_LEN 100
> +
> +static QemuThread threads[NR_THREADS];
> +static struct rcu_reader_data *data[NR_THREADS];
> +static int n_threads;
> +
> +static int select_random_el(int max)
> +{
> +    return (rand() % max);
> +}
> +
> +
> +static void create_thread(void *(*func)(void *))
> +{
> +    if (n_threads >= NR_THREADS) {
> +        fprintf(stderr, "Thread limit of %d exceeded!\n", NR_THREADS);
> +        exit(-1);
> +    }
> +    qemu_thread_create(&threads[n_threads], func, &data[n_threads],
> +                       QEMU_THREAD_JOINABLE);
> +    n_threads++;
> +}
> +
> +static void wait_all_threads(void)
> +{
> +    int i;
> +
> +    for (i = 0; i < n_threads; i++) {
> +        qemu_thread_join(&threads[i]);
> +    }
> +    n_threads = 0;
> +}
> +
> +
> +struct list_element {
> +    QLIST_ENTRY(list_element) entry;
> +    struct rcu_head rcu;
> +    long long val;
> +};
> +
> +static void reclaim_list_el(struct rcu_head *prcu)
> +{
> +    struct list_element *el = container_of(prcu, struct list_element, rcu);
> +    g_free(el);
> +    atomic_add(&n_reclaims, 1);
> +}
> +
> +static QLIST_HEAD(q_list_head, list_element) Q_list_head;
> +
> +static void *rcu_q_reader(void *arg)
> +{
> +    long long j, n_reads_local = 0;
> +    struct list_element *el, *prev_el = NULL;
> +
> +    *(struct rcu_reader_data **)arg = tls_get_rcu_reader();
> +    atomic_inc(&nthreadsrunning);
> +    rcu_thread_offline();
> +    while (goflag == GOFLAG_INIT) {
> +        g_usleep(1000);
> +    }
> +
> +    rcu_thread_online();
> +    while (goflag == GOFLAG_RUN) {
> +        rcu_read_lock();
> +        QLIST_FOREACH_RCU(el, &Q_list_head, entry) {
> +            prev_el = el;
> +            j = el->val;
> +            if (j){;}
> +            n_reads_local++;
> +            if (goflag == GOFLAG_STOP)
> +                break;
> +        }
> +        rcu_read_unlock();
> +
> +        rcu_quiescent_state();
> +        g_usleep(100);
> +        rcu_read_lock();
> +        QLIST_FOREACH_REVERSE_RCU(prev_el, &Q_list_head, entry) {
> +            el = prev_el;
> +            j = el->val;
> +            if (j){;}
> +            n_reads_local++;
> +            if (goflag != GOFLAG_RUN)
> +                break;
> +        }
> +        rcu_read_unlock();
> +
> +        rcu_quiescent_state();
> +        g_usleep(100);
> +    }
> +    rcu_thread_offline();
> +    atomic_add(&n_reads, n_reads_local);
> +    return NULL;
> +}
> +
> +
> +static void *rcu_q_updater(void *arg)
> +{
> +    int i, j, target_el;
> +    long long n_updates_local = 0;
> +    long long n_removed_local = 0;
> +    struct list_element *el, *prev_el;
> +
> +    *(struct rcu_reader_data **)arg = tls_get_rcu_reader();
> +    atomic_inc(&nthreadsrunning);
> +    while (goflag == GOFLAG_INIT) {
> +        g_usleep(1000);
> +    }
> +
> +    while (goflag == GOFLAG_RUN) {
> +        for (i = 0; i < RCU_UPDATE_RUN && goflag == GOFLAG_RUN; i++) {
> +            target_el = select_random_el(RCU_Q_LEN);
> +            j = 0;
> +            /* FOREACH_RCU could work here but let's use both macros */
> +            QLIST_FOREACH_SAFE_RCU(prev_el, &Q_list_head, entry, el) {
> +                j++;
> +                if (target_el == j) {
> +                    QLIST_REMOVE_RCU(prev_el, entry);
> +                    /* may be more than one updater in the future */
> +                    call_rcu1(&prev_el->rcu, reclaim_list_el);
> +                    n_removed_local++;
> +                    break;
> +                }
> +            }
> +            if (goflag == GOFLAG_STOP)
> +                break;
> +            target_el = select_random_el(RCU_Q_LEN);
> +            j = 0;
> +            QLIST_FOREACH_RCU(el, &Q_list_head, entry) {
> +                j++;
> +                if (target_el == j) {
> +                    prev_el = g_new(struct list_element, 1);
> +                    atomic_add(&n_nodes, 1);
> +                    prev_el->val = atomic_read(&n_nodes);
> +                    QLIST_INSERT_BEFORE_RCU(el, prev_el, entry);
> +                    break;
> +                }
> +            }
> +        }
> +        n_updates_local += 2;
> +        synchronize_rcu();
> +    }
> +    rcu_quiescent_state();
> +    atomic_add(&n_updates, n_updates_local);
> +    atomic_add(&n_nodes_removed, n_removed_local);
> +    return NULL;
> +}
> +
> +static void rcu_qtest_init(void)
> +{
> +    struct list_element *new_el;
> +    int i;
> +    nthreadsrunning = 0;
> +    srand(time(0));
> +    for (i = 0; i < RCU_Q_LEN; i++) {
> +        new_el = g_new(struct list_element, 1);
> +        new_el->val = i;
> +        QLIST_INSERT_HEAD_RCU(&Q_list_head, new_el, entry);
> +    }
> +    atomic_add(&n_nodes, RCU_Q_LEN);
> +}
> +
> +static void rcu_qtest_run(int duration, int nreaders)
> +{
> +    int nthreads = nreaders + 1;
> +    while (atomic_read(&nthreadsrunning) < nthreads) {
> +        g_usleep(1000);
> +    }
> +
> +    goflag = GOFLAG_RUN;
> +    sleep(duration);
> +    goflag = GOFLAG_STOP;
> +    wait_all_threads();
> +}
> +
> +
> +static void rcu_qtest(const char *test, int duration, int nreaders)
> +{
> +    int i;
> +    long long n_removed_local=0;
> +
> +    struct list_element *el, *prev_el;
> +
> +    rcu_qtest_init();
> +    for (i = 0; i < nreaders; i++) {
> +        create_thread(rcu_q_reader);
> +    }
> +    create_thread(rcu_q_updater);
> +    rcu_qtest_run(duration, nreaders);
> +    rcu_thread_offline();
> +
> +    QLIST_FOREACH_SAFE_RCU(prev_el, &Q_list_head, entry, el) {
> +        QLIST_REMOVE_RCU(prev_el, entry);
> +        call_rcu1(&prev_el->rcu, reclaim_list_el);
> +        n_removed_local++;
> +    }
> +    atomic_add(&n_nodes_removed, n_removed_local);
> +    rcu_quiescent_state();
> +    synchronize_rcu();
> +    while (n_nodes_removed > n_reclaims){
> +        rcu_quiescent_state();
> +        g_usleep(100);
> +        synchronize_rcu();
> +    }
> +    printf("%s: %d readers; 1 updater; nodes read: %lld, nodes removed: 
> %lld; nodes reclaimed: %lld\n",
> +           test, nthreadsrunning -1, n_reads, n_nodes_removed, n_reclaims);
> +    exit(0);
> +}
> +
> +static void usage(int argc, char *argv[])
> +{
> +    fprintf(stderr, "Usage: %s duration nreaders\n", argv[0]);
> +    exit(-1);
> +}
> +
> +
> +int main(int argc, char *argv[])
> +{
> +    int duration = 0, readers = 0;
> +
> +    if (argc >= 2) {
> +        duration = strtoul(argv[1], NULL, 0);
> +    }
> +    if (argc >= 3) {
> +        readers = strtoul(argv[2], NULL, 0);
> +    }
> +
> +    /* This thread is not part of the test.  */
> +    rcu_thread_offline();
> +    if (duration && readers)
> +        rcu_qtest(argv[0], duration, readers);
> +    usage(argc, argv);
> +    return 0;
> +}
> 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]