qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PULL for-2.8 04/13] blockjob: add block_job_start


From: Jeff Cody
Subject: [Qemu-devel] [PULL for-2.8 04/13] blockjob: add block_job_start
Date: Mon, 14 Nov 2016 23:14:42 -0500

From: John Snow <address@hidden>

Instead of automatically starting jobs at creation time via backup_start
et al, we'd like to return a job object pointer that can be started
manually at a later point in time.

For now, add the block_job_start mechanism and start the jobs
automatically as we have been doing, with conversions job-by-job coming
in later patches.

Of note: cancellation of unstarted jobs will perform all the normal
cleanup as if the job had started, particularly abort and clean. The
only difference is that we will not emit any events, because the job
never actually started.

Signed-off-by: John Snow <address@hidden>
Message-id: address@hidden
Signed-off-by: Jeff Cody <address@hidden>
---
 block/backup.c            |  3 +--
 block/commit.c            |  5 ++---
 block/mirror.c            |  5 ++---
 block/stream.c            |  5 ++---
 block/trace-events        |  6 +++---
 blockjob.c                | 54 ++++++++++++++++++++++++++++++++++++-----------
 include/block/blockjob.h  |  9 ++++++++
 tests/test-blockjob-txn.c | 12 +++++------
 8 files changed, 67 insertions(+), 32 deletions(-)

diff --git a/block/backup.c b/block/backup.c
index 4ed4494..ae1b99a 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -654,9 +654,8 @@ void backup_start(const char *job_id, BlockDriverState *bs,
 
     block_job_add_bdrv(&job->common, target);
     job->common.len = len;
-    job->common.co = qemu_coroutine_create(job->common.driver->start, job);
     block_job_txn_add_job(txn, &job->common);
-    qemu_coroutine_enter(job->common.co);
+    block_job_start(&job->common);
     return;
 
  error:
diff --git a/block/commit.c b/block/commit.c
index 20d27e2..c284e85 100644
--- a/block/commit.c
+++ b/block/commit.c
@@ -289,10 +289,9 @@ void commit_start(const char *job_id, BlockDriverState *bs,
     s->backing_file_str = g_strdup(backing_file_str);
 
     s->on_error = on_error;
-    s->common.co = qemu_coroutine_create(s->common.driver->start, s);
 
-    trace_commit_start(bs, base, top, s, s->common.co);
-    qemu_coroutine_enter(s->common.co);
+    trace_commit_start(bs, base, top, s);
+    block_job_start(&s->common);
 }
 
 
diff --git a/block/mirror.c b/block/mirror.c
index 659e09c..62ac87f 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -1009,9 +1009,8 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs,
         }
     }
 
-    s->common.co = qemu_coroutine_create(s->common.driver->start, s);
-    trace_mirror_start(bs, s, s->common.co, opaque);
-    qemu_coroutine_enter(s->common.co);
+    trace_mirror_start(bs, s, opaque);
+    block_job_start(&s->common);
 }
 
 void mirror_start(const char *job_id, BlockDriverState *bs,
diff --git a/block/stream.c b/block/stream.c
index 92309ff..1523ba7 100644
--- a/block/stream.c
+++ b/block/stream.c
@@ -255,7 +255,6 @@ void stream_start(const char *job_id, BlockDriverState *bs,
     s->bs_flags = orig_bs_flags;
 
     s->on_error = on_error;
-    s->common.co = qemu_coroutine_create(s->common.driver->start, s);
-    trace_stream_start(bs, base, s, s->common.co);
-    qemu_coroutine_enter(s->common.co);
+    trace_stream_start(bs, base, s);
+    block_job_start(&s->common);
 }
diff --git a/block/trace-events b/block/trace-events
index 882c903..cfc05f2 100644
--- a/block/trace-events
+++ b/block/trace-events
@@ -19,14 +19,14 @@ bdrv_co_do_copy_on_readv(void *bs, int64_t offset, unsigned int bytes, int64_t c
 
 # block/stream.c
 stream_one_iteration(void *s, int64_t sector_num, int nb_sectors, int is_allocated) "s %p sector_num %"PRId64" nb_sectors %d is_allocated %d"
-stream_start(void *bs, void *base, void *s, void *co) "bs %p base %p s %p co %p"
+stream_start(void *bs, void *base, void *s) "bs %p base %p s %p"
 
 # block/commit.c
 commit_one_iteration(void *s, int64_t sector_num, int nb_sectors, int is_allocated) "s %p sector_num %"PRId64" nb_sectors %d is_allocated %d"
-commit_start(void *bs, void *base, void *top, void *s, void *co) "bs %p base %p top %p s %p co %p"
+commit_start(void *bs, void *base, void *top, void *s) "bs %p base %p top %p s %p"
 
 # block/mirror.c
-mirror_start(void *bs, void *s, void *co, void *opaque) "bs %p s %p co %p opaque %p"
+mirror_start(void *bs, void *s, void *opaque) "bs %p s %p opaque %p"
 mirror_restart_iter(void *s, int64_t cnt) "s %p dirty count %"PRId64
 mirror_before_flush(void *s) "s %p"
 mirror_before_drain(void *s, int64_t cnt) "s %p dirty count %"PRId64
diff --git a/blockjob.c b/blockjob.c
index e3c458c..513620c 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -174,7 +174,9 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
     job->blk           = blk;
     job->cb            = cb;
     job->opaque        = opaque;
-    job->busy          = true;
+    job->busy          = false;
+    job->paused        = true;
+    job->pause_count   = 1;
     job->refcnt        = 1;
     bs->job = job;
 
@@ -202,6 +204,23 @@ bool block_job_is_internal(BlockJob *job)
     return (job->id == NULL);
 }
 
+static bool block_job_started(BlockJob *job)
+{
+    return job->co;
+}
+
+void block_job_start(BlockJob *job)
+{
+    assert(job && !block_job_started(job) && job->paused &&
+           !job->busy && job->driver->start);
+    job->co = qemu_coroutine_create(job->driver->start, job);
+    if (--job->pause_count == 0) {
+        job->paused = false;
+        job->busy = true;
+        qemu_coroutine_enter(job->co);
+    }
+}
+
 void block_job_ref(BlockJob *job)
 {
     ++job->refcnt;
@@ -248,14 +267,18 @@ static void block_job_completed_single(BlockJob *job)
     if (job->cb) {
         job->cb(job->opaque, job->ret);
     }
-    if (block_job_is_cancelled(job)) {
-        block_job_event_cancelled(job);
-    } else {
-        const char *msg = NULL;
-        if (job->ret < 0) {
-            msg = strerror(-job->ret);
+
+    /* Emit events only if we actually started */
+    if (block_job_started(job)) {
+        if (block_job_is_cancelled(job)) {
+            block_job_event_cancelled(job);
+        } else {
+            const char *msg = NULL;
+            if (job->ret < 0) {
+                msg = strerror(-job->ret);
+            }
+            block_job_event_completed(job, msg);
         }
-        block_job_event_completed(job, msg);
     }
 
     if (job->txn) {
@@ -363,7 +386,8 @@ void block_job_complete(BlockJob *job, Error **errp)
 {
     /* Should not be reachable via external interface for internal jobs */
     assert(job->id);
-    if (job->pause_count || job->cancelled || !job->driver->complete) {
+    if (job->pause_count || job->cancelled ||
+        !block_job_started(job) || !job->driver->complete) {
         error_setg(errp, "The active block job '%s' cannot be completed",
                    job->id);
         return;
@@ -395,6 +419,8 @@ bool block_job_user_paused(BlockJob *job)
 
 void coroutine_fn block_job_pause_point(BlockJob *job)
 {
+    assert(job && block_job_started(job));
+
     if (!block_job_should_pause(job)) {
         return;
     }
@@ -446,9 +472,13 @@ void block_job_enter(BlockJob *job)
 
 void block_job_cancel(BlockJob *job)
 {
-    job->cancelled = true;
-    block_job_iostatus_reset(job);
-    block_job_enter(job);
+    if (block_job_started(job)) {
+        job->cancelled = true;
+        block_job_iostatus_reset(job);
+        block_job_enter(job);
+    } else {
+        block_job_completed(job, -ECANCELED);
+    }
 }
 
 bool block_job_is_cancelled(BlockJob *job)
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 356cacf..1acb256 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -189,6 +189,15 @@ void block_job_add_bdrv(BlockJob *job, BlockDriverState *bs);
 void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp);
 
 /**
+ * block_job_start:
+ * @job: A job that has not yet been started.
+ *
+ * Begins execution of a block job.
+ * Takes ownership of one reference to the job object.
+ */
+void block_job_start(BlockJob *job);
+
+/**
  * block_job_cancel:
  * @job: The job to be canceled.
  *
diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c
index f9afc3b..b132e39 100644
--- a/tests/test-blockjob-txn.c
+++ b/tests/test-blockjob-txn.c
@@ -24,10 +24,6 @@ typedef struct {
     int *result;
 } TestBlockJob;
 
-static const BlockJobDriver test_block_job_driver = {
-    .instance_size = sizeof(TestBlockJob),
-};
-
 static void test_block_job_complete(BlockJob *job, void *opaque)
 {
     BlockDriverState *bs = blk_bs(job->blk);
@@ -77,6 +73,11 @@ static void test_block_job_cb(void *opaque, int ret)
     g_free(data);
 }
 
+static const BlockJobDriver test_block_job_driver = {
+    .instance_size = sizeof(TestBlockJob),
+    .start = test_block_job_run,
+};
+
 /* Create a block job that completes with a given return code after a given
  * number of event loop iterations.  The return code is stored in the given
  * result pointer.
@@ -104,10 +105,9 @@ static BlockJob *test_block_job_start(unsigned int iterations,
     s->use_timer = use_timer;
     s->rc = rc;
     s->result = result;
-    s->common.co = qemu_coroutine_create(test_block_job_run, s);
     data->job = s;
     data->result = result;
-    qemu_coroutine_enter(s->common.co);
+    block_job_start(&s->common);
     return &s->common;
 }
 
-- 
2.7.4




reply via email to

[Prev in Thread] Current Thread [Next in Thread]