[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[RFC PATCH 6/9] block/curl: Cache downloaded blocks
From: |
David Edmondson |
Subject: |
[RFC PATCH 6/9] block/curl: Cache downloaded blocks |
Date: |
Tue, 18 Aug 2020 12:08:42 +0100 |
In the hope that they will be referred to multiple times, cache the
blocks downloaded from the remote server.
Signed-off-by: David Edmondson <david.edmondson@oracle.com>
---
block/curl.c | 321 +++++++++++++++++++++++++++++++++++++++------
block/trace-events | 3 +
2 files changed, 287 insertions(+), 37 deletions(-)
diff --git a/block/curl.c b/block/curl.c
index b2d02818a9..0ea9eedebd 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -81,11 +81,29 @@ static CURLMcode __curl_multi_socket_action(CURLM
*multi_handle,
/* Must be a non-zero power of 2. */
#define CURL_BLOCK_OPT_BLOCKSIZE_DEFAULT (256 * 1024)
+/* The maximum number of blocks to store in the cache. */
+#define CURL_BLOCK_CACHE_MAX_BLOCKS 100
+/* The number of heads in the hash table. */
+#define CURL_BLOCK_CACHE_HASH 37
+
struct BDRVCURLState;
struct CURLState;
static bool libcurl_initialized;
+typedef struct block {
+ QLIST_ENTRY(block) hash; /* Blocks with the same hash value. */
+ QLIST_ENTRY(block) free; /* Block free list. */
+ QTAILQ_ENTRY(block) lru; /* LRU list. */
+ bool hashed; /* block_t contains data and is hashed. */
+ int use; /* Use count. */
+
+ uint64_t start; /* Offset of first byte. */
+ uint64_t count; /* Valid bytes. */
+
+ char *buf; /* Data. */
+} block_t;
+
typedef struct CURLAIOCB {
Coroutine *co;
QEMUIOVector *qiov;
@@ -117,12 +135,11 @@ typedef struct CURLState
CURLAIOCB *acb[CURL_NUM_ACB];
CURL *curl;
QLIST_HEAD(, CURLSocket) sockets;
- char *orig_buf;
- uint64_t buf_start;
size_t buf_off;
char range[128];
char errmsg[CURL_ERROR_SIZE];
char in_use;
+ block_t *cache_block;
} CURLState;
typedef struct BDRVCURLState {
@@ -144,11 +161,17 @@ typedef struct BDRVCURLState {
char *proxypassword;
size_t offset;
size_t blocksize;
+ int cache_allocated; /* The number of block_t currently allocated. */
+ QLIST_HEAD(, block) cache_free;
+ QTAILQ_HEAD(, block) cache_lru;
+ QLIST_HEAD(, block) * cache_hash;
} BDRVCURLState;
static void curl_clean_state(CURLState *s);
static void curl_multi_do(void *arg);
+static void curl_cache_free(BDRVCURLState *s, block_t *b);
+
/* Align "n" to the start of the containing block. */
static inline uint64_t curl_block_align(BDRVCURLState *s, uint64_t n)
{
@@ -161,6 +184,198 @@ static inline uint64_t curl_block_offset(BDRVCURLState
*s, uint64_t n)
return n & (s->blocksize - 1);
}
+static uint64_t curl_cache_hash(BDRVCURLState *s, uint64_t n)
+{
+ return curl_block_align(s, n) % CURL_BLOCK_CACHE_HASH;
+}
+
+static bool curl_cache_init(BDRVCURLState *s)
+{
+ s->cache_allocated = 0;
+
+ QLIST_INIT(&s->cache_free);
+ QTAILQ_INIT(&s->cache_lru);
+
+ s->cache_hash = g_try_malloc(CURL_BLOCK_CACHE_HASH *
sizeof(s->cache_hash));
+ if (!s->cache_hash) {
+ return false;
+ }
+
+ for (int i = 0; i < CURL_BLOCK_CACHE_HASH; i++) {
+ QLIST_INIT(&s->cache_hash[i]);
+ }
+
+ return true;
+}
+
+static void curl_cache_deinit(BDRVCURLState *s)
+{
+ block_t *b;
+
+ /*
+ * Cache blocks are either in the hash table or on the free list.
+ */
+ for (int i = 0; i < CURL_BLOCK_CACHE_HASH; i++) {
+ while (!QLIST_EMPTY(&s->cache_hash[i])) {
+ b = QLIST_FIRST(&s->cache_hash[i]);
+ QLIST_REMOVE(b, hash);
+ b->hashed = false;
+ curl_cache_free(s, b);
+ }
+ }
+
+ while (!QLIST_EMPTY(&s->cache_free)) {
+ b = QLIST_FIRST(&s->cache_free);
+ QLIST_REMOVE(b, free);
+ curl_cache_free(s, b);
+ }
+
+ assert(s->cache_allocated == 0);
+
+ g_free(s->cache_hash);
+ s->cache_hash = NULL;
+}
+
+static block_t *curl_cache_alloc(BDRVCURLState *s)
+{
+ block_t *b = g_try_malloc0(sizeof(*b));
+
+ if (!b) {
+ return NULL;
+ }
+
+ b->buf = g_try_malloc(s->blocksize);
+ if (!b->buf) {
+ g_free(b);
+ return NULL;
+ }
+
+ s->cache_allocated++;
+
+ trace_curl_cache_alloc(s->cache_allocated);
+
+ return b;
+}
+
+static void curl_cache_free(BDRVCURLState *s, block_t *b)
+{
+ assert(b->use == 0);
+ assert(!b->hashed);
+
+ g_free(b->buf);
+ g_free(b);
+
+ s->cache_allocated--;
+
+ trace_curl_cache_free(s->cache_allocated);
+}
+
+static block_t *curl_cache_get(BDRVCURLState *s)
+{
+ block_t *b = NULL;
+
+ /* If there is one on the free list, use it. */
+ if (!QLIST_EMPTY(&s->cache_free)) {
+ b = QLIST_FIRST(&s->cache_free);
+ QLIST_REMOVE(b, free);
+
+ assert(b->use == 0);
+ assert(!b->hashed);
+
+ b->use++;
+ goto done;
+ }
+
+ /* If not at the limit, try get a new one. */
+ if (s->cache_allocated < CURL_BLOCK_CACHE_MAX_BLOCKS) {
+ b = curl_cache_alloc(s);
+ if (b) {
+ b->use++;
+ goto done;
+ }
+ }
+
+ /* Take one from the LRU list. */
+ if (!QTAILQ_EMPTY(&s->cache_lru)) {
+ b = QTAILQ_FIRST(&s->cache_lru);
+ QTAILQ_REMOVE(&s->cache_lru, b, lru);
+
+ /* Remove it from the hash. */
+ QLIST_REMOVE(b, hash);
+
+ assert(b->use == 0);
+
+ b->hashed = false;
+ b->use++;
+ goto done;
+ }
+
+ done:
+ return b;
+}
+
+static void curl_cache_put(BDRVCURLState *s, block_t *b, bool valid)
+{
+ b->use--;
+
+ if (valid) {
+ /* If it's not hashed, hash it now. */
+ if (!b->hashed) {
+ b->hashed = true;
+ QLIST_INSERT_HEAD(&s->cache_hash[curl_cache_hash(s, b->start)],
+ b, hash);
+ }
+
+ /* If the block is no longer being used, put it on the LRU list. */
+ if (b->use == 0) {
+ QTAILQ_INSERT_TAIL(&s->cache_lru, b, lru);
+ }
+ } else {
+ b->hashed = false;
+ QLIST_INSERT_HEAD(&s->cache_free, b, free);
+ }
+}
+
+static block_t *cache_lookup(BDRVCURLState *s, uint64_t start)
+{
+ block_t *b;
+
+ QLIST_FOREACH(b, &s->cache_hash[curl_cache_hash(s, start)], hash) {
+ if (b->start <= start && start < b->start + b->count) {
+ assert(b->hashed);
+ b->use++;
+
+ /* Remove from the LRU list. */
+ QTAILQ_REMOVE(&s->cache_lru, b, lru);
+
+ return b;
+ }
+ }
+
+ return NULL;
+}
+
+static bool curl_cache_find(BDRVCURLState *s, CURLAIOCB *acb)
+{
+ block_t *b;
+
+ b = cache_lookup(s, acb->offset);
+ if (!b) {
+ return false;
+ }
+
+ trace_curl_cache_hit(qemu_coroutine_self(), acb->offset, acb->bytes);
+
+ qemu_iovec_from_buf(acb->qiov, acb->qiov_offset,
+ b->buf + curl_block_offset(s, acb->offset),
+ acb->bytes);
+
+ curl_cache_put(s, b, true);
+
+ acb->ret = 0;
+ return true;
+}
+
#ifdef NEED_CURL_TIMER_CALLBACK
/* Called from curl_multi_do_locked, with s->mutex held. */
static int curl_timer_cb(CURLM *multi, long timeout_ms, void *opaque)
@@ -274,11 +489,12 @@ static size_t curl_read_cb(void *ptr, size_t size, size_t
nmemb, void *opaque)
{
CURLState *state = (CURLState *)opaque;
BDRVCURLState *s = state->s;
+ block_t *b = state->cache_block;
size_t realsize = size * nmemb;
trace_curl_read_cb(realsize);
- if (!state || !state->orig_buf) {
+ if (!state || !b) {
goto read_end;
}
@@ -287,8 +503,9 @@ static size_t curl_read_cb(void *ptr, size_t size, size_t
nmemb, void *opaque)
goto read_end;
}
realsize = MIN(realsize, s->blocksize - state->buf_off);
- memcpy(state->orig_buf + state->buf_off, ptr, realsize);
+ memcpy(b->buf + state->buf_off, ptr, realsize);
state->buf_off += realsize;
+ b->count += realsize;
read_end:
/* curl will error out if we do not return this value */
@@ -296,35 +513,38 @@ read_end:
}
/* Called with s->mutex held. */
-static bool curl_find_buf(BDRVCURLState *s, uint64_t start, uint64_t len,
- CURLAIOCB *acb)
+static bool curl_pending_find(BDRVCURLState *s, CURLAIOCB *acb)
{
int i;
+ uint64_t start = acb->offset;
+ uint64_t len = acb->bytes;
uint64_t end = start + len;
uint64_t clamped_end = MIN(end, s->len);
uint64_t clamped_len = clamped_end - start;
for (i = 0; i < CURL_NUM_STATES; i++) {
CURLState *state = &s->states[i];
- /* The end of the currently valid data. */
- uint64_t buf_end = state->buf_start + state->buf_off;
- /* The end of the valid data when the IO completes. */
- uint64_t buf_fend = state->buf_start + s->blocksize;
+ block_t *b = state->cache_block;
+ uint64_t buf_end, buf_fend;
- if (!state->orig_buf)
- continue;
- if (!state->buf_off)
+ if (!b) {
continue;
+ }
+
+ /* The end of the currently valid data. */
+ buf_end = b->start + state->buf_off;
+ /* The end of the valid data when the IO completes. */
+ buf_fend = b->start + s->blocksize;
/*
* Does the existing buffer cover our section?
*/
- if ((start >= state->buf_start) &&
+ if ((start >= b->start) &&
(start <= buf_end) &&
- (clamped_end >= state->buf_start) &&
+ (clamped_end >= b->start) &&
(clamped_end <= buf_end))
{
- char *buf = state->orig_buf + curl_block_offset(s, start);
+ char *buf = b->buf + curl_block_offset(s, start);
trace_curl_pending_hit(qemu_coroutine_self(),
start, len);
@@ -343,9 +563,9 @@ static bool curl_find_buf(BDRVCURLState *s, uint64_t start,
uint64_t len,
* aiocb.
*/
if (state->in_use &&
- (start >= state->buf_start) &&
+ (start >= b->start) &&
(start <= buf_fend) &&
- (clamped_end >= state->buf_start) &&
+ (clamped_end >= b->start) &&
(clamped_end <= buf_fend))
{
int j;
@@ -388,10 +608,10 @@ static void curl_multi_check_completion(BDRVCURLState *s)
int i;
CURLState *state = NULL;
bool error = msg->data.result != CURLE_OK;
+ block_t *b;
curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE,
(char **)&state);
-
if (error) {
static int errcount = 100;
@@ -406,6 +626,8 @@ static void curl_multi_check_completion(BDRVCURLState *s)
}
}
+ b = state->cache_block;
+
for (i = 0; i < CURL_NUM_ACB; i++) {
CURLAIOCB *acb = state->acb[i];
@@ -418,7 +640,7 @@ static void curl_multi_check_completion(BDRVCURLState *s)
assert(state->buf_off >= acb->end);
qemu_iovec_from_buf(acb->qiov, acb->qiov_offset,
- state->orig_buf + acb->start,
+ b->buf + acb->start,
acb->end - acb->start);
if (acb->end - acb->start < acb->bytes) {
@@ -436,6 +658,9 @@ static void curl_multi_check_completion(BDRVCURLState *s)
qemu_mutex_lock(&s->mutex);
}
+ curl_cache_put(s, b, true);
+ state->cache_block = NULL;
+
curl_clean_state(state);
break;
}
@@ -612,8 +837,10 @@ static void curl_detach_aio_context(BlockDriverState *bs)
curl_easy_cleanup(s->states[i].curl);
s->states[i].curl = NULL;
}
- g_free(s->states[i].orig_buf);
- s->states[i].orig_buf = NULL;
+ if (s->states[i].cache_block) {
+ curl_cache_free(s, s->states[i].cache_block);
+ s->states[i].cache_block = NULL;
+ }
}
if (s->multi) {
curl_multi_cleanup(s->multi);
@@ -868,6 +1095,10 @@ static int curl_open(BlockDriverState *bs, QDict
*options, int flags,
}
trace_curl_open_size(s->len);
+ if (!curl_cache_init(s)) {
+ goto out;
+ }
+
qemu_mutex_lock(&s->mutex);
curl_clean_state(state);
qemu_mutex_unlock(&s->mutex);
@@ -898,6 +1129,7 @@ static void curl_setup_preadv(BlockDriverState *bs,
CURLAIOCB *acb)
{
CURLState *state;
int running;
+ block_t *b;
BDRVCURLState *s = bs->opaque;
@@ -910,11 +1142,16 @@ static void curl_setup_preadv(BlockDriverState *bs,
CURLAIOCB *acb)
qemu_mutex_lock(&s->mutex);
+ /* Can this request be handled using a cached block? */
+ if (curl_cache_find(s, acb)) {
+ goto out;
+ }
+
/*
- * Check whether the requested data can be found in an existing or
- * pending IO request.
+ * Check whether the requested data can be found in a pending IO
+ * request.
*/
- if (curl_find_buf(s, acb->offset, acb->bytes, acb)) {
+ if (curl_pending_find(s, acb)) {
goto out;
}
@@ -935,25 +1172,34 @@ static void curl_setup_preadv(BlockDriverState *bs,
CURLAIOCB *acb)
goto out;
}
- acb->start = curl_block_offset(s, acb->offset);
- acb->end = acb->start + MIN(acb->bytes, s->len - acb->offset);
-
- state->buf_off = 0;
- state->buf_start = curl_block_align(s, acb->offset);
- if (!state->orig_buf) {
- state->orig_buf = g_try_malloc(s->blocksize);
- }
- if (!state->orig_buf) {
+ b = curl_cache_get(s);
+ if (!b) {
curl_clean_state(state);
acb->ret = -ENOMEM;
goto out;
}
+
+ state->cache_block = b;
+
+ /*
+ * Any already cached or in-progress IO for
+ * curl_cache_base(acb->offset) would have been found by
+ * curl_cache_find() or curl_pending_find() respectively, so this
+ * must be a new request for that block.
+ */
+ b->start = curl_block_align(s, acb->offset);
+ b->count = 0; /* Nothing read so far. */
+
+ acb->start = curl_block_offset(s, acb->offset);
+ acb->end = acb->start + MIN(acb->bytes, s->len - acb->offset);
+
+ state->buf_off = 0;
state->acb[0] = acb;
snprintf(state->range, 127, "%" PRIu64 "-%" PRIu64,
- s->offset + state->buf_start,
- s->offset + state->buf_start + s->blocksize);
- trace_curl_setup_preadv(qemu_coroutine_self(), state->buf_start,
+ s->offset + b->start,
+ s->offset + b->start + s->blocksize);
+ trace_curl_setup_preadv(qemu_coroutine_self(), b->start,
s->blocksize);
curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range);
@@ -1027,6 +1273,7 @@ static void curl_close(BlockDriverState *bs)
BDRVCURLState *s = bs->opaque;
trace_curl_close();
+ curl_cache_deinit(s);
curl_detach_aio_context(bs);
qemu_mutex_destroy(&s->mutex);
diff --git a/block/trace-events b/block/trace-events
index 72b1e927bf..deac37c4ad 100644
--- a/block/trace-events
+++ b/block/trace-events
@@ -206,7 +206,10 @@ curl_setup_preadv(void *co, uint64_t offset, uint64_t
bytes) "co %p requests 0x%
curl_pending_hit(void *co, uint64_t start, uint64_t len) "co %p finds 0x%"
PRIx64 " + 0x%" PRIx64
curl_pending_piggyback(void *co, uint64_t start, uint64_t len) "co %p pending
0x%" PRIx64 " + 0x%" PRIx64
curl_pending_miss(void *co, uint64_t start, uint64_t len) "co %p misses 0x%"
PRIx64 " + 0x%" PRIx64
+curl_cache_hit(void *co, uint64_t start, uint64_t len) "co %p finds 0x%"
PRIx64 " + 0x%" PRIx64
curl_close(void) "close"
+curl_cache_alloc(size_t n) "%zu cache blocks allocated"
+curl_cache_free(size_t n) "%zu cache blocks allocated"
# file-posix.c
file_xfs_write_zeroes(const char *error) "cannot write zero range (%s)"
--
2.27.0
- [RFC PATCH 0/9] block/curl: Add caching of data downloaded from the remote server, David Edmondson, 2020/08/18
- [RFC PATCH 7/9] block/curl: Allow the user to control the number of cache blocks, David Edmondson, 2020/08/18
- [RFC PATCH 8/9] block/curl: Allow 16 sockets/ACB, David Edmondson, 2020/08/18
- [RFC PATCH 6/9] block/curl: Cache downloaded blocks,
David Edmondson <=
- [RFC PATCH 2/9] block/curl: Remove readahead support, David Edmondson, 2020/08/18
- [RFC PATCH 5/9] block/curl: Allow the blocksize to be specified by the user, David Edmondson, 2020/08/18
- [RFC PATCH 4/9] block/curl: Perform IO in fixed size chunks, David Edmondson, 2020/08/18
- [RFC PATCH 1/9] block/curl: Add an 'offset' parameter, affecting all range requests, David Edmondson, 2020/08/18
- [RFC PATCH 3/9] block/curl: Tracing, David Edmondson, 2020/08/18
- [RFC PATCH 9/9] block/curl: Add readahead support, David Edmondson, 2020/08/18
- Re: [RFC PATCH 0/9] block/curl: Add caching of data downloaded from the remote server, Stefan Hajnoczi, 2020/08/19