qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH v6 07/12] curl: make use of CURLDataCache.


From: Fam Zheng
Subject: [Qemu-devel] [PATCH v6 07/12] curl: make use of CURLDataCache.
Date: Fri, 24 May 2013 13:37:02 +0800

Make subsequecial changes to make use of introduced CURLDataCache. Moved
acb struct from CURLState to BDRVCURLState, and changed to list.

Signed-off-by: Fam Zheng <address@hidden>
---
 block/curl.c | 170 ++++++++++++++++++++++++++++++++---------------------------
 1 file changed, 92 insertions(+), 78 deletions(-)

diff --git a/block/curl.c b/block/curl.c
index a99d8b5..5405485 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -39,7 +39,6 @@
                    CURLPROTO_TFTP)
 
 #define CURL_NUM_STATES 8
-#define CURL_NUM_ACB    8
 #define SECTOR_SIZE     512
 #define READ_AHEAD_SIZE (256 * 1024)
 
@@ -52,9 +51,7 @@ typedef struct CURLAIOCB {
 
     int64_t sector_num;
     int nb_sectors;
-
-    size_t start;
-    size_t end;
+    QLIST_ENTRY(CURLAIOCB) next;
 } CURLAIOCB;
 
 typedef struct CURLDataCache {
@@ -62,20 +59,18 @@ typedef struct CURLDataCache {
     int64_t base_pos;
     size_t data_len;
     int64_t write_pos;
+    /* Ref count for CURLState */
+    int use_count;
     QLIST_ENTRY(CURLDataCache) next;
 } CURLDataCache;
 
 typedef struct CURLState
 {
     struct BDRVCURLState *s;
-    CURLAIOCB *acb[CURL_NUM_ACB];
     CURL *curl;
-    char *orig_buf;
-    size_t buf_start;
-    size_t buf_off;
-    size_t buf_len;
     char range[128];
     char errmsg[CURL_ERROR_SIZE];
+    CURLDataCache *cache;
     char in_use;
 } CURLState;
 
@@ -90,6 +85,7 @@ typedef struct BDRVCURLState {
     CURLM *multi;
     size_t len;
     CURLState states[CURL_NUM_STATES];
+    QLIST_HEAD(, CURLAIOCB) acbs;
     QLIST_HEAD(, CURLSockInfo) socks;
     char *url;
     size_t readahead_size;
@@ -219,31 +215,35 @@ static void curl_complete_io(BDRVCURLState *bs, CURLAIOCB 
*acb,
 
 static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque)
 {
-    CURLState *s = ((CURLState*)opaque);
+    CURLState *s = opaque;
+    CURLDataCache *c = s->cache;
     size_t realsize = size * nmemb;
-    int i;
-
-    DPRINTF("CURL: Just reading %zd bytes\n", realsize);
+    CURLAIOCB *acb;
 
-    if (!s || !s->orig_buf)
+    if (!c || !c->data) {
         goto read_end;
+    }
+    if (c->write_pos >= c->data_len) {
+        goto read_end;
+    }
+    memcpy(c->data + c->write_pos, ptr,
+           MIN(realsize, c->data_len - c->write_pos));
+    c->write_pos += realsize;
+    if (c->write_pos >= c->data_len) {
+        c->write_pos = c->data_len;
+    }
 
-    memcpy(s->orig_buf + s->buf_off, ptr, realsize);
-    s->buf_off += realsize;
-
-    for(i=0; i<CURL_NUM_ACB; i++) {
-        CURLAIOCB *acb = s->acb[i];
-
-        if (!acb)
-            continue;
-
-        if ((s->buf_off >= acb->end)) {
-            qemu_iovec_from_buf(acb->qiov, 0, s->orig_buf + acb->start,
-                                acb->end - acb->start);
-            acb->common.cb(acb->common.opaque, 0);
-            qemu_aio_release(acb);
-            s->acb[i] = NULL;
+    acb = QLIST_FIRST(&s->s->acbs);
+    while (acb) {
+        int64_t aio_base = acb->sector_num * SECTOR_SIZE;
+        size_t aio_len = acb->nb_sectors * SECTOR_SIZE;
+        CURLAIOCB *next = QLIST_NEXT(acb, next);
+        if (aio_base >= c->base_pos &&
+            aio_base + aio_len <= c->base_pos + c->write_pos) {
+            QLIST_REMOVE(acb, next);
+            curl_complete_io(s->s, acb, c);
         }
+        acb = next;
     }
 
 read_end:
@@ -273,10 +273,12 @@ static void curl_fd_handler(void *arg)
         CURLMsg *msg;
         msg = curl_multi_info_read(s->multi, &msgs_in_queue);
 
-        if (!msg)
+        if (!msg) {
             break;
-        if (msg->msg == CURLMSG_NONE)
+        }
+        if (msg->msg == CURLMSG_NONE) {
             break;
+        }
 
         switch (msg->msg) {
             case CURLMSG_DONE:
@@ -286,19 +288,17 @@ static void curl_fd_handler(void *arg)
                                   CURLINFO_PRIVATE,
                                   (char **)&state);
 
-                /* ACBs for successful messages get completed in curl_read_cb 
*/
+                /* ACBs for successful messages get completed in curl_read_cb,
+                 * fail existing acbs for now */
                 if (msg->data.result != CURLE_OK) {
-                    int i;
-                    for (i = 0; i < CURL_NUM_ACB; i++) {
-                        CURLAIOCB *acb = state->acb[i];
-
-                        if (acb == NULL) {
-                            continue;
-                        }
-
+                    CURLAIOCB *acb = QLIST_FIRST(&s->acbs);
+                    while (acb) {
+                        CURLAIOCB *next = QLIST_NEXT(acb, next);
+                        DPRINTF("EIO, %s\n", state->errmsg);
                         acb->common.cb(acb->common.opaque, -EIO);
+                        QLIST_REMOVE(acb, next);
                         qemu_aio_release(acb);
-                        state->acb[i] = NULL;
+                        acb = next;
                     }
                 }
 
@@ -315,13 +315,10 @@ static void curl_fd_handler(void *arg)
 static CURLState *curl_init_state(BDRVCURLState *s)
 {
     CURLState *state = NULL;
-    int i, j;
+    int i;
 
     do {
         for (i=0; i<CURL_NUM_STATES; i++) {
-            for (j=0; j<CURL_NUM_ACB; j++)
-                if (s->states[i].acb[j])
-                    continue;
             if (s->states[i].in_use)
                 continue;
 
@@ -378,6 +375,10 @@ static void curl_clean_state(CURLState *s)
     if (s->s->multi)
         curl_multi_remove_handle(s->s->multi, s->curl);
     s->in_use = 0;
+    if (s->cache) {
+        s->cache->use_count--;
+        assert(s->cache->use_count >= 0);
+    }
 }
 
 static void curl_parse_filename(const char *filename, QDict *options,
@@ -481,6 +482,7 @@ static int curl_open(BlockDriverState *bs, QDict *options, 
int flags)
 
     QLIST_INIT(&s->socks);
     QLIST_INIT(&s->cache);
+    QLIST_INIT(&s->acbs);
 
     DPRINTF("CURL: Opening %s\n", file);
     s->url = g_strdup(file);
@@ -549,14 +551,8 @@ out_noclean:
 static int curl_aio_flush(void *opaque)
 {
     BDRVCURLState *s = opaque;
-    int i, j;
-
-    for (i=0; i < CURL_NUM_STATES; i++) {
-        for(j=0; j < CURL_NUM_ACB; j++) {
-            if (s->states[i].acb[j]) {
-                return 1;
-            }
-        }
+    if (!QLIST_EMPTY(&s->acbs)) {
+        return 1;
     }
     return 0;
 }
@@ -579,7 +575,7 @@ static void curl_readv_bh_cb(void *p)
     CURLAIOCB *acb = p;
     BDRVCURLState *s = acb->common.bs->opaque;
     int64_t aio_base, aio_bytes;
-    int64_t start, end;
+    int running;
 
     qemu_bh_delete(acb->bh);
     acb->bh = NULL;
@@ -587,7 +583,9 @@ static void curl_readv_bh_cb(void *p)
     aio_base = acb->sector_num * SECTOR_SIZE;
     aio_bytes = acb->nb_sectors * SECTOR_SIZE;
 
-    start = acb->sector_num * SECTOR_SIZE;
+    if (aio_base + aio_bytes > s->len) {
+        goto err_release;
+    }
 
     cache = curl_find_cache(s, aio_base, aio_bytes);
     if (cache) {
@@ -598,29 +596,41 @@ static void curl_readv_bh_cb(void *p)
     // No cache found, so let's start a new request
     state = curl_init_state(s);
     if (!state) {
-        acb->common.cb(acb->common.opaque, -EIO);
-        qemu_aio_release(acb);
-        return;
+        goto err_release;
     }
 
-    acb->start = 0;
-    acb->end = (acb->nb_sectors * SECTOR_SIZE);
-
-    state->buf_off = 0;
-    if (state->orig_buf)
-        g_free(state->orig_buf);
-    state->buf_start = start;
-    state->buf_len = acb->end + s->readahead_size;
-    end = MIN(start + state->buf_len, s->len) - 1;
-    state->orig_buf = g_malloc(state->buf_len);
-    state->acb[0] = acb;
-
-    snprintf(state->range, sizeof(state->range) - 1, "%zd-%zd", start, end);
-    DPRINTF("CURL (AIO): Reading %d at %zd (%s)\n",
-            (acb->nb_sectors * SECTOR_SIZE), start, state->range);
-    curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range);
+    cache = g_malloc0(sizeof(CURLDataCache));
+    cache->base_pos = acb->sector_num * SECTOR_SIZE;
+    cache->data_len = aio_bytes + s->readahead_size;
+    cache->write_pos = 0;
+    cache->data = g_malloc(cache->data_len);
 
+    QLIST_INSERT_HEAD(&s->acbs, acb, next);
+    snprintf(state->range, sizeof(state->range) - 1, "%zd-%zd", 
cache->base_pos,
+             cache->base_pos + cache->data_len);
+    DPRINTF("Reading range: %s\n", state->range);
+    curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range);
+    QLIST_INSERT_HEAD(&s->cache, cache, next);
+    state->cache = cache;
+    cache->use_count++;
     curl_multi_add_handle(s->multi, state->curl);
+    /* kick off curl to start the action */
+    curl_multi_socket_action(s->multi, 0, CURL_SOCKET_TIMEOUT, &running);
+    return;
+
+err_release:
+    if (cache) {
+        if (cache->data) {
+            g_free(cache->data);
+            cache->data = NULL;
+        }
+        g_free(cache);
+        cache = NULL;
+    }
+    acb->common.cb(acb->common.opaque, -EIO);
+    qemu_aio_release(acb);
+    return;
+
 
 }
 
@@ -667,14 +677,18 @@ static void curl_close(BlockDriverState *bs)
             curl_easy_cleanup(s->states[i].curl);
             s->states[i].curl = NULL;
         }
-        if (s->states[i].orig_buf) {
-            g_free(s->states[i].orig_buf);
-            s->states[i].orig_buf = NULL;
-        }
     }
     if (s->multi)
         curl_multi_cleanup(s->multi);
 
+    while (!QLIST_EMPTY(&s->acbs)) {
+        CURLAIOCB *acb = QLIST_FIRST(&s->acbs);
+        acb->common.cb(acb->common.opaque, -EIO);
+        QLIST_REMOVE(acb, next);
+        qemu_aio_release(acb);
+        acb = NULL;
+    }
+
     while (!QLIST_EMPTY(&s->cache)) {
         CURLDataCache *cache = QLIST_FIRST(&s->cache);
         if (cache->data) {
-- 
1.8.2.3




reply via email to

[Prev in Thread] Current Thread [Next in Thread]