[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [PATCH v2 05/10] block: add block job transactions
From: |
Stefan Hajnoczi |
Subject: |
[Qemu-devel] [PATCH v2 05/10] block: add block job transactions |
Date: |
Mon, 6 Jul 2015 15:24:24 +0100 |
Sometimes block jobs must execute as a transaction group. A job that
finishes waits until all other jobs in the group are ready to complete
successfully. Failure or cancellation of one job cancels the other jobs in
the group.
Signed-off-by: Stefan Hajnoczi <address@hidden>
---
v2:
* Set txn pointer to NULL in block_job_txn_begin() [jsnow]
* Rename block_job_txn_prepare_to_complete to block_job_txn_job_done [jsnow]
* Rename block_job_txn_complete to block_job_txn_kick [jsnow]
* Add BLOCK_JOB_TXN_CANCEL_PENDING to solve race condition on cancel [jsnow]
* Document when txn may be NULL
---
blockjob.c | 193 +++++++++++++++++++++++++++++++++++++++++++++++
include/block/block.h | 1 +
include/block/blockjob.h | 52 +++++++++++++
trace-events | 4 +
4 files changed, 250 insertions(+)
diff --git a/blockjob.c b/blockjob.c
index ec46fad..d1f0206 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -400,3 +400,196 @@ void block_job_defer_to_main_loop(BlockJob *job,
qemu_bh_schedule(data->bh);
}
+
+/* Lifecycle of a transaction (traced as an int by block_job_txn_job_done()) */
+typedef enum {
+    /* No job has failed or been cancelled yet */
+    BLOCK_JOB_TXN_OK = 0,
+    /* A job failed or was cancelled; a block_job_txn_kick() is scheduled */
+    BLOCK_JOB_TXN_CANCEL_PENDING = 1,
+    /* The kick has cancelled the other jobs; they may terminate now */
+    BLOCK_JOB_TXN_CANCEL_DONE = 2,
+} BlockJobTxnState;
+
+/*
+ * Transactional group of block jobs.
+ *
+ * Member jobs may live in different AioContexts, so every field below is
+ * protected by @lock.
+ */
+struct BlockJobTxn {
+    QemuMutex lock;
+
+    /* References: one for the creator until block_job_txn_begin(), one per
+     * member job, plus a temporary one held by block_job_txn_kick() */
+    unsigned int ref;
+
+    /* Whether the group is healthy or its jobs are being cancelled */
+    BlockJobTxnState state;
+
+    /* Member jobs that have not yet reported successful completion */
+    unsigned int jobs_pending;
+
+    /* Member jobs, linked through BlockJob.txn_list */
+    QLIST_HEAD(, BlockJob) jobs;
+};
+
+/* Create an empty transaction holding a single reference for the creator. */
+BlockJobTxn *block_job_txn_new(void)
+{
+    BlockJobTxn *new_txn = g_new0(BlockJobTxn, 1);
+
+    QLIST_INIT(&new_txn->jobs);
+    new_txn->jobs_pending = 0;
+    new_txn->state = BLOCK_JOB_TXN_OK;
+    /* The creator's reference; released by block_job_txn_begin() */
+    new_txn->ref = 1;
+    qemu_mutex_init(&new_txn->lock);
+
+    return new_txn;
+}
+
+/* Drop one reference; the object is destroyed when the count reaches zero.
+ * The decrement happens under txn->lock because jobs in other AioContexts
+ * may release their references concurrently.
+ */
+static void block_job_txn_unref(BlockJobTxn *txn)
+{
+    bool last_ref;
+
+    qemu_mutex_lock(&txn->lock);
+    last_ref = (--txn->ref == 0);
+    qemu_mutex_unlock(&txn->lock);
+
+    if (last_ref) {
+        qemu_mutex_destroy(&txn->lock);
+        g_free(txn);
+    }
+}
+
+/* Drop the creator's reference taken by block_job_txn_new() and clear the
+ * caller's pointer.
+ *
+ * That reference keeps txn alive while jobs are still being added: each job
+ * takes its own reference in block_job_txn_add_job() and drops it in
+ * block_job_txn_job_done(), so without the creator's reference an early job
+ * could free txn before the remaining jobs had joined.
+ *
+ * A NULL *txn is tolerated, matching block_job_txn_add_job() and
+ * block_job_txn_job_done(), which also accept a NULL transaction.
+ */
+void block_job_txn_begin(BlockJobTxn **txn)
+{
+    if (*txn) {
+        block_job_txn_unref(*txn);
+    }
+    *txn = NULL;
+}
+
+/* Make @job a member of @txn (no-op when @txn is NULL).  The job gets its
+ * own reference on the transaction and counts as pending until it reports
+ * success through block_job_txn_job_done().
+ */
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
+{
+    if (txn) {
+        assert(job->txn == NULL);
+        job->txn = txn;
+
+        qemu_mutex_lock(&txn->lock);
+        QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
+        txn->jobs_pending++;
+        txn->ref++;
+        qemu_mutex_unlock(&txn->lock);
+    }
+}
+
+/* Cancel all other jobs in case of abort, wake all waiting jobs in case of
+ * successful completion. Runs from main loop.
+ *
+ * Scheduled through block_job_defer_to_main_loop() by
+ * block_job_txn_job_done(): @job is the job that scheduled the kick and
+ * @opaque is its BlockJobTxn.
+ *
+ * txn->lock is dropped around every call into other subsystems
+ * (bdrv_get_aio_context(), aio_context_acquire(), block_job_cancel(),
+ * block_job_enter()); woken jobs take txn->lock themselves inside
+ * block_job_txn_job_done().
+ *
+ * NOTE(review): txn->jobs is walked with the lock dropped, which relies on
+ * the list not changing during the kick.  Nothing in this patch removes
+ * entries and jobs may not be added after block_job_txn_begin() -- confirm
+ * this still holds if job removal is ever introduced.
+ */
+static void block_job_txn_kick(BlockJob *job, void *opaque)
+{
+ BlockJobTxn *txn = opaque;
+ BlockJob *other_job;
+ GSList *ctxs = NULL;
+ GSList *ctxs_iter;
+ bool cancel = false;
+
+ qemu_mutex_lock(&txn->lock);
+ txn->ref++; /* keep txn alive until the unref at the end of this function */
+
+ /* Acquire AioContexts so jobs cannot race with us.
+ *
+ * One acquire is done per job, so a context shared by several jobs is
+ * acquired several times (NOTE(review): relies on aio_context_acquire()
+ * nesting -- confirm).  Contexts are prepended to ctxs, so the release
+ * loop below releases them in reverse acquisition order.
+ */
+ QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+ AioContext *ctx;
+
+ qemu_mutex_unlock(&txn->lock);
+ ctx = bdrv_get_aio_context(other_job->bs);
+ aio_context_acquire(ctx);
+ ctxs = g_slist_prepend(ctxs, ctx);
+ qemu_mutex_lock(&txn->lock);
+ }
+
+ /* A pending cancellation becomes CANCEL_DONE: from here on
+ * block_job_txn_job_done() callers should not wait.  On the success path
+ * the state stays OK and waiting jobs leave their wait loop because the
+ * kick was scheduled when jobs_pending reached 0.
+ */
+ if (txn->state == BLOCK_JOB_TXN_CANCEL_PENDING) {
+ txn->state = BLOCK_JOB_TXN_CANCEL_DONE;
+ cancel = true;
+ }
+
+ /* Kick jobs: when cancelling, every job except @job is cancelled and @job
+ * is only woken; otherwise every job is woken.
+ */
+ QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+ qemu_mutex_unlock(&txn->lock);
+
+ /* Don't cancel our own failed job since cancellation throws away the
+ * error value.
+ */
+ if (cancel && other_job != job) {
+ block_job_cancel(other_job);
+ } else {
+ block_job_enter(other_job);
+ }
+
+ qemu_mutex_lock(&txn->lock);
+ }
+
+ /* Drop the temporary reference taken above.  ctxs is a local list, so it
+ * stays valid even if this releases the last reference to txn.
+ */
+ qemu_mutex_unlock(&txn->lock);
+ block_job_txn_unref(txn);
+
+ /* Release AioContexts */
+ for (ctxs_iter = ctxs; ctxs_iter; ctxs_iter = ctxs_iter->next) {
+ aio_context_release(ctxs_iter->data);
+ }
+ g_slist_free(ctxs);
+}
+
+/* Report that @job has finished with result @ret and, unless the group has
+ * already been cancelled, yield until the transaction settles: either
+ * block_job_txn_kick() has marked it BLOCK_JOB_TXN_CANCEL_DONE or every
+ * member job has reported success (jobs_pending == 0).  Returns immediately
+ * when @txn is NULL.  On return this job's reference on @txn is dropped
+ * (presumably the one taken in block_job_txn_add_job()).
+ *
+ * txn->lock is held throughout, except across qemu_coroutine_yield() and the
+ * final block_job_txn_unref().
+ */
+void coroutine_fn block_job_txn_job_done(BlockJobTxn *txn,
+ BlockJob *job,
+ int ret)
+{
+ if (!txn) {
+ return;
+ }
+
+ qemu_mutex_lock(&txn->lock);
+
+ /* This function is entered in 4 cases:
+ *
+ * 1. Successful job completion - wait for other jobs
+ * 2. First failed/cancelled job in txn - cancel other jobs and wait
+ * 3. Subsequent cancelled jobs - finish immediately, don't wait
+ * 4. While kick is pending - wait for kick to cancel or wake us
+ */
+ trace_block_job_txn_job_done_entry(txn, job, ret,
+ block_job_is_cancelled(job),
+ txn->state,
+ txn->jobs_pending);
+
+ if (txn->state == BLOCK_JOB_TXN_CANCEL_DONE) { /* Case 3 */
+ assert(block_job_is_cancelled(job));
+ goto out; /* already cancelled, don't yield */
+ }
+
+ /* Case 4 (state is CANCEL_PENDING) skips this block and goes straight to
+ * the wait loop.  A failed or cancelled job (case 2) does not decrement
+ * jobs_pending, so only CANCEL_DONE ends its wait.
+ */
+ if (txn->state == BLOCK_JOB_TXN_OK) {
+ if (ret != 0 || block_job_is_cancelled(job)) { /* Case 2 */
+ txn->state = BLOCK_JOB_TXN_CANCEL_PENDING;
+ block_job_defer_to_main_loop(job, block_job_txn_kick, txn);
+ } else { /* Case 1 */
+ /* The last job to succeed schedules the kick that wakes everyone */
+ if (--txn->jobs_pending == 0) {
+ block_job_defer_to_main_loop(job, block_job_txn_kick, txn);
+ }
+ }
+ }
+
+ /* Wait for block_job_txn_kick() in cases 1, 2, and 4 */
+ /* The exit condition is re-checked after every wake-up, so entering this
+ * coroutine early simply makes it yield again.
+ */
+ do {
+ qemu_mutex_unlock(&txn->lock);
+ job->busy = false;
+ qemu_coroutine_yield();
+ job->busy = true;
+ qemu_mutex_lock(&txn->lock);
+
+ /* Did the user just cancel this job? */
+ /* NOTE(review): if jobs_pending is already 0 here, this schedules a
+ * cancel kick but the loop condition below still lets this job return,
+ * so that kick may run after the job has left -- worth re-checking.
+ */
+ if (block_job_is_cancelled(job) && txn->state == BLOCK_JOB_TXN_OK) {
+ txn->state = BLOCK_JOB_TXN_CANCEL_PENDING;
+ block_job_defer_to_main_loop(job, block_job_txn_kick, txn);
+ }
+ } while (txn->state != BLOCK_JOB_TXN_CANCEL_DONE &&
+ txn->jobs_pending > 0);
+
+out:
+ trace_block_job_txn_job_done_return(txn, job, ret,
+ block_job_is_cancelled(job),
+ txn->state,
+ txn->jobs_pending);
+
+ qemu_mutex_unlock(&txn->lock);
+ block_job_txn_unref(txn);
+}
diff --git a/include/block/block.h b/include/block/block.h
index 7437590..c7fc5b6 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -13,6 +13,7 @@
typedef struct BlockDriver BlockDriver;
typedef struct BlockJob BlockJob;
typedef struct BdrvChildRole BdrvChildRole;
+typedef struct BlockJobTxn BlockJobTxn; /* opaque; defined in blockjob.c */
typedef struct BlockDriverInfo {
/* in bytes, 0 if irrelevant */
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 57d8ef1..7d6ffb7 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -122,6 +122,10 @@ struct BlockJob {
/** The opaque value that is passed to the completion function. */
void *opaque;
+
+ /** Non-NULL if this job is part of a transaction */
+ BlockJobTxn *txn;
+ QLIST_ENTRY(BlockJob) txn_list;
};
/**
@@ -348,4 +352,52 @@ void block_job_defer_to_main_loop(BlockJob *job,
BlockJobDeferToMainLoopFn *fn,
void *opaque);
+/**
+ * block_job_txn_new:
+ *
+ * Allocate and return a new block job transaction. Jobs can be added to the
+ * transaction using block_job_txn_add_job(). block_job_txn_begin() must be
+ * called when all jobs (if any) have been added.
+ *
+ * The returned transaction holds one reference on behalf of the caller; that
+ * reference is released by block_job_txn_begin().
+ *
+ * All jobs in the transaction either complete successfully or fail/cancel as a
+ * group. Jobs wait for each other before completing. Cancelling one job
+ * cancels all jobs in the transaction.
+ */
+BlockJobTxn *block_job_txn_new(void);
+
+/**
+ * block_job_txn_add_job:
+ * @txn: The transaction (may be NULL, in which case this does nothing)
+ * @job: Job to add to the transaction
+ *
+ * Add @job to the transaction. The @job must not already be in a transaction.
+ * The block job driver must call block_job_txn_job_done() before final
+ * cleanup and completion.
+ */
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job);
+
+/**
+ * block_job_txn_begin:
+ * @txn: The transaction pointer
+ *
+ * Call this to mark the end of adding jobs to the transaction. This must be
+ * called even if no jobs were added.
+ *
+ * The caller may not add jobs after the transaction begins so the @txn pointer
+ * is set to NULL to show that the caller has released ownership.
+ */
+void block_job_txn_begin(BlockJobTxn **txn);
+
+/**
+ * block_job_txn_job_done:
+ * @txn: The transaction (may be NULL, in which case this returns immediately)
+ * @job: The block job
+ * @ret: Block job return value (0 for success, otherwise job failure)
+ *
+ * Wait for other jobs in the transaction to complete. If @ret is non-zero or
+ * @job is cancelled, all other jobs in the transaction will be cancelled. If
+ * the transaction has already been cancelled, @job returns without waiting.
+ *
+ * Must be called from coroutine context, once for each job added with
+ * block_job_txn_add_job(); it drops the reference that call took on @txn.
+ */
+void coroutine_fn block_job_txn_job_done(BlockJobTxn *txn,
+                                         BlockJob *job, int ret);
+
#endif
diff --git a/trace-events b/trace-events
index 52b7efa..5877289 100644
--- a/trace-events
+++ b/trace-events
@@ -123,6 +123,10 @@ virtio_blk_data_plane_start(void *s) "dataplane %p"
virtio_blk_data_plane_stop(void *s) "dataplane %p"
virtio_blk_data_plane_process_request(void *s, unsigned int out_num, unsigned
int in_num, unsigned int head) "dataplane %p out_num %u in_num %u head %u"
+# blockjob.c
+block_job_txn_job_done_entry(void *txn, void *job, int ret, bool cancelled, int status, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d status %d jobs_pending %u"
+block_job_txn_job_done_return(void *txn, void *job, int ret, bool cancelled, int status, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d status %d jobs_pending %u"
+
# hw/virtio/dataplane/vring.c
vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring
physical %#"PRIx64" desc %p avail %p used %p"
--
2.4.3
- [Qemu-devel] [PATCH v2 00/10] block: incremental backup transactions using BlockJobTxn, Stefan Hajnoczi, 2015/07/06
- [Qemu-devel] [PATCH v2 02/10] iotests: add transactional incremental backup test, Stefan Hajnoczi, 2015/07/06
- [Qemu-devel] [PATCH v2 01/10] qapi: Add transaction support to block-dirty-bitmap operations, Stefan Hajnoczi, 2015/07/06
- [Qemu-devel] [PATCH v2 03/10] block: rename BlkTransactionState and BdrvActionOps, Stefan Hajnoczi, 2015/07/06
- [Qemu-devel] [PATCH v2 04/10] block: keep bitmap if incremental backup job is cancelled, Stefan Hajnoczi, 2015/07/06
- [Qemu-devel] [PATCH v2 06/10] blockdev: make BlockJobTxn available to qmp 'transaction', Stefan Hajnoczi, 2015/07/06
- [Qemu-devel] [PATCH v2 07/10] block/backup: support block job transactions, Stefan Hajnoczi, 2015/07/06
- [Qemu-devel] [PATCH v2 05/10] block: add block job transactions,
Stefan Hajnoczi <=
[Qemu-devel] [PATCH v2 08/10] iotests: 124 - transactional failure test, Stefan Hajnoczi, 2015/07/06
[Qemu-devel] [PATCH v2 09/10] qmp-commands.hx: Update the supported 'transaction' operations, Stefan Hajnoczi, 2015/07/06
[Qemu-devel] [PATCH v2 10/10] tests: add BlockJobTxn unit test, Stefan Hajnoczi, 2015/07/06