qemu-block
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-block] [PATCH RFC v2 14/22] block/pcache: add support for rescheduling requests


From: Pavel Butsykin
Subject: [Qemu-block] [PATCH RFC v2 14/22] block/pcache: add support for rescheduling requests
Date: Mon, 29 Aug 2016 20:10:13 +0300

Now we can't drop nodes until the aio write request has completed,
because there is no guarantee that an overlapping chunk of blocks won't be
cached in the interval between the start of the request and its completion,
which would leave some data in the cache stale.

It has also become possible for an aio write to overlap a PCNode with status
NODE_WAIT_STATUS. If we drop the nodes in the aio callback, such nodes can be
skipped, because it is guaranteed that the data on the disk will be up to
date by the time the aio read for the pending node is processed.

Signed-off-by: Pavel Butsykin <address@hidden>
---
 block/pcache.c | 136 +++++++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 112 insertions(+), 24 deletions(-)

diff --git a/block/pcache.c b/block/pcache.c
index 1ff4c6a..cb5f884 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -43,6 +43,11 @@ typedef struct RbNodeKey {
     uint32_t    size;
 } RbNodeKey;
 
+typedef struct ACBEntryLink {
+    QTAILQ_ENTRY(ACBEntryLink) entry;
+    struct PrefCacheAIOCB     *acb;
+} ACBEntryLink;
+
 typedef struct BlockNode {
     struct RbNode               rb_node;
     union {
@@ -58,6 +63,10 @@ typedef struct BlockNode {
 typedef struct PCNode {
     BlockNode cm;
 
+    struct {
+        QTAILQ_HEAD(acb_head, ACBEntryLink) list;
+        uint32_t cnt;
+    } wait;
     uint32_t                 status;
     uint32_t                 ref;
     uint8_t                  *data;
@@ -181,7 +190,6 @@ static inline PCNode *pcache_node_ref(PCNode *node)
 {
     assert(node->status == NODE_SUCCESS_STATUS ||
            node->status == NODE_WAIT_STATUS);
-    assert(atomic_read(&node->ref) == 0);/* XXX: only for sequential requests 
*/
     atomic_inc(&node->ref);
 
     return node;
@@ -277,6 +285,8 @@ static inline void *pcache_node_alloc(RbNodeKey* key)
     node->status = NODE_WAIT_STATUS;
     qemu_co_mutex_init(&node->lock);
     node->data = g_malloc(node->cm.nb_sectors << BDRV_SECTOR_BITS);
+    node->wait.cnt = 0;
+    QTAILQ_INIT(&node->wait.list);
 
     return node;
 }
@@ -308,15 +318,33 @@ static void pcache_node_drop(BDRVPCacheState *s, PCNode 
*node)
     pcache_node_unref(s, node);
 }
 
+static inline PCNode *pcache_get_most_unused_node(BDRVPCacheState *s)
+{
+    PCNode *node;
+    assert(!QTAILQ_EMPTY(&s->pcache.lru.list));
+
+    qemu_co_mutex_lock(&s->pcache.lru.lock);
+    node = PCNODE(QTAILQ_LAST(&s->pcache.lru.list, lru_head));
+    pcache_node_ref(node);
+    qemu_co_mutex_unlock(&s->pcache.lru.lock);
+
+    return node;
+}
+
 static void pcache_try_shrink(BDRVPCacheState *s)
 {
     while (s->pcache.curr_size > s->cfg_cache_size) {
-        qemu_co_mutex_lock(&s->pcache.lru.lock);
-        assert(!QTAILQ_EMPTY(&s->pcache.lru.list));
-        PCNode *rmv_node = PCNODE(QTAILQ_LAST(&s->pcache.lru.list, lru_head));
-        qemu_co_mutex_unlock(&s->pcache.lru.lock);
+        PCNode *rmv_node;
+                /* it can happen if all nodes are waiting */
+        if (QTAILQ_EMPTY(&s->pcache.lru.list)) {
+            DPRINTF("lru list is empty, but curr_size: %d\n",
+                    s->pcache.curr_size);
+            break;
+        }
+        rmv_node = pcache_get_most_unused_node(s);
 
         pcache_node_drop(s, rmv_node);
+        pcache_node_unref(s, rmv_node);
 #ifdef PCACHE_DEBUG
         atomic_inc(&s->shrink_cnt_node);
 #endif
@@ -392,7 +420,7 @@ static uint64_t ranges_overlap_size(uint64_t node1, 
uint32_t size1,
     return MIN(node1 + size1, node2 + size2) - MAX(node1, node2);
 }
 
-static void pcache_node_read(PrefCacheAIOCB *acb, PCNode* node)
+static inline void pcache_node_read_buf(PrefCacheAIOCB *acb, PCNode* node)
 {
     uint64_t qiov_offs = 0, node_offs = 0;
     uint32_t size;
@@ -407,15 +435,41 @@ static void pcache_node_read(PrefCacheAIOCB *acb, PCNode* 
node)
                                node->cm.sector_num, node->cm.nb_sectors)
            << BDRV_SECTOR_BITS;
 
+    qemu_co_mutex_lock(&node->lock); /* XXX: use rw lock */
+    copy = \
+        qemu_iovec_from_buf(acb->qiov, qiov_offs, node->data + node_offs, 
size);
+    qemu_co_mutex_unlock(&node->lock);
+    assert(copy == size);
+}
+
+static inline void pcache_node_read_wait(PrefCacheAIOCB *acb, PCNode *node)
+{
+    ACBEntryLink *link = g_slice_alloc(sizeof(*link));
+    link->acb = acb;
+
+    atomic_inc(&node->wait.cnt);
+    QTAILQ_INSERT_HEAD(&node->wait.list, link, entry);
+    acb->ref++;
+}
+
+static void pcache_node_read(PrefCacheAIOCB *acb, PCNode* node)
+{
     assert(node->status == NODE_SUCCESS_STATUS ||
+           node->status == NODE_WAIT_STATUS    ||
            node->status == NODE_REMOVE_STATUS);
     assert(node->data != NULL);
 
     qemu_co_mutex_lock(&node->lock);
-    copy = \
-        qemu_iovec_from_buf(acb->qiov, qiov_offs, node->data + node_offs, 
size);
-    assert(copy == size);
+    if (node->status == NODE_WAIT_STATUS) {
+        pcache_node_read_wait(acb, node);
+        qemu_co_mutex_unlock(&node->lock);
+
+        return;
+    }
     qemu_co_mutex_unlock(&node->lock);
+
+    pcache_node_read_buf(acb, node);
+    pcache_node_unref(acb->s, node);
 }
 
 static inline void prefetch_init_key(PrefCacheAIOCB *acb, RbNodeKey* key)
@@ -446,10 +500,11 @@ static void pcache_pickup_parts_of_cache(PrefCacheAIOCB 
*acb, PCNode *node,
             size -= up_size;
             num += up_size;
         }
-        pcache_node_read(acb, node);
         up_size = MIN(node->cm.sector_num + node->cm.nb_sectors - num, size);
-
-        pcache_node_unref(acb->s, node);
+        pcache_node_read(acb, node); /* don't use node after pcache_node_read,
+                                      * node maybe free.
+                                      */
+        node = NULL;
 
         size -= up_size;
         num += up_size;
@@ -488,7 +543,6 @@ static int32_t pcache_prefetch(PrefCacheAIOCB *acb)
                                                      acb->nb_sectors)
     {
         pcache_node_read(acb, node);
-        pcache_node_unref(acb->s, node);
         return PREFETCH_FULL_UP;
     }
     pcache_pickup_parts_of_cache(acb, node, key.num, key.size);
@@ -513,6 +567,31 @@ static void complete_aio_request(PrefCacheAIOCB *acb)
     }
 }
 
+static void pcache_complete_acb_wait_queue(BDRVPCacheState *s, PCNode *node)
+{
+    ACBEntryLink *link, *next;
+
+    if (atomic_read(&node->wait.cnt) == 0) {
+        return;
+    }
+
+    QTAILQ_FOREACH_SAFE(link, &node->wait.list, entry, next) {
+        PrefCacheAIOCB *wait_acb = link->acb;
+
+        QTAILQ_REMOVE(&node->wait.list, link, entry);
+        g_slice_free1(sizeof(*link), link);
+
+        pcache_node_read_buf(wait_acb, node);
+
+        assert(node->ref != 0);
+        pcache_node_unref(s, node);
+
+        complete_aio_request(wait_acb);
+        atomic_dec(&node->wait.cnt);
+    }
+    assert(atomic_read(&node->wait.cnt) == 0);
+}
+
 static void pcache_node_submit(PrefCachePartReq *req)
 {
     PCNode *node = req->node;
@@ -539,14 +618,17 @@ static void pcache_merge_requests(PrefCacheAIOCB *acb)
 
     qemu_co_mutex_lock(&acb->requests.lock);
     QTAILQ_FOREACH_SAFE(req, &acb->requests.list, entry, next) {
+        PCNode *node = req->node;
         QTAILQ_REMOVE(&acb->requests.list, req, entry);
 
         assert(req != NULL);
-        assert(req->node->status == NODE_WAIT_STATUS);
+        assert(node->status == NODE_WAIT_STATUS);
 
         pcache_node_submit(req);
 
-        pcache_node_read(acb, req->node);
+        pcache_node_read_buf(acb, node);
+
+        pcache_complete_acb_wait_queue(acb->s, node);
 
         pcache_node_unref(acb->s, req->node);
 
@@ -559,22 +641,27 @@ static void pcache_try_node_drop(PrefCacheAIOCB *acb)
 {
     BDRVPCacheState *s = acb->s;
     RbNodeKey key;
+    PCNode *node;
+    uint64_t end_offs = acb->sector_num + acb->nb_sectors;
 
-    prefetch_init_key(acb, &key);
-
+    key.num = acb->sector_num;
     do {
-        PCNode *node;
-        qemu_co_mutex_lock(&s->pcache.tree.lock);
+        key.size = end_offs - key.num;
+
+        qemu_co_mutex_lock(&s->pcache.tree.lock); /* XXX: use get_next_node */
         node = pcache_node_search(&s->pcache.tree.root, &key);
         qemu_co_mutex_unlock(&s->pcache.tree.lock);
         if (node == NULL) {
-            break;
+            return;
         }
-
-        pcache_node_drop(s, node);
+        if (node->status != NODE_WAIT_STATUS) {
+            assert(node->status == NODE_SUCCESS_STATUS);
+            pcache_node_drop(s, node);
+        }
+        key.num = node->cm.sector_num + node->cm.nb_sectors;
 
         pcache_node_unref(s, node);
-    } while (true);
+    } while (end_offs > key.num);
 }
 
 static void pcache_aio_cb(void *opaque, int ret)
@@ -586,6 +673,8 @@ static void pcache_aio_cb(void *opaque, int ret)
             return;
         }
         pcache_merge_requests(acb);
+    } else {        /* QEMU_AIO_WRITE */
+        pcache_try_node_drop(acb); /* XXX: use write through */
     }
 
     complete_aio_request(acb);
@@ -649,7 +738,6 @@ static BlockAIOCB *pcache_aio_writev(BlockDriverState *bs,
 {
     PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
                                          opaque, QEMU_AIO_WRITE);
-    pcache_try_node_drop(acb); /* XXX: use write through */
 
     bdrv_aio_writev(bs->file, sector_num, qiov, nb_sectors,
                     pcache_aio_cb, acb);
-- 
2.8.3




reply via email to

[Prev in Thread] Current Thread [Next in Thread]