qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads


From: Avihai Horon
Subject: Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Date: Thu, 28 Nov 2024 12:26:43 +0200
User-agent: Mozilla Thunderbird


On 17/11/2024 21:20, Maciej S. Szmigiero wrote:
External email: Use caution opening links or attachments


From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>

Some drivers might want to make use of auxiliary helper threads during VM
state loading, for example to make sure that their blocking (sync) I/O
operations don't block the rest of the migration process.

Add a migration core managed thread pool to facilitate this use case.

The migration core will wait for these threads to finish before
(re)starting the VM at destination.

Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
---
  include/migration/misc.h |  3 ++
  include/qemu/typedefs.h  |  1 +
  migration/savevm.c       | 77 ++++++++++++++++++++++++++++++++++++++++
  3 files changed, 81 insertions(+)

diff --git a/include/migration/misc.h b/include/migration/misc.h
index 804eb23c0607..c92ca018ab3b 100644
--- a/include/migration/misc.h
+++ b/include/migration/misc.h
@@ -45,9 +45,12 @@ bool migrate_ram_is_ignored(RAMBlock *block);
  /* migration/block.c */

  AnnounceParameters *migrate_announce_params(void);
+
  /* migration/savevm.c */

  void dump_vmstate_json_to_file(FILE *out_fp);
+void qemu_loadvm_start_load_thread(MigrationLoadThread function,
+                                   void *opaque);

  /* migration/migration.c */
  void migration_object_init(void);
diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
index 3d84efcac47a..8c8ea5c2840d 100644
--- a/include/qemu/typedefs.h
+++ b/include/qemu/typedefs.h
@@ -131,5 +131,6 @@ typedef struct IRQState *qemu_irq;
   * Function types
   */
  typedef void (*qemu_irq_handler)(void *opaque, int n, int level);
+typedef int (*MigrationLoadThread)(bool *abort_flag, void *opaque);

  #endif /* QEMU_TYPEDEFS_H */
diff --git a/migration/savevm.c b/migration/savevm.c
index 1f58a2fa54ae..6ea9054c4083 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -54,6 +54,7 @@
  #include "qemu/job.h"
  #include "qemu/main-loop.h"
  #include "block/snapshot.h"
+#include "block/thread-pool.h"
  #include "qemu/cutils.h"
  #include "io/channel-buffer.h"
  #include "io/channel-file.h"
@@ -71,6 +72,10 @@

  const unsigned int postcopy_ram_discard_version;

+static ThreadPool *load_threads;
+static int load_threads_ret;
+static bool load_threads_abort;
+
  /* Subcommands for QEMU_VM_COMMAND */
  enum qemu_vm_cmd {
      MIG_CMD_INVALID = 0,   /* Must be 0 */
@@ -2788,6 +2793,12 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
      int ret;

      trace_loadvm_state_setup();
+
+    assert(!load_threads);
+    load_threads = thread_pool_new();
+    load_threads_ret = 0;
+    load_threads_abort = false;
+
      QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
          if (!se->ops || !se->ops->load_setup) {
              continue;
@@ -2806,19 +2817,72 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
              return ret;
          }
      }
+
+    return 0;
+}
+
+struct LoadThreadData {
+    MigrationLoadThread function;
+    void *opaque;
+};
+
+static int qemu_loadvm_load_thread(void *thread_opaque)
+{
+    struct LoadThreadData *data = thread_opaque;
+    int ret;
+
+    ret = data->function(&load_threads_abort, data->opaque);
+    if (ret && !qatomic_read(&load_threads_ret)) {
+        /*
+         * Racy with the above read but that's okay - which thread error
+         * return we report is purely arbitrary anyway.
+         */
+        qatomic_set(&load_threads_ret, ret);
+    }

Can we use cmpxchg instead? E.g.:

if (ret) {
    qatomic_cmpxchg(&load_threads_ret, 0, ret);
}

+
      return 0;
  }

+void qemu_loadvm_start_load_thread(MigrationLoadThread function,
+                                   void *opaque)
+{
+    struct LoadThreadData *data;
+
+    /* We only set it from this thread so it's okay to read it directly */
+    assert(!load_threads_abort);
+
+    data = g_new(struct LoadThreadData, 1);
+    data->function = function;
+    data->opaque = opaque;
+
+    thread_pool_submit(load_threads, qemu_loadvm_load_thread,
+                       data, g_free);
+    thread_pool_adjust_max_threads_to_work(load_threads);
+}
+
  void qemu_loadvm_state_cleanup(void)
  {
      SaveStateEntry *se;

      trace_loadvm_state_cleanup();
+
      QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
          if (se->ops && se->ops->load_cleanup) {
              se->ops->load_cleanup(se->opaque);
          }
      }
+
+    /*
+     * We might be called even without earlier qemu_loadvm_state_setup()
+     * call if qemu_loadvm_state() fails very early.
+     */
+    if (load_threads) {
+        qatomic_set(&load_threads_abort, true);
+        bql_unlock(); /* Load threads might be waiting for BQL */
+        thread_pool_wait(load_threads);
+        bql_lock();
+        g_clear_pointer(&load_threads, thread_pool_free);

Since thread_pool_free() also waits for pending jobs before returning, can we drop the explicit thread_pool_wait()? E.g.:

qatomic_set(&load_threads_abort, true);
bql_unlock(); /* Load threads might be waiting for BQL */
g_clear_pointer(&load_threads, thread_pool_free);
bql_lock();

Thanks.

+    }
  }

  /* Return true if we should continue the migration, or false. */
@@ -3007,6 +3071,19 @@ int qemu_loadvm_state(QEMUFile *f)
          return ret;
      }

+    if (ret == 0) {
+        bql_unlock(); /* Let load threads do work requiring BQL */
+        thread_pool_wait(load_threads);
+        bql_lock();
+
+        ret = load_threads_ret;
+    }
+    /*
+     * Set this flag unconditionally so we'll catch further attempts to
+     * start additional threads via an appropriate assert()
+     */
+    qatomic_set(&load_threads_abort, true);
+
      if (ret == 0) {
          ret = qemu_file_get_error(f);
      }



reply via email to

[Prev in Thread] Current Thread [Next in Thread]