[Qemu-devel] [PATCH 2/3] barriers: block-raw-posix barrier support


From: Christoph Hellwig
Subject: [Qemu-devel] [PATCH 2/3] barriers: block-raw-posix barrier support
Date: Tue, 5 May 2009 14:08:36 +0200
User-agent: Mutt/1.3.28i

Add support for write barriers to the posix raw file / block device code.
The guts of this are in the aio emulation, as that's where we handle our queue
of outstanding requests.

The high-level design is the following (a condensed sketch of the queueing
rules follows the list):

 - As soon as a barrier request is submitted via qemu_paio_submit we increment
   the barrier_inprogress count to signal that we now have to deal with
   barriers.
 - From that point on every new request queued up by qemu_paio_submit does not
   go onto the normal request list but onto a secondary post-barrier queue.
 - Once the barrier request is dequeued by an aio_thread, that thread waits for
   all other outstanding requests to finish, then issues an fdatasync, the
   actual barrier request, and another fdatasync to prevent reordering in the
   page cache.  After the request is finished the barrier_inprogress counter is
   decremented, the post-barrier list is spliced back onto the main request
   list up to and including the next barrier request (if there is one), and
   normal operation is resumed.
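
For reference, the queueing rules above boil down to roughly the standalone
sketch below.  It is illustrative only and not part of the patch: struct req,
barrier_pending, submit() and barrier_done() are placeholder names rather than
the identifiers used in the code, and the idle-thread waiting and wakeups are
left out so that only the two queues are shown.

#include <pthread.h>
#include <stdbool.h>
#include <sys/queue.h>

struct req {
    bool is_barrier;
    TAILQ_ENTRY(req) node;
};

static TAILQ_HEAD(, req) request_list =
    TAILQ_HEAD_INITIALIZER(request_list);
static TAILQ_HEAD(, req) post_barrier_list =
    TAILQ_HEAD_INITIALIZER(post_barrier_list);
static int barrier_pending;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

/* submission side: once a barrier is queued, later requests are held back */
static void submit(struct req *r)
{
    pthread_mutex_lock(&lock);
    if (barrier_pending)
        TAILQ_INSERT_TAIL(&post_barrier_list, r, node);
    else
        TAILQ_INSERT_TAIL(&request_list, r, node);
    if (r->is_barrier)
        barrier_pending++;
    pthread_mutex_unlock(&lock);
}

/* completion side: after the barrier request itself has finished, release
 * held-back requests up to and including the next barrier so that
 * successive barriers stay ordered */
static void barrier_done(void)
{
    struct req *r;

    pthread_mutex_lock(&lock);
    barrier_pending--;
    while ((r = TAILQ_FIRST(&post_barrier_list)) != NULL) {
        TAILQ_REMOVE(&post_barrier_list, r, node);
        TAILQ_INSERT_TAIL(&request_list, r, node);
        if (r->is_barrier)
            break;
    }
    pthread_mutex_unlock(&lock);
}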

That means barriers cause quite massive serialization of the I/O submission
path, which unfortunately is not avoidable given their semantics.  In the near
future I will mitigate this for setups with multiple virtual storage devices
with a patch that makes the aio state per-device.

Signed-off-by: Christoph Hellwig <address@hidden>

Index: qemu/posix-aio-compat.c
===================================================================
--- qemu.orig/posix-aio-compat.c        2009-05-05 13:35:09.115784239 +0200
+++ qemu/posix-aio-compat.c     2009-05-05 13:47:38.625659276 +0200
@@ -17,6 +17,7 @@
 #include <errno.h>
 #include <time.h>
 #include <string.h>
+#include <stdbool.h>
 #include <stdlib.h>
 #include <stdio.h>
 #include "osdep.h"
@@ -31,8 +32,19 @@ static pthread_attr_t attr;
 static int max_threads = 64;
 static int cur_threads = 0;
 static int idle_threads = 0;
+
+/* number of barriers currently handled */
+static int barrier_inprogress = 0;
+
+/* normal list of all requests waiting for execution */
 static TAILQ_HEAD(, qemu_paiocb) request_list;
 
+/* list of all requests issued after a barrier request */
+static TAILQ_HEAD(, qemu_paiocb) post_barrier_list;
+
+/* wait for all I/O threads to be idle before issuing a barrier request */
+static pthread_cond_t idle_wait = PTHREAD_COND_INITIALIZER;
+
 #ifdef HAVE_PREADV
 static int preadv_present = 1;
 #else
@@ -62,6 +74,13 @@ static void mutex_unlock(pthread_mutex_t
     if (ret) die2(ret, "pthread_mutex_unlock");
 }
 
+static int cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
+{
+    int ret = pthread_cond_wait(cond, mutex);
+    if (ret) die2(ret, "pthread_cond_wait");
+    return ret;
+}
+
 static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                            struct timespec *ts)
 {
@@ -264,6 +283,22 @@ static size_t handle_aiocb_rw(struct qem
     return nbytes;
 }
 
+static void requeue_request_list(void)
+{
+    struct qemu_paiocb *cb, *next;
+
+    TAILQ_FOREACH_SAFE(cb, &post_barrier_list, node, next) {
+        TAILQ_REMOVE(&post_barrier_list, cb, node);
+        TAILQ_INSERT_TAIL(&request_list, cb, node);
+
+        /*
+         * Stop after the first barrier request.
+         */
+        if (cb->aio_flags & QEMU_AIO_BARRIER)
+            break;
+    }
+}
+
 static void *aio_thread(void *unused)
 {
     pid_t pid;
@@ -280,6 +315,8 @@ static void *aio_thread(void *unused)
         size_t ret = 0;
         qemu_timeval tv;
         struct timespec ts;
+        bool wakeup_threads = false;
+        bool wakeup_idle = false;
 
         qemu_gettimeofday(&tv);
         ts.tv_sec = tv.tv_sec + 10;
@@ -297,6 +334,16 @@ static void *aio_thread(void *unused)
 
         aiocb = TAILQ_FIRST(&request_list);
         TAILQ_REMOVE(&request_list, aiocb, node);
+
+        /*
+         * We've got a barrier request.  Make sure all previous requests
+         * are completed before we issue it.
+         */
+        if (aiocb->aio_flags & QEMU_AIO_BARRIER) {
+            while (idle_threads != cur_threads)
+                cond_wait(&idle_wait, &lock);
+        }
+
         aiocb->active = 1;
         idle_threads--;
         mutex_unlock(&lock);
@@ -304,7 +351,13 @@ static void *aio_thread(void *unused)
         switch (aiocb->aio_type) {
         case QEMU_PAIO_READ:
         case QEMU_PAIO_WRITE:
-               ret = handle_aiocb_rw(aiocb);
+                if (aiocb->aio_flags & QEMU_AIO_BARRIER) {
+                    fdatasync(aiocb->aio_fildes);
+                    ret = handle_aiocb_rw(aiocb);
+                    fdatasync(aiocb->aio_fildes);
+                } else {
+                    ret = handle_aiocb_rw(aiocb);
+                }
                break;
         case QEMU_PAIO_IOCTL:
                ret = handle_aiocb_ioctl(aiocb);
@@ -317,9 +370,32 @@ static void *aio_thread(void *unused)
 
         mutex_lock(&lock);
         aiocb->ret = ret;
-        idle_threads++;
+
+        if (aiocb->aio_flags & QEMU_AIO_BARRIER) {
+            barrier_inprogress--;
+            if (!TAILQ_EMPTY(&request_list))
+                 die2(ret, "request list not empty");
+
+            if (!TAILQ_EMPTY(&post_barrier_list)) {
+                requeue_request_list();
+                wakeup_threads = true;
+            }
+        }
+
+        /* wake up barrier thread when all threads are idle */
+        if (++idle_threads == cur_threads && barrier_inprogress)
+            wakeup_idle = true;
         mutex_unlock(&lock);
 
+        /*
+         * If any new requests were queued up on the post_barrier_list, wake
+         * up the I/O threads now.
+         */
+        if (wakeup_threads)
+            cond_signal(&cond);
+        if (wakeup_idle)
+            cond_signal(&idle_wait);
+
         if (kill(pid, aiocb->ev_signo)) die("kill failed");
     }
 
@@ -348,6 +424,7 @@ int qemu_paio_init(struct qemu_paioinit 
     if (ret) die2(ret, "pthread_attr_setdetachstate");
 
     TAILQ_INIT(&request_list);
+    TAILQ_INIT(&post_barrier_list);
 
     return 0;
 }
@@ -357,10 +434,21 @@ static int qemu_paio_submit(struct qemu_
     aiocb->aio_type = type;
     aiocb->ret = -EINPROGRESS;
     aiocb->active = 0;
+
     mutex_lock(&lock);
     if (idle_threads == 0 && cur_threads < max_threads)
         spawn_thread();
-    TAILQ_INSERT_TAIL(&request_list, aiocb, node);
+
+    if (barrier_inprogress) {
+        aiocb->aio_flags |= QEMU_AIO_POST_BARRIER;
+        TAILQ_INSERT_TAIL(&post_barrier_list, aiocb, node);
+    } else {
+        TAILQ_INSERT_TAIL(&request_list, aiocb, node);
+    }
+
+    if (aiocb->aio_flags & QEMU_AIO_BARRIER)
+        barrier_inprogress++;
+
     mutex_unlock(&lock);
     cond_signal(&cond);
 
@@ -411,13 +499,17 @@ int qemu_paio_cancel(int fd, struct qemu
 
     mutex_lock(&lock);
     if (!aiocb->active) {
-        TAILQ_REMOVE(&request_list, aiocb, node);
+        if (aiocb->aio_flags & QEMU_AIO_POST_BARRIER)
+            TAILQ_REMOVE(&post_barrier_list, aiocb, node);
+        else
+            TAILQ_REMOVE(&request_list, aiocb, node);
         aiocb->ret = -ECANCELED;
         ret = QEMU_PAIO_CANCELED;
-    } else if (aiocb->ret == -EINPROGRESS)
+    } else if (aiocb->ret == -EINPROGRESS) {
         ret = QEMU_PAIO_NOTCANCELED;
-    else
+    } else {
         ret = QEMU_PAIO_ALLDONE;
+    }
     mutex_unlock(&lock);
 
     return ret;
Index: qemu/posix-aio-compat.h
===================================================================
--- qemu.orig/posix-aio-compat.h        2009-05-05 13:35:09.160784863 +0200
+++ qemu/posix-aio-compat.h     2009-05-05 13:45:54.312668406 +0200
@@ -39,6 +39,11 @@ struct qemu_paiocb
     unsigned aio_flags;
 /* 512 byte alignment required for buffer, offset and length */
 #define QEMU_AIO_SECTOR_ALIGNED        0x01
+/* Barrier request, must not be reordered */
+#define QEMU_AIO_BARRIER        0x02
+
+/* Internal flag, is in the post-barrier queue */
+#define QEMU_AIO_POST_BARRIER   0x80
 
     /* private */
     TAILQ_ENTRY(qemu_paiocb) node;
Index: qemu/block-raw-posix.c
===================================================================
--- qemu.orig/block-raw-posix.c 2009-05-05 13:43:21.431811845 +0200
+++ qemu/block-raw-posix.c      2009-05-05 13:43:21.897783237 +0200
@@ -172,6 +172,14 @@ static int raw_open(BlockDriverState *bs
             return ret;
         }
     }
+
+    /*
+     * If the open mode allows caching writes in the file cache, advertise
+     * barrier support so that the guest can control the cache behaviour.
+     */
+    if (!(open_flags & (O_DIRECT|O_DSYNC)))
+        bs->barrier_support = 1;
+
     return 0;
 }
 
@@ -600,8 +608,8 @@ static int posix_aio_init(void)
 }
 
 static RawAIOCB *raw_aio_setup(BlockDriverState *bs, int64_t sector_num,
-        QEMUIOVector *qiov, int nb_sectors,
-        BlockDriverCompletionFunc *cb, void *opaque)
+        QEMUIOVector *qiov, int nb_sectors, BlockDriverCompletionFunc *cb,
+        void *opaque, unsigned flags)
 {
     BDRVRawState *s = bs->opaque;
     RawAIOCB *acb;
@@ -627,6 +635,8 @@ static RawAIOCB *raw_aio_setup(BlockDriv
      */
     if (s->aligned_buf)
         acb->aiocb.aio_flags |= QEMU_AIO_SECTOR_ALIGNED;
+    if (flags & BDRV_IO_BARRIER)
+        acb->aiocb.aio_flags |= QEMU_AIO_BARRIER;
 
     acb->next = posix_aio_state->first_aio;
     posix_aio_state->first_aio = acb;
@@ -658,7 +668,7 @@ static BlockDriverAIOCB *raw_aio_readv(B
 {
     RawAIOCB *acb;
 
-    acb = raw_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque);
+    acb = raw_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque, flags);
     if (!acb)
         return NULL;
     if (qemu_paio_read(&acb->aiocb) < 0) {
@@ -674,7 +684,7 @@ static BlockDriverAIOCB *raw_aio_writev(
 {
     RawAIOCB *acb;
 
-    acb = raw_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque);
+    acb = raw_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque, flags);
     if (!acb)
         return NULL;
     if (qemu_paio_write(&acb->aiocb) < 0) {
@@ -1022,6 +1032,14 @@ static int hdev_open(BlockDriverState *b
         s->fd_media_changed = 1;
     }
 #endif
+
+    /*
+     * If the open mode allows caching writes in the file cache, advertise
+     * barrier support so that the guest can control the cache behaviour.
+     */
+    if (!(open_flags & (O_DIRECT|O_DSYNC)))
+        bs->barrier_support = 1;
+
     return 0;
 }
 



