From: Yehuda Sadeh Weinraub
Subject: Re: [Qemu-devel] [PATCH -v5] ceph/rbd block driver for qemu-kvm
Date: Sat, 9 Oct 2010 08:53:31 -0700

On Sat, Oct 9, 2010 at 2:21 AM, Stefan Hajnoczi <address@hidden> wrote:
> On Fri, Oct 8, 2010 at 8:00 PM, Yehuda Sadeh <address@hidden> wrote:
> No flush operation is supported.  Can the guest be sure written data
> is on stable storage when it receives completion?
>
That's part of the consistency that rados provides.
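
For what it's worth, RADOS distinguishes a write that has been
acknowledged ("complete", visible to readers) from one committed to
stable storage ("safe"); a guest-visible flush would want the latter.
A minimal sketch of waiting for the safe event, written against the
current librados C API (the pre-release API this patch builds on
differs, and write_stable is a hypothetical helper, not part of the
patch):

#include <rados/librados.h>

/* Hypothetical helper: report completion only once the write is
 * "safe" (committed to disk on the OSDs), not merely "complete"
 * (acked and visible to readers). */
static int write_stable(rados_ioctx_t io, const char *oid,
                        const char *buf, size_t len, uint64_t off)
{
    rados_completion_t comp;
    int r = rados_aio_create_completion(NULL, NULL, NULL, &comp);
    if (r < 0) {
        return r;
    }
    r = rados_aio_write(io, oid, comp, buf, len, off);
    if (r == 0) {
        rados_aio_wait_for_safe(comp);  /* durable, not just acked */
        r = rados_aio_get_return_value(comp);
    }
    rados_aio_release(comp);
    return r;
}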

>> +/*
>> + * This aio completion is being called from rbd_aio_event_reader() and
>> + * runs in qemu context. It schedules a bh, but only if the aio
>> + * was not cancelled beforehand.
>
> Cancellation looks unsafe to me because acb is freed for cancel but
> then accessed here!  Also see my comment on aio_cancel() below.

Right. I've added a ->cancelled field instead of releasing the acb.

>
>> +/*
>> + * Cancel aio. Since we don't reference acb from non-qemu threads,
>> + * it is safe to access it here.
>> + */
>> +static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
>> +{
>> +    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
>> +    qemu_bh_delete(acb->bh);
>> +    acb->bh = NULL;
>> +    qemu_aio_release(acb);
>
> Any pending librados completions are still running here and will then
> cause acb to be accessed after they complete.  If there is no safe way
> to cancel then wait for the request to complete.

Yeah, we'll keep the acb alive now and just set ->cancelled.  When the
last rados callback arrives for this acb we'll clean it up (see the
diff below).
>
>> +}
>> +
>> +static AIOPool rbd_aio_pool = {
>> +    .aiocb_size = sizeof(RBDAIOCB),
>> +    .cancel = rbd_aio_cancel,
>> +};
>> +
>> +/*
>> + * This is the callback function for rados_aio_read and _write
>> + *
>> + * Note: this function is being called from a non-qemu thread so
>> + * we need to be careful about what we do here. Generally we only
>> + * write to the block notification pipe, and do the rest of the
>> + * io completion handling from rbd_aio_event_reader() which
>> + * runs in a qemu context.
>
> Do librados threads have all signals blocked?  QEMU uses signals so it
> is important that this signal not get sent to a librados thread and
> discarded.  I have seen this issue in the past when using threaded
> libraries in QEMU.

We're now blocking signals in the librados threads. We've hit that
issue before too.
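
For reference, the usual fix is to block every signal around
pthread_create() so library threads inherit a full mask; QEMU's
SIGALRM/SIGIO then always land in a qemu thread. A minimal sketch
(worker_fn is a hypothetical stand-in for the librados callback
thread, not the actual librados internals):

#include <pthread.h>
#include <signal.h>
#include <stddef.h>

/* Hypothetical stand-in for librados's completion thread. */
static void *worker_fn(void *arg)
{
    /* ... run completions, write notifications to QEMU's pipe ... */
    return NULL;
}

/* Spawn a thread with every signal blocked: the child inherits the
 * mask, so QEMU's signals are never delivered to it and silently
 * swallowed there. */
static int spawn_blocked_thread(pthread_t *tid)
{
    sigset_t all, old;
    int ret;

    sigfillset(&all);
    pthread_sigmask(SIG_BLOCK, &all, &old);
    ret = pthread_create(tid, NULL, worker_fn, NULL);
    /* Restore the caller's mask so the qemu thread keeps its signals. */
    pthread_sigmask(SIG_SETMASK, &old, NULL);
    return ret;
}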

>
>> + */
>> +static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
>> +{
>> +    rcb->ret = rados_aio_get_return_value(c);
>> +    rados_aio_release(c);
>> +    if (write(rcb->s->fds[RBD_FD_WRITE], (void *)&rcb, sizeof(&rcb)) < 0) {
>
> You are writing RADOSCB* so sizeof(rcb) should be used.
Oops.
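
The bug is latent rather than fatal on common ABIs: sizeof(&rcb) is
the size of a RADOSCB ** and sizeof(rcb) the size of a RADOSCB *, and
both are pointer-sized. Still, sizeof(rcb) states the intent, the
number of bytes of the pointer value being written. A standalone
illustration:

#include <stdio.h>

typedef struct RADOSCB RADOSCB;   /* opaque here; only sizes matter */

int main(void)
{
    RADOSCB *rcb = NULL;
    (void)rcb;

    /* Both print 8 on LP64 targets, which is why the typo worked. */
    printf("sizeof(rcb)  = %zu\n", sizeof(rcb));   /* RADOSCB *  */
    printf("sizeof(&rcb) = %zu\n", sizeof(&rcb));  /* RADOSCB ** */
    return 0;
}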

>
>> +        error_report("failed writing to acb->s->fds\n");
>> +        qemu_free(rcb);
>> +    }
>> +}
>> +
>> +/* Callback when all queued rados_aio requests are complete */
>> +
>> +static void rbd_aio_bh_cb(void *opaque)
>> +{
>> +    RBDAIOCB *acb = opaque;
>> +
>> +    if (!acb->write) {
>> +        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
>> +    }
>> +    qemu_vfree(acb->bounce);
>> +    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>> +    qemu_bh_delete(acb->bh);
>> +    acb->bh = NULL;
>> +
>> +    qemu_aio_release(acb);
>> +}
>> +
>> +static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
>> +                                           int64_t sector_num,
>> +                                           QEMUIOVector *qiov,
>> +                                           int nb_sectors,
>> +                                           BlockDriverCompletionFunc *cb,
>> +                                           void *opaque, int write)
>> +{
>> +    RBDAIOCB *acb;
>> +    RADOSCB *rcb;
>> +    rados_completion_t c;
>> +    char n[RBD_MAX_SEG_NAME_SIZE];
>> +    int64_t segnr, segoffs, segsize, last_segnr;
>> +    int64_t off, size;
>> +    char *buf;
>> +
>> +    BDRVRBDState *s = bs->opaque;
>> +
>> +    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
>> +    acb->write = write;
>> +    acb->qiov = qiov;
>> +    acb->bounce = qemu_blockalign(bs, qiov->size);
>> +    acb->aiocnt = 0;
>> +    acb->ret = 0;
>> +    acb->error = 0;
>> +    acb->s = s;
>> +
>> +    if (!acb->bh) {
>> +        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
>> +    }
>
> When do you expect acb->bh to be non-NULL?

I guess this part was copied from somewhere else. We never expect it
to be non-NULL since we always clean it up, so the check has been
removed.

>
>> +
>> +    if (write) {
>> +        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
>> +    }
>> +
>> +    buf = acb->bounce;
>> +
>> +    off = sector_num * BDRV_SECTOR_SIZE;
>> +    size = nb_sectors * BDRV_SECTOR_SIZE;
>> +    segnr = off / s->objsize;
>> +    segoffs = off % s->objsize;
>> +    segsize = s->objsize - segoffs;
>> +
>> +    last_segnr = ((off + size - 1) / s->objsize);
>> +    acb->aiocnt = (last_segnr - segnr) + 1;
>> +
>> +    s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */
>> +
>> +    if (write && s->read_only) {
>> +        acb->ret = -EROFS;
>> +        return NULL;
>> +    }
>
> block.c:bdrv_aio_writev() will reject writes to read-only block
> devices.  This check can be eliminated and it also prevents leaking
> acb here.

Removed.
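
As an aside for readers of the hunk above: the segment math splits a
request on s->objsize boundaries into one RADOS op per object
touched. A standalone sketch with hypothetical numbers (4 MiB
objects, a 4 MiB request at offset 6 MiB):

#include <stdio.h>
#include <inttypes.h>

int main(void)
{
    /* Hypothetical request: 4 MiB objects, 4 MiB of I/O at 6 MiB. */
    int64_t objsize = 4ll << 20;
    int64_t off = 6ll << 20;
    int64_t size = 4ll << 20;

    int64_t segnr = off / objsize;          /* first object hit: 1 */
    int64_t segoffs = off % objsize;        /* offset within it: 2 MiB */
    int64_t segsize = objsize - segoffs;    /* bytes left in it: 2 MiB */
    int64_t last_segnr = (off + size - 1) / objsize;  /* last object: 2 */
    int aiocnt = (int)(last_segnr - segnr) + 1;       /* 2 RADOS ops */

    printf("segnr=%" PRId64 " segoffs=%" PRId64
           " segsize=%" PRId64 " aiocnt=%d\n",
           segnr, segoffs, segsize, aiocnt);
    return 0;
}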

Thanks,
Yehuda

diff --git a/block/rbd.c b/block/rbd.c
index a51fc36..575e481 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -51,6 +51,7 @@ typedef struct RBDAIOCB {
     int aiocnt;
     int error;
     struct BDRVRBDState *s;
+    int cancelled;
 } RBDAIOCB;

 typedef struct RADOSCB {
@@ -325,8 +326,18 @@ static void rbd_complete_aio(RADOSCB *rcb)
     int64_t r;
     int i;

-    r = rcb->ret;
     acb->aiocnt--;
+
+    if (acb->cancelled) {
+        if (!acb->aiocnt) {
+            qemu_vfree(acb->bounce);
+            qemu_aio_release(acb);
+        }
+        goto done;
+    }
+
+    r = rcb->ret;
+
     if (acb->write) {
         if (r < 0) {
             acb->ret = r;
@@ -356,6 +367,7 @@ static void rbd_complete_aio(RADOSCB *rcb)
     if (!acb->aiocnt && acb->bh) {
         qemu_bh_schedule(acb->bh);
     }
+done:
     qemu_free(rcb);
     i = 0;
 }
@@ -585,7 +597,7 @@ static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
     RBDAIOCB *acb = (RBDAIOCB *) blockacb;
     qemu_bh_delete(acb->bh);
     acb->bh = NULL;
-    qemu_aio_release(acb);

+    acb->cancelled = 1;
 }

 static AIOPool rbd_aio_pool = {
@@ -606,7 +618,7 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
 {
     rcb->ret = rados_aio_get_return_value(c);
     rados_aio_release(c);
-    if (write(rcb->s->fds[RBD_FD_WRITE], (void *)&rcb, sizeof(&rcb)) < 0) {
+    if (write(rcb->s->fds[RBD_FD_WRITE], (void *)&rcb, sizeof(rcb)) < 0) {
         error_report("failed writing to acb->s->fds\n");
         qemu_free(rcb);
     }
@@ -654,10 +666,8 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
     acb->ret = 0;
     acb->error = 0;
     acb->s = s;
-
-    if (!acb->bh) {
-        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
-    }
+    acb->cancelled = 0;
+    acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);

     if (write) {
         qemu_iovec_to_buffer(acb->qiov, acb->bounce);
@@ -676,11 +686,6 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,

     s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */

-    if (write && s->read_only) {
-        acb->ret = -EROFS;
-        return NULL;
-    }
-
     while (size > 0) {
         if (size < segsize) {
             segsize = size;


