qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH v2 05/10] block: add block job transactions


From: Stefan Hajnoczi
Subject: [Qemu-devel] [PATCH v2 05/10] block: add block job transactions
Date: Mon, 6 Jul 2015 15:24:24 +0100

Sometimes block jobs must execute as a transaction group.  Finishing
jobs wait until all other jobs are ready to complete successfully.
Failure or cancellation of one job cancels the other jobs in the group.

Signed-off-by: Stefan Hajnoczi <address@hidden>
---
v2:
 * Set txn pointer to NULL in block_job_txn_begin() [jsnow]
 * Rename block_job_txn_prepare_to_complete to block_job_txn_job_done [jsnow]
 * Rename block_job_txn_complete to block_job_txn_kick [jsnow]
 * Add BLOCK_JOB_TXN_CANCEL_PENDING to solve race condition on cancel [jsnow]
 * Document when txn may be NULL
---
 blockjob.c               | 193 +++++++++++++++++++++++++++++++++++++++++++++++
 include/block/block.h    |   1 +
 include/block/blockjob.h |  52 +++++++++++++
 trace-events             |   4 +
 4 files changed, 250 insertions(+)

diff --git a/blockjob.c b/blockjob.c
index ec46fad..d1f0206 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -400,3 +400,196 @@ void block_job_defer_to_main_loop(BlockJob *job,
 
     qemu_bh_schedule(data->bh);
 }
+
+typedef enum {
+    BLOCK_JOB_TXN_OK,               /* no job failed yet */
+    BLOCK_JOB_TXN_CANCEL_PENDING,   /* kick scheduled to cancel jobs */
+    BLOCK_JOB_TXN_CANCEL_DONE,      /* cancelled jobs can terminate now */
+} BlockJobTxnState;
+
+/* Transactional group of block jobs */
+struct BlockJobTxn {
+    /* Jobs may be in different AioContexts so protect all fields */
+    QemuMutex lock;
+
+    /* Reference count for txn object */
+    unsigned int ref;
+
+    /* Is this txn ok or are jobs being cancelled? */
+    BlockJobTxnState state;
+
+    /* Number of jobs still running */
+    unsigned int jobs_pending;
+
+    /* List of jobs */
+    QLIST_HEAD(, BlockJob) jobs;
+};
+
+BlockJobTxn *block_job_txn_new(void)
+{
+    BlockJobTxn *txn = g_new(BlockJobTxn, 1);
+    qemu_mutex_init(&txn->lock);
+    txn->ref = 1; /* dropped by block_job_txn_begin() */
+    txn->state = BLOCK_JOB_TXN_OK;
+    txn->jobs_pending = 0;
+    QLIST_INIT(&txn->jobs);
+    return txn;
+}
+
+static void block_job_txn_unref(BlockJobTxn *txn)
+{
+    qemu_mutex_lock(&txn->lock);
+
+    if (--txn->ref > 0) {
+        qemu_mutex_unlock(&txn->lock);
+        return;
+    }
+
+    qemu_mutex_unlock(&txn->lock);
+    qemu_mutex_destroy(&txn->lock);
+    g_free(txn);
+}
+
+/* The purpose of this is to keep txn alive until all jobs have been added */
+void block_job_txn_begin(BlockJobTxn **txn)
+{
+    block_job_txn_unref(*txn);
+    *txn = NULL;
+}
+
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
+{
+    if (!txn) {
+        return;
+    }
+
+    assert(!job->txn);
+    job->txn = txn;
+
+    qemu_mutex_lock(&txn->lock);
+    txn->ref++;
+    txn->jobs_pending++;
+    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
+    qemu_mutex_unlock(&txn->lock);
+}
+
+/* Cancel all other jobs in case of abort, wake all waiting jobs in case of
+ * successful completion.  Runs from main loop.
+ */
+static void block_job_txn_kick(BlockJob *job, void *opaque)
+{
+    BlockJobTxn *txn = opaque;
+    BlockJob *other_job;
+    GSList *ctxs = NULL;
+    GSList *ctxs_iter;
+    bool cancel = false;
+
+    qemu_mutex_lock(&txn->lock);
+    txn->ref++; /* keep txn alive until the end of this loop */
+
+    /* Acquire AioContexts so jobs cannot race with us */
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        AioContext *ctx;
+
+        qemu_mutex_unlock(&txn->lock);
+        ctx = bdrv_get_aio_context(other_job->bs);
+        aio_context_acquire(ctx);
+        ctxs = g_slist_prepend(ctxs, ctx);
+        qemu_mutex_lock(&txn->lock);
+    }
+
+    /* From here on block_job_txn_job_done() callers should not wait */
+    if (txn->state == BLOCK_JOB_TXN_CANCEL_PENDING) {
+        txn->state = BLOCK_JOB_TXN_CANCEL_DONE;
+        cancel = true;
+    }
+
+    /* Kick jobs */
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        qemu_mutex_unlock(&txn->lock);
+
+        /* Don't cancel our own failed job since cancellation throws away the
+         * error value.
+         */
+        if (cancel && other_job != job) {
+            block_job_cancel(other_job);
+        } else {
+            block_job_enter(other_job);
+        }
+
+        qemu_mutex_lock(&txn->lock);
+    }
+
+    qemu_mutex_unlock(&txn->lock);
+    block_job_txn_unref(txn);
+
+    /* Release AioContexts */
+    for (ctxs_iter = ctxs; ctxs_iter; ctxs_iter = ctxs_iter->next) {
+        aio_context_release(ctxs_iter->data);
+    }
+    g_slist_free(ctxs);
+}
+
+void coroutine_fn block_job_txn_job_done(BlockJobTxn *txn,
+                                         BlockJob *job,
+                                         int ret)
+{
+    if (!txn) {
+        return;
+    }
+
+    qemu_mutex_lock(&txn->lock);
+
+    /* This function is entered in 4 cases:
+     *
+     * 1. Successful job completion - wait for other jobs
+     * 2. First failed/cancelled job in txn - cancel other jobs and wait
+     * 3. Subsequent cancelled jobs - finish immediately, don't wait
+     * 4. While kick is pending - wait for kick to cancel or wake us
+     */
+    trace_block_job_txn_job_done_entry(txn, job, ret,
+                                       block_job_is_cancelled(job),
+                                       txn->state,
+                                       txn->jobs_pending);
+
+    if (txn->state == BLOCK_JOB_TXN_CANCEL_DONE) { /* Case 3 */
+        assert(block_job_is_cancelled(job));
+        goto out; /* already cancelled, don't yield */
+    }
+
+    if (txn->state == BLOCK_JOB_TXN_OK) {
+        if (ret != 0 || block_job_is_cancelled(job)) { /* Case 2 */
+            txn->state = BLOCK_JOB_TXN_CANCEL_PENDING;
+            block_job_defer_to_main_loop(job, block_job_txn_kick, txn);
+        } else { /* Case 1 */
+            if (--txn->jobs_pending == 0) {
+                block_job_defer_to_main_loop(job, block_job_txn_kick, txn);
+            }
+        }
+    }
+
+    /* Wait for block_job_txn_kick() in cases 1, 2, and 4 */
+    do {
+        qemu_mutex_unlock(&txn->lock);
+        job->busy = false;
+        qemu_coroutine_yield();
+        job->busy = true;
+        qemu_mutex_lock(&txn->lock);
+
+        /* Did the user just cancel this job? */
+        if (block_job_is_cancelled(job) && txn->state == BLOCK_JOB_TXN_OK) {
+            txn->state = BLOCK_JOB_TXN_CANCEL_PENDING;
+            block_job_defer_to_main_loop(job, block_job_txn_kick, txn);
+        }
+    } while (txn->state != BLOCK_JOB_TXN_CANCEL_DONE &&
+             txn->jobs_pending > 0);
+
+out:
+    trace_block_job_txn_job_done_return(txn, job, ret,
+                                        block_job_is_cancelled(job),
+                                        txn->state,
+                                        txn->jobs_pending);
+
+    qemu_mutex_unlock(&txn->lock);
+    block_job_txn_unref(txn);
+}
diff --git a/include/block/block.h b/include/block/block.h
index 7437590..c7fc5b6 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -13,6 +13,7 @@
 typedef struct BlockDriver BlockDriver;
 typedef struct BlockJob BlockJob;
 typedef struct BdrvChildRole BdrvChildRole;
+typedef struct BlockJobTxn BlockJobTxn;
 
 typedef struct BlockDriverInfo {
     /* in bytes, 0 if irrelevant */
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 57d8ef1..7d6ffb7 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -122,6 +122,10 @@ struct BlockJob {
 
     /** The opaque value that is passed to the completion function.  */
     void *opaque;
+
+    /** Non-NULL if this job is part of a transaction */
+    BlockJobTxn *txn;
+    QLIST_ENTRY(BlockJob) txn_list;
 };
 
 /**
@@ -348,4 +352,52 @@ void block_job_defer_to_main_loop(BlockJob *job,
                                   BlockJobDeferToMainLoopFn *fn,
                                   void *opaque);
 
+/**
+ * block_job_txn_new:
+ *
+ * Allocate and return a new block job transaction.  Jobs can be added to the
+ * transaction using block_job_txn_add_job().  block_job_txn_begin() must be
+ * called when all jobs (if any) have been added.
+ *
+ * All jobs in the transaction either complete successfully or fail/cancel as a
+ * group.  Jobs wait for each other before completing.  Cancelling one job
+ * cancels all jobs in the transaction.
+ */
+BlockJobTxn *block_job_txn_new(void);
+
+/**
+ * block_job_txn_add_job:
+ * @txn: The transaction (may be NULL)
+ * @job: Job to add to the transaction
+ *
+ * Add @job to the transaction.  The @job must not already be in a transaction.
+ * The block job driver must call block_job_txn_job_done() before
+ * final cleanup and completion.
+ */
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job);
+
+/**
+ * block_job_txn_begin:
+ * @txn: The transaction pointer
+ *
+ * Call this to mark the end of adding jobs to the transaction.  This must be
+ * called even if no jobs were added.
+ *
+ * The caller may not add jobs after the transaction begins so the @txn pointer
+ * is set to NULL to show that the caller has released ownership.
+ */
+void block_job_txn_begin(BlockJobTxn **txn);
+
+/**
+ * block_job_txn_job_done:
+ * @txn: The transaction (may be NULL)
+ * @job: The block job
+ * @ret: Block job return value (0 for success, otherwise job failure)
+ *
+ * Wait for other jobs in the transaction to complete.  If @ret is non-zero or
+ * @job is cancelled, all other jobs in the transaction will be cancelled.
+ */
+void coroutine_fn block_job_txn_job_done(BlockJobTxn *txn,
+                                         BlockJob *job, int ret);
+
 #endif
diff --git a/trace-events b/trace-events
index 52b7efa..5877289 100644
--- a/trace-events
+++ b/trace-events
@@ -123,6 +123,10 @@ virtio_blk_data_plane_start(void *s) "dataplane %p"
 virtio_blk_data_plane_stop(void *s) "dataplane %p"
 virtio_blk_data_plane_process_request(void *s, unsigned int out_num, unsigned int in_num, unsigned int head) "dataplane %p out_num %u in_num %u head %u"
 
+# blockjob.c
+block_job_txn_job_done_entry(void *txn, void *job, int ret, bool cancelled, int status, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d status %d jobs_pending %u"
+block_job_txn_job_done_return(void *txn, void *job, int ret, bool cancelled, int status, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d status %d jobs_pending %u"
+
 # hw/virtio/dataplane/vring.c
 vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"
 
-- 
2.4.3




reply via email to

[Prev in Thread] Current Thread [Next in Thread]