qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH 4/6] throttle: Add throttle group support


From: Alberto Garcia
Subject: [Qemu-devel] [PATCH 4/6] throttle: Add throttle group support
Date: Tue, 10 Mar 2015 17:30:48 +0200

Throttle group support uses a cooperative round-robin scheduling
algorithm.

The principles of the algorithm are simple:
- Each BDS of the group is used as a token in a circular way.
- The active BDS computes if a wait must be done and arms the right
  timer.
- If a wait must be done the token timer will be armed so the token
  will become the next active BDS.

Signed-off-by: Alberto Garcia <address@hidden>
---
 block.c                         |  65 ++++++++++----------
 block/qapi.c                    |   5 +-
 block/throttle-groups.c         | 127 ++++++++++++++++++++++++++++++++++++++++
 blockdev.c                      |  19 +++++-
 hmp.c                           |   4 +-
 include/block/block.h           |   3 +-
 include/block/block_int.h       |   9 +--
 include/block/throttle-groups.h |   4 ++
 qapi/block-core.json            |   4 +-
 qemu-options.hx                 |   1 +
 qmp-commands.hx                 |   3 +-
 11 files changed, 198 insertions(+), 46 deletions(-)

diff --git a/block.c b/block.c
index da0cb48..cd8582f 100644
--- a/block.c
+++ b/block.c
@@ -36,6 +36,7 @@
 #include "qmp-commands.h"
 #include "qemu/timer.h"
 #include "qapi-event.h"
+#include "block/throttle-groups.h"
 
 #ifdef CONFIG_BSD
 #include <sys/types.h>
@@ -130,7 +131,7 @@ void bdrv_set_io_limits(BlockDriverState *bs,
 {
     int i;
 
-    throttle_config(&bs->throttle_state, &bs->throttle_timers, cfg);
+    throttle_group_config(bs->throttle_state, &bs->throttle_timers, cfg);
 
     for (i = 0; i < 2; i++) {
         qemu_co_enter_next(&bs->throttled_reqs[i]);
@@ -160,9 +161,8 @@ static bool bdrv_start_throttled_reqs(BlockDriverState *bs)
 void bdrv_io_limits_disable(BlockDriverState *bs)
 {
     bs->io_limits_enabled = false;
-
     bdrv_start_throttled_reqs(bs);
-
+    throttle_group_unregister_bs(bs);
     throttle_timers_destroy(&bs->throttle_timers);
 }
 
@@ -179,10 +179,11 @@ static void bdrv_throttle_write_timer_cb(void *opaque)
 }
 
 /* should be called before bdrv_set_io_limits if a limit is set */
-void bdrv_io_limits_enable(BlockDriverState *bs)
+void bdrv_io_limits_enable(BlockDriverState *bs, const char *group)
 {
     assert(!bs->io_limits_enabled);
-    throttle_init(&bs->throttle_state);
+
+    throttle_group_register_bs(bs, group ? group : bdrv_get_device_name(bs));
     throttle_timers_init(&bs->throttle_timers,
                          bdrv_get_aio_context(bs),
                          QEMU_CLOCK_VIRTUAL,
@@ -192,6 +193,23 @@ void bdrv_io_limits_enable(BlockDriverState *bs)
     bs->io_limits_enabled = true;
 }
 
+void bdrv_io_limits_update_group(BlockDriverState *bs, const char *group)
+{
+    /* No throttle state attached, so this bs is not part of any group */
+    if (!bs->throttle_state) {
+        return;
+    }
+
+    /* This bs is already part of the group we want: nothing to do */
+    if (!g_strcmp0(throttle_group_get_name(bs->throttle_state), group)) {
+        return;
+    }
+
+    /* Change the group this bs belongs to: disable, then re-enable */
+    bdrv_io_limits_disable(bs);
+    bdrv_io_limits_enable(bs, group);
+}
+
 /* This function makes an IO wait if needed
  *
  * @nb_sectors: the number of sectors of the IO
@@ -201,29 +219,7 @@ static void bdrv_io_limits_intercept(BlockDriverState *bs,
                                      unsigned int bytes,
                                      bool is_write)
 {
-    /* does this io must wait */
-    bool must_wait = throttle_schedule_timer(&bs->throttle_state,
-                                             &bs->throttle_timers,
-                                             is_write);
-
-    /* if must wait or any request of this type throttled queue the IO */
-    if (must_wait ||
-        !qemu_co_queue_empty(&bs->throttled_reqs[is_write])) {
-        qemu_co_queue_wait(&bs->throttled_reqs[is_write]);
-    }
-
-    /* the IO will be executed, do the accounting */
-    throttle_account(&bs->throttle_state, is_write, bytes);
-
-
-    /* if the next request must wait -> do nothing */
-    if (throttle_schedule_timer(&bs->throttle_state, &bs->throttle_timers,
-                                is_write)) {
-        return;
-    }
-
-    /* else queue next request for execution */
-    qemu_co_queue_next(&bs->throttled_reqs[is_write]);
+    throttle_group_io_limits_intercept(bs, bytes, is_write);
 }
 
 size_t bdrv_opt_mem_align(BlockDriverState *bs)
@@ -2050,15 +2046,16 @@ static void bdrv_move_feature_fields(BlockDriverState *bs_dest,
     bs_dest->enable_write_cache = bs_src->enable_write_cache;
 
     /* i/o throttled req */
-    memcpy(&bs_dest->throttle_state,
-           &bs_src->throttle_state,
-           sizeof(ThrottleState));
+    bs_dest->throttle_state     = bs_src->throttle_state,
+    bs_dest->io_limits_enabled  = bs_src->io_limits_enabled;
+    bs_dest->throttled_reqs[0]  = bs_src->throttled_reqs[0];
+    bs_dest->throttled_reqs[1]  = bs_src->throttled_reqs[1];
+    memcpy(&bs_dest->round_robin,
+           &bs_src->round_robin,
+           sizeof(bs_dest->round_robin));
     memcpy(&bs_dest->throttle_timers,
            &bs_src->throttle_timers,
            sizeof(ThrottleTimers));
-    bs_dest->throttled_reqs[0]  = bs_src->throttled_reqs[0];
-    bs_dest->throttled_reqs[1]  = bs_src->throttled_reqs[1];
-    bs_dest->io_limits_enabled  = bs_src->io_limits_enabled;
 
     /* r/w error */
     bs_dest->on_read_error      = bs_src->on_read_error;
diff --git a/block/qapi.c b/block/qapi.c
index 1808e67..abeeb38 100644
--- a/block/qapi.c
+++ b/block/qapi.c
@@ -24,6 +24,7 @@
 
 #include "block/qapi.h"
 #include "block/block_int.h"
+#include "block/throttle-groups.h"
 #include "block/write-threshold.h"
 #include "qmp-commands.h"
 #include "qapi-visit.h"
@@ -63,7 +64,9 @@ BlockDeviceInfo *bdrv_block_device_info(BlockDriverState *bs)
 
     if (bs->io_limits_enabled) {
         ThrottleConfig cfg;
-        throttle_get_config(&bs->throttle_state, &cfg);
+
+        throttle_group_get_config(bs->throttle_state, &cfg);
+
         info->bps     = cfg.buckets[THROTTLE_BPS_TOTAL].avg;
         info->bps_rd  = cfg.buckets[THROTTLE_BPS_READ].avg;
         info->bps_wr  = cfg.buckets[THROTTLE_BPS_WRITE].avg;
diff --git a/block/throttle-groups.c b/block/throttle-groups.c
index e355fed..9075c46 100644
--- a/block/throttle-groups.c
+++ b/block/throttle-groups.c
@@ -23,6 +23,8 @@
  */
 
 #include "block/throttle-groups.h"
+#include "qemu/queue.h"
+#include "qemu/thread.h"
 
 /* The ThrottleGroup structure (with its ThrottleState) is shared
  * among different BlockDriverState and it's independent from
@@ -145,6 +147,131 @@ static BlockDriverState *throttle_group_next_bs(BlockDriverState *bs)
     return next;
 }
 
+/* Return the next BlockDriverState in the round-robin sequence with
+ * pending I/O requests.
+ *
+ * @bs:        the current BlockDriverState
+ * @is_write:  the type of operation (read/write)
+ * @ret:       the next BlockDriverState with pending requests, or bs
+ *             if there is none.
+ */
+static BlockDriverState *next_throttle_token(BlockDriverState *bs,
+                                             bool is_write)
+{
+    ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
+    BlockDriverState *token, *start;
+
+    start = token = tg->tokens[is_write];
+
+    /* Step past the current token, skipping members with an empty queue */
+    token = throttle_group_next_bs(token);
+    while (token != start  &&
+           qemu_co_queue_empty(&token->throttled_reqs[is_write])) {
+        token = throttle_group_next_bs(token);
+    }
+
+    /* If we came back to the starting token and it has no queued
+     * requests either, no member with pending requests was found:
+     * fall back to the current bs, which is likely to queue its request.
+     */
+    if (token == start &&
+        qemu_co_queue_empty(&token->throttled_reqs[is_write])) {
+        token = bs;
+    }
+
+    return token;
+}
+
+/* Check if the next I/O request for a BlockDriverState needs to be
+ * throttled or not. If no timer in this group is pending, schedule
+ * one if needed and, if a wait is required, make bs the group token.
+ *
+ * @bs:         the current BlockDriverState
+ * @is_write:   the type of operation (read/write)
+ * @ret:        whether the I/O request needs to be throttled or not
+ */
+static bool throttle_group_schedule_timer(BlockDriverState *bs,
+                                          bool is_write)
+{
+    ThrottleState *ts = bs->throttle_state;
+    ThrottleTimers *tt = &bs->throttle_timers;
+    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
+    bool must_wait;
+    BlockDriverState *iter;
+
+    /* If any timer in this group is already armed, the request must wait */
+    QLIST_FOREACH(iter, &tg->head, round_robin) {
+        if (timer_pending(iter->throttle_timers.timers[is_write])) {
+            return true;
+        }
+    }
+
+    must_wait = throttle_schedule_timer(ts, tt, is_write);
+
+    /* If a wait is required, record bs as the token for this direction */
+    if (must_wait) {
+        tg->tokens[is_write] = bs;
+    }
+
+    return must_wait;
+}
+
+/* Check if an I/O request needs to be throttled, wait if necessary,
+ * account for it, and schedule the next request round-robin style.
+ * The group lock is held throughout, except while waiting in the queue.
+ *
+ * @bs:        the current BlockDriverState
+ * @bytes:     the number of bytes for this I/O
+ * @is_write:  the type of operation (read/write)
+ */
+void throttle_group_io_limits_intercept(BlockDriverState *bs,
+                                        unsigned int bytes,
+                                        bool is_write)
+{
+    bool must_wait;
+    BlockDriverState *token;
+
+    ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
+    qemu_mutex_lock(&tg->lock);
+
+    /* First we check if this I/O has to be throttled. */
+    token = next_throttle_token(bs, is_write);
+    must_wait = throttle_group_schedule_timer(token, is_write);
+
+    /* Wait if a wait is required or this bs already has queued requests */
+    if (must_wait || !qemu_co_queue_empty(&bs->throttled_reqs[is_write])) {
+        qemu_mutex_unlock(&tg->lock);
+        qemu_co_queue_wait(&bs->throttled_reqs[is_write]);
+        qemu_mutex_lock(&tg->lock);
+    }
+
+    /* The I/O will be executed, so do the accounting */
+    throttle_account(bs->throttle_state, is_write, bytes);
+
+    /* Now we check if there's any pending request to schedule next */
+    token = next_throttle_token(bs, is_write);
+    if (!qemu_co_queue_empty(&token->throttled_reqs[is_write])) {
+
+        /* Check whether the token's next request would have to wait */
+        must_wait = throttle_group_schedule_timer(token, is_write);
+
+        if (!must_wait) {
+            /* Prefer the current bs, else arm the token's timer (now + 1) */
+            if (!qemu_co_queue_empty(&bs->throttled_reqs[is_write])) {
+                token = bs;
+                qemu_co_queue_next(&bs->throttled_reqs[is_write]);
+            } else {
+                ThrottleTimers *tt = &token->throttle_timers;
+                int64_t now = qemu_clock_get_ns(tt->clock_type);
+                timer_mod(tt->timers[is_write], now + 1);
+            }
+            tg->tokens[is_write] = token;
+        }
+    }
+
+    qemu_mutex_unlock(&tg->lock);
+}
+
 /* Update the throttle configuration for a particular group. Similar
  * to throttle_config(), but guarantees atomicity within the
  * throttling group.
diff --git a/blockdev.c b/blockdev.c
index b9c1c0c..89fc7a8 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -357,6 +357,7 @@ static BlockBackend *blockdev_init(const char *file, QDict *bs_opts,
     const char *id;
     bool has_driver_specific_opts;
     BlockdevDetectZeroesOptions detect_zeroes;
+    const char *throttling_group;
 
     /* Check common options by copying from bs_opts to opts, all other options
      * stay in bs_opts for processing by bdrv_open(). */
@@ -459,6 +460,8 @@ static BlockBackend *blockdev_init(const char *file, QDict *bs_opts,
 
     cfg.op_size = qemu_opt_get_number(opts, "throttling.iops-size", 0);
 
+    throttling_group = qemu_opt_get(opts, "throttling.group");
+
     if (!check_throttle_config(&cfg, &error)) {
         error_propagate(errp, error);
         goto early_err;
@@ -547,7 +550,7 @@ static BlockBackend *blockdev_init(const char *file, QDict *bs_opts,
 
     /* disk I/O throttling */
     if (throttle_enabled(&cfg)) {
-        bdrv_io_limits_enable(bs);
+        bdrv_io_limits_enable(bs, throttling_group);
         bdrv_set_io_limits(bs, &cfg);
     }
 
@@ -711,6 +714,8 @@ DriveInfo *drive_new(QemuOpts *all_opts, BlockInterfaceType block_default_type)
 
         { "iops_size",      "throttling.iops-size" },
 
+        { "group",          "throttling.group" },
+
         { "readonly",       "read-only" },
     };
 
@@ -1877,7 +1882,9 @@ void qmp_block_set_io_throttle(const char *device, int64_t bps, int64_t bps_rd,
                                bool has_iops_wr_max,
                                int64_t iops_wr_max,
                                bool has_iops_size,
-                               int64_t iops_size, Error **errp)
+                               int64_t iops_size,
+                               bool has_group,
+                               const char *group, Error **errp)
 {
     ThrottleConfig cfg;
     BlockDriverState *bs;
@@ -1929,9 +1936,11 @@ void qmp_block_set_io_throttle(const char *device, int64_t bps, int64_t bps_rd,
     aio_context_acquire(aio_context);
 
     if (!bs->io_limits_enabled && throttle_enabled(&cfg)) {
-        bdrv_io_limits_enable(bs);
+        bdrv_io_limits_enable(bs, has_group ? group : NULL);
     } else if (bs->io_limits_enabled && !throttle_enabled(&cfg)) {
         bdrv_io_limits_disable(bs);
+    } else if (bs->io_limits_enabled && throttle_enabled(&cfg)) {
+        bdrv_io_limits_update_group(bs, has_group ? group : NULL);
     }
 
     if (bs->io_limits_enabled) {
@@ -2991,6 +3000,10 @@ QemuOptsList qemu_common_drive_opts = {
             .type = QEMU_OPT_NUMBER,
             .help = "when limiting by iops max size of an I/O in bytes",
         },{
+            .name = "throttling.group",
+            .type = QEMU_OPT_STRING,
+            .help = "name of the block throttling group",
+        },{
             .name = "copy-on-read",
             .type = QEMU_OPT_BOOL,
             .help = "copy read data from backing file into image file",
diff --git a/hmp.c b/hmp.c
index 71c28bc..b81e4ba 100644
--- a/hmp.c
+++ b/hmp.c
@@ -1261,7 +1261,9 @@ void hmp_block_set_io_throttle(Monitor *mon, const QDict *qdict)
                               false,
                               0,
                               false, /* No default I/O size */
-                              0, &err);
+                              0,
+                              false,
+                              NULL, &err);
     hmp_handle_error(mon, &err);
 }
 
diff --git a/include/block/block.h b/include/block/block.h
index 649c269..2bb6222 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -162,8 +162,9 @@ void bdrv_stats_print(Monitor *mon, const QObject *data);
 void bdrv_info_stats(Monitor *mon, QObject **ret_data);
 
 /* disk I/O throttling */
-void bdrv_io_limits_enable(BlockDriverState *bs);
+void bdrv_io_limits_enable(BlockDriverState *bs, const char *group);
 void bdrv_io_limits_disable(BlockDriverState *bs);
+void bdrv_io_limits_update_group(BlockDriverState *bs, const char *group);
 
 void bdrv_init(void);
 void bdrv_init_with_whitelist(void);
diff --git a/include/block/block_int.h b/include/block/block_int.h
index 1314fd3..c02e963 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -358,12 +358,13 @@ struct BlockDriverState {
     /* number of in-flight serialising requests */
     unsigned int serialising_in_flight;
 
-    /* I/O throttling */
-    ThrottleState throttle_state;
-    ThrottleTimers throttle_timers;
-    CoQueue      throttled_reqs[2];
+    /* I/O throttling - following elements protected by ThrottleGroup lock */
+    ThrottleState *throttle_state;
     bool         io_limits_enabled;
+    CoQueue      throttled_reqs[2];
     QLIST_ENTRY(BlockDriverState) round_robin;
+    /* timers have their own locking */
+    ThrottleTimers throttle_timers;
 
     /* I/O stats (display with "info blockstats"). */
     BlockAcctStats stats;
diff --git a/include/block/throttle-groups.h b/include/block/throttle-groups.h
index 23a172f..ddb962b 100644
--- a/include/block/throttle-groups.h
+++ b/include/block/throttle-groups.h
@@ -38,4 +38,8 @@ void throttle_group_get_config(ThrottleState *ts, ThrottleConfig *cfg);
 void throttle_group_register_bs(BlockDriverState *bs, const char *groupname);
 void throttle_group_unregister_bs(BlockDriverState *bs);
 
+void throttle_group_io_limits_intercept(BlockDriverState *bs,
+                                        unsigned int bytes,
+                                        bool is_write);
+
 #endif
diff --git a/qapi/block-core.json b/qapi/block-core.json
index a3fdaf0..487b147 100644
--- a/qapi/block-core.json
+++ b/qapi/block-core.json
@@ -989,6 +989,8 @@
 #
 # @iops_size: #optional an I/O size in bytes (Since 1.7)
 #
+# @group: #optional throttle group name (Since 2.3)
+#
 # Returns: Nothing on success
 #          If @device is not a valid block device, DeviceNotFound
 #
@@ -1000,7 +1002,7 @@
             '*bps_max': 'int', '*bps_rd_max': 'int',
             '*bps_wr_max': 'int', '*iops_max': 'int',
             '*iops_rd_max': 'int', '*iops_wr_max': 'int',
-            '*iops_size': 'int' } }
+            '*iops_size': 'int', '*group': 'str' } }
 
 ##
 # @block-stream:
diff --git a/qemu-options.hx b/qemu-options.hx
index 837624d..6db0613 100644
--- a/qemu-options.hx
+++ b/qemu-options.hx
@@ -454,6 +454,7 @@ DEF("drive", HAS_ARG, QEMU_OPTION_drive,
     "       [[,bps_max=bm]|[[,bps_rd_max=rm][,bps_wr_max=wm]]]\n"
     "       [[,iops_max=im]|[[,iops_rd_max=irm][,iops_wr_max=iwm]]]\n"
     "       [[,iops_size=is]]\n"
+    "       [[,group=g]]\n"
     "                use 'file' as a drive image\n", QEMU_ARCH_ALL)
 STEXI
 @item -drive @var{option}[,@var{option}[,@var{option}[,...]]]
diff --git a/qmp-commands.hx b/qmp-commands.hx
index c12334a..d866d57 100644
--- a/qmp-commands.hx
+++ b/qmp-commands.hx
@@ -1704,7 +1704,7 @@ EQMP
 
     {
         .name       = "block_set_io_throttle",
-        .args_type  = "device:B,bps:l,bps_rd:l,bps_wr:l,iops:l,iops_rd:l,iops_wr:l,bps_max:l?,bps_rd_max:l?,bps_wr_max:l?,iops_max:l?,iops_rd_max:l?,iops_wr_max:l?,iops_size:l?",
+        .args_type  = "device:B,bps:l,bps_rd:l,bps_wr:l,iops:l,iops_rd:l,iops_wr:l,bps_max:l?,bps_rd_max:l?,bps_wr_max:l?,iops_max:l?,iops_rd_max:l?,iops_wr_max:l?,iops_size:l?,group:s?",
         .mhandler.cmd_new = qmp_marshal_input_block_set_io_throttle,
     },
 
@@ -1730,6 +1730,7 @@ Arguments:
 - "iops_rd_max":  read I/O operations max (json-int)
 - "iops_wr_max":  write I/O operations max (json-int)
 - "iops_size":  I/O size in bytes when limiting (json-int)
+- "group": throttle group name (json-string)
 
 Example:
 
-- 
2.1.4




reply via email to

[Prev in Thread] Current Thread [Next in Thread]