Subject: [Qemu-block] [PATCH RFC 14/22] block/pcache: add support for rescheduling requests
From: Pavel Butsykin
Date: Thu, 25 Aug 2016 16:44:13 +0300
Nodes can no longer be dropped until the aio write request has completed,
because there is no guarantee that an overlapping chunk of blocks won't be
added to the cache in the interval between the start of the request and its
completion, which would leave stale data in the cache.
It also becomes possible for an aio write to overlap a PCNode with status
NODE_WAIT_STATUS. If we drop the nodes in the aio callback instead, such
nodes can simply be skipped, because by the time the pending aio read is
processed, the data on disk is guaranteed to be up to date.
Signed-off-by: Pavel Butsykin <address@hidden>
---
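As an illustration of the scheme the hunks below implement (not part of the
patch itself): readers that hit a node still being filled are parked on a
per-node wait list and completed from the fill-completion callback. The
following minimal C sketch models that flow; all names in it (cache_node,
node_read_or_park, node_complete_waiters) are invented for the example, and
the real QTAILQ wait list, per-node mutex and reference counting are omitted
for brevity.

/* Illustrative sketch only: simplified stand-ins for the patch's types. */
#include <stdio.h>

enum { NODE_WAIT, NODE_READY };

struct reader {                    /* stand-in for PrefCacheAIOCB */
    const char *name;
};

struct wait_link {                 /* stand-in for ACBEntryLink */
    struct reader *rd;
    struct wait_link *next;
};

struct cache_node {                /* stand-in for PCNode */
    int status;
    struct wait_link *waiters;     /* readers parked on this node */
};

/* A read that overlaps a node whose data is still being filled is neither
 * failed nor served incomplete data: it is parked on the node's wait list
 * and rescheduled when the fill completes. */
static void node_read_or_park(struct cache_node *n, struct reader *rd,
                              struct wait_link *link)
{
    if (n->status == NODE_WAIT) {
        link->rd = rd;
        link->next = n->waiters;
        n->waiters = link;         /* reschedule: completed later */
        return;
    }
    printf("%s: served from cache immediately\n", rd->name);
}

/* Runs in the fill (write) completion callback: the node's data is now
 * valid, so every parked reader can copy it and complete. */
static void node_complete_waiters(struct cache_node *n)
{
    n->status = NODE_READY;
    while (n->waiters != NULL) {
        struct wait_link *l = n->waiters;
        n->waiters = l->next;
        printf("%s: completed from wait list\n", l->rd->name);
    }
}

int main(void)
{
    struct cache_node n = { NODE_WAIT, NULL };
    struct reader r = { "read1" };
    struct wait_link l;

    node_read_or_park(&n, &r, &l); /* parked: node still being filled */
    node_complete_waiters(&n);     /* fill done -> read1 completes */
    return 0;
}

The same completion-ordering argument is why the write path below drops
overlapping nodes from pcache_aio_cb() rather than before bdrv_aio_writev():
dropping them early would let an overlapping chunk be re-cached with
soon-to-be-stale data while the write is still in flight, and nodes still in
NODE_WAIT_STATUS can simply be skipped there.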
block/pcache.c | 136 +++++++++++++++++++++++++++++++++++++++++++++++----------
1 file changed, 112 insertions(+), 24 deletions(-)
diff --git a/block/pcache.c b/block/pcache.c
index 1ff4c6a..cb5f884 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -43,6 +43,11 @@ typedef struct RbNodeKey {
uint32_t size;
} RbNodeKey;
+typedef struct ACBEntryLink {
+ QTAILQ_ENTRY(ACBEntryLink) entry;
+ struct PrefCacheAIOCB *acb;
+} ACBEntryLink;
+
typedef struct BlockNode {
struct RbNode rb_node;
union {
@@ -58,6 +63,10 @@ typedef struct BlockNode {
typedef struct PCNode {
BlockNode cm;
+ struct {
+ QTAILQ_HEAD(acb_head, ACBEntryLink) list;
+ uint32_t cnt;
+ } wait;
uint32_t status;
uint32_t ref;
uint8_t *data;
@@ -181,7 +190,6 @@ static inline PCNode *pcache_node_ref(PCNode *node)
{
assert(node->status == NODE_SUCCESS_STATUS ||
node->status == NODE_WAIT_STATUS);
- assert(atomic_read(&node->ref) == 0);/* XXX: only for sequential requests */
atomic_inc(&node->ref);
return node;
@@ -277,6 +285,8 @@ static inline void *pcache_node_alloc(RbNodeKey* key)
node->status = NODE_WAIT_STATUS;
qemu_co_mutex_init(&node->lock);
node->data = g_malloc(node->cm.nb_sectors << BDRV_SECTOR_BITS);
+ node->wait.cnt = 0;
+ QTAILQ_INIT(&node->wait.list);
return node;
}
@@ -308,15 +318,33 @@ static void pcache_node_drop(BDRVPCacheState *s, PCNode *node)
pcache_node_unref(s, node);
}
+static inline PCNode *pcache_get_most_unused_node(BDRVPCacheState *s)
+{
+ PCNode *node;
+ assert(!QTAILQ_EMPTY(&s->pcache.lru.list));
+
+ qemu_co_mutex_lock(&s->pcache.lru.lock);
+ node = PCNODE(QTAILQ_LAST(&s->pcache.lru.list, lru_head));
+ pcache_node_ref(node);
+ qemu_co_mutex_unlock(&s->pcache.lru.lock);
+
+ return node;
+}
+
static void pcache_try_shrink(BDRVPCacheState *s)
{
while (s->pcache.curr_size > s->cfg_cache_size) {
- qemu_co_mutex_lock(&s->pcache.lru.lock);
- assert(!QTAILQ_EMPTY(&s->pcache.lru.list));
- PCNode *rmv_node = PCNODE(QTAILQ_LAST(&s->pcache.lru.list, lru_head));
- qemu_co_mutex_unlock(&s->pcache.lru.lock);
+ PCNode *rmv_node;
+ /* this can happen if all nodes are waiting */
+ if (QTAILQ_EMPTY(&s->pcache.lru.list)) {
+ DPRINTF("lru list is empty, but curr_size: %d\n",
+ s->pcache.curr_size);
+ break;
+ }
+ rmv_node = pcache_get_most_unused_node(s);
pcache_node_drop(s, rmv_node);
+ pcache_node_unref(s, rmv_node);
#ifdef PCACHE_DEBUG
atomic_inc(&s->shrink_cnt_node);
#endif
@@ -392,7 +420,7 @@ static uint64_t ranges_overlap_size(uint64_t node1, uint32_t size1,
return MIN(node1 + size1, node2 + size2) - MAX(node1, node2);
}
-static void pcache_node_read(PrefCacheAIOCB *acb, PCNode* node)
+static inline void pcache_node_read_buf(PrefCacheAIOCB *acb, PCNode* node)
{
uint64_t qiov_offs = 0, node_offs = 0;
uint32_t size;
@@ -407,15 +435,41 @@ static void pcache_node_read(PrefCacheAIOCB *acb, PCNode* node)
node->cm.sector_num, node->cm.nb_sectors)
<< BDRV_SECTOR_BITS;
+ qemu_co_mutex_lock(&node->lock); /* XXX: use rw lock */
+ copy = \
+ qemu_iovec_from_buf(acb->qiov, qiov_offs, node->data + node_offs, size);
+ qemu_co_mutex_unlock(&node->lock);
+ assert(copy == size);
+}
+
+static inline void pcache_node_read_wait(PrefCacheAIOCB *acb, PCNode *node)
+{
+ ACBEntryLink *link = g_slice_alloc(sizeof(*link));
+ link->acb = acb;
+
+ atomic_inc(&node->wait.cnt);
+ QTAILQ_INSERT_HEAD(&node->wait.list, link, entry);
+ acb->ref++;
+}
+
+static void pcache_node_read(PrefCacheAIOCB *acb, PCNode* node)
+{
assert(node->status == NODE_SUCCESS_STATUS ||
+ node->status == NODE_WAIT_STATUS ||
node->status == NODE_REMOVE_STATUS);
assert(node->data != NULL);
qemu_co_mutex_lock(&node->lock);
- copy = \
- qemu_iovec_from_buf(acb->qiov, qiov_offs, node->data + node_offs, size);
- assert(copy == size);
+ if (node->status == NODE_WAIT_STATUS) {
+ pcache_node_read_wait(acb, node);
+ qemu_co_mutex_unlock(&node->lock);
+
+ return;
+ }
qemu_co_mutex_unlock(&node->lock);
+
+ pcache_node_read_buf(acb, node);
+ pcache_node_unref(acb->s, node);
}
static inline void prefetch_init_key(PrefCacheAIOCB *acb, RbNodeKey* key)
@@ -446,10 +500,11 @@ static void pcache_pickup_parts_of_cache(PrefCacheAIOCB *acb, PCNode *node,
size -= up_size;
num += up_size;
}
- pcache_node_read(acb, node);
up_size = MIN(node->cm.sector_num + node->cm.nb_sectors - num, size);
-
- pcache_node_unref(acb->s, node);
+ pcache_node_read(acb, node); /* don't use the node after pcache_node_read(),
+                               * it may already be freed.
+                               */
+ node = NULL;
size -= up_size;
num += up_size;
@@ -488,7 +543,6 @@ static int32_t pcache_prefetch(PrefCacheAIOCB *acb)
acb->nb_sectors)
{
pcache_node_read(acb, node);
- pcache_node_unref(acb->s, node);
return PREFETCH_FULL_UP;
}
pcache_pickup_parts_of_cache(acb, node, key.num, key.size);
@@ -513,6 +567,31 @@ static void complete_aio_request(PrefCacheAIOCB *acb)
}
}
+static void pcache_complete_acb_wait_queue(BDRVPCacheState *s, PCNode *node)
+{
+ ACBEntryLink *link, *next;
+
+ if (atomic_read(&node->wait.cnt) == 0) {
+ return;
+ }
+
+ QTAILQ_FOREACH_SAFE(link, &node->wait.list, entry, next) {
+ PrefCacheAIOCB *wait_acb = link->acb;
+
+ QTAILQ_REMOVE(&node->wait.list, link, entry);
+ g_slice_free1(sizeof(*link), link);
+
+ pcache_node_read_buf(wait_acb, node);
+
+ assert(node->ref != 0);
+ pcache_node_unref(s, node);
+
+ complete_aio_request(wait_acb);
+ atomic_dec(&node->wait.cnt);
+ }
+ assert(atomic_read(&node->wait.cnt) == 0);
+}
+
static void pcache_node_submit(PrefCachePartReq *req)
{
PCNode *node = req->node;
@@ -539,14 +618,17 @@ static void pcache_merge_requests(PrefCacheAIOCB *acb)
qemu_co_mutex_lock(&acb->requests.lock);
QTAILQ_FOREACH_SAFE(req, &acb->requests.list, entry, next) {
+ PCNode *node = req->node;
QTAILQ_REMOVE(&acb->requests.list, req, entry);
assert(req != NULL);
- assert(req->node->status == NODE_WAIT_STATUS);
+ assert(node->status == NODE_WAIT_STATUS);
pcache_node_submit(req);
- pcache_node_read(acb, req->node);
+ pcache_node_read_buf(acb, node);
+
+ pcache_complete_acb_wait_queue(acb->s, node);
pcache_node_unref(acb->s, req->node);
@@ -559,22 +641,27 @@ static void pcache_try_node_drop(PrefCacheAIOCB *acb)
{
BDRVPCacheState *s = acb->s;
RbNodeKey key;
+ PCNode *node;
+ uint64_t end_offs = acb->sector_num + acb->nb_sectors;
- prefetch_init_key(acb, &key);
-
+ key.num = acb->sector_num;
do {
- PCNode *node;
- qemu_co_mutex_lock(&s->pcache.tree.lock);
+ key.size = end_offs - key.num;
+
+ qemu_co_mutex_lock(&s->pcache.tree.lock); /* XXX: use get_next_node */
node = pcache_node_search(&s->pcache.tree.root, &key);
qemu_co_mutex_unlock(&s->pcache.tree.lock);
if (node == NULL) {
- break;
+ return;
}
-
- pcache_node_drop(s, node);
+ if (node->status != NODE_WAIT_STATUS) {
+ assert(node->status == NODE_SUCCESS_STATUS);
+ pcache_node_drop(s, node);
+ }
+ key.num = node->cm.sector_num + node->cm.nb_sectors;
pcache_node_unref(s, node);
- } while (true);
+ } while (end_offs > key.num);
}
static void pcache_aio_cb(void *opaque, int ret)
@@ -586,6 +673,8 @@ static void pcache_aio_cb(void *opaque, int ret)
return;
}
pcache_merge_requests(acb);
+ } else { /* QEMU_AIO_WRITE */
+ pcache_try_node_drop(acb); /* XXX: use write through */
}
complete_aio_request(acb);
@@ -649,7 +738,6 @@ static BlockAIOCB *pcache_aio_writev(BlockDriverState *bs,
{
PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
opaque, QEMU_AIO_WRITE);
- pcache_try_node_drop(acb); /* XXX: use write through */
bdrv_aio_writev(bs->file, sector_num, qiov, nb_sectors,
pcache_aio_cb, acb);
--
2.8.3