[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [PATCH -v5] ceph/rbd block driver for qemu-kvm
From: |
Yehuda Sadeh Weinraub |
Subject: |
Re: [Qemu-devel] [PATCH -v5] ceph/rbd block driver for qemu-kvm |
Date: |
Sat, 9 Oct 2010 08:53:31 -0700 |
On Sat, Oct 9, 2010 at 2:21 AM, Stefan Hajnoczi <address@hidden> wrote:
> On Fri, Oct 8, 2010 at 8:00 PM, Yehuda Sadeh <address@hidden> wrote:
> No flush operation is supported. Can the guest be sure written data
> is on stable storage when it receives completion?
>
That's part of the consistency that rados provides.
>> +/*
>> + * This aio completion is being called from rbd_aio_event_reader() and
>> + * runs in qemu context. It schedules a bh, but just in case the aio
>> + * was not cancelled before.
>
> Cancellation looks unsafe to me because acb is freed for cancel but
> then accessed here! Also see my comment on aio_cancel() below.
Right. Added a ->cancelled field instead of releasing the acb.
>
>> +/*
>> + * Cancel aio. Since we don't reference acb in a non qemu threads,
>> + * it is safe to access it here.
>> + */
>> +static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
>> +{
>> + RBDAIOCB *acb = (RBDAIOCB *) blockacb;
>> + qemu_bh_delete(acb->bh);
>> + acb->bh = NULL;
>> + qemu_aio_release(acb);
>
> Any pending librados completions are still running here and will then
> cause acb to be accessed after they complete. If there is no safe way
> to cancel then wait for the request to complete.
Yeah, we'll keep the acb alive now and just set ->cancelled. When the
last rados cb arrives for this acb we'll clean it up.
>
>> +}
>> +
>> +static AIOPool rbd_aio_pool = {
>> + .aiocb_size = sizeof(RBDAIOCB),
>> + .cancel = rbd_aio_cancel,
>> +};
>> +
>> +/*
>> + * This is the callback function for rados_aio_read and _write
>> + *
>> + * Note: this function is being called from a non qemu thread so
>> + * we need to be careful about what we do here. Generally we only
>> + * write to the block notification pipe, and do the rest of the
>> + * io completion handling from rbd_aio_event_reader() which
>> + * runs in a qemu context.
>
> Do librados threads have all signals blocked? QEMU uses signals so it
> is important that this signal not get sent to a librados thread and
> discarded. I have seen this issue in the past when using threaded
> libraries in QEMU.
We're no longer blocking threads. We've hit that before too.
>
>> + */
>> +static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
>> +{
>> + rcb->ret = rados_aio_get_return_value(c);
>> + rados_aio_release(c);
>> + if (write(rcb->s->fds[RBD_FD_WRITE], (void *)&rcb, sizeof(&rcb)) < 0) {
>
> You are writing RADOSCB* so sizeof(rcb) should be used.
Oops.
>
>> + error_report("failed writing to acb->s->fds\n");
>> + qemu_free(rcb);
>> + }
>> +}
>> +
>> +/* Callback when all queued rados_aio requests are complete */
>> +
>> +static void rbd_aio_bh_cb(void *opaque)
>> +{
>> + RBDAIOCB *acb = opaque;
>> +
>> + if (!acb->write) {
>> + qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
>> + }
>> + qemu_vfree(acb->bounce);
>> + acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>> + qemu_bh_delete(acb->bh);
>> + acb->bh = NULL;
>> +
>> + qemu_aio_release(acb);
>> +}
>> +
>> +static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
>> + int64_t sector_num,
>> + QEMUIOVector *qiov,
>> + int nb_sectors,
>> + BlockDriverCompletionFunc *cb,
>> + void *opaque, int write)
>> +{
>> + RBDAIOCB *acb;
>> + RADOSCB *rcb;
>> + rados_completion_t c;
>> + char n[RBD_MAX_SEG_NAME_SIZE];
>> + int64_t segnr, segoffs, segsize, last_segnr;
>> + int64_t off, size;
>> + char *buf;
>> +
>> + BDRVRBDState *s = bs->opaque;
>> +
>> + acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
>> + acb->write = write;
>> + acb->qiov = qiov;
>> + acb->bounce = qemu_blockalign(bs, qiov->size);
>> + acb->aiocnt = 0;
>> + acb->ret = 0;
>> + acb->error = 0;
>> + acb->s = s;
>> +
>> + if (!acb->bh) {
>> + acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
>> + }
>
> When do you expect acb->bh to be non-NULL?
I guess this part was copied from somewhere else. We don't expect acb->bh
to be non-NULL there, as we always clean it up, so the check is removed.
>
>> +
>> + if (write) {
>> + qemu_iovec_to_buffer(acb->qiov, acb->bounce);
>> + }
>> +
>> + buf = acb->bounce;
>> +
>> + off = sector_num * BDRV_SECTOR_SIZE;
>> + size = nb_sectors * BDRV_SECTOR_SIZE;
>> + segnr = off / s->objsize;
>> + segoffs = off % s->objsize;
>> + segsize = s->objsize - segoffs;
>> +
>> + last_segnr = ((off + size - 1) / s->objsize);
>> + acb->aiocnt = (last_segnr - segnr) + 1;
>> +
>> + s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */
>> +
>> + if (write && s->read_only) {
>> + acb->ret = -EROFS;
>> + return NULL;
>> + }
>
> block.c:bdrv_aio_writev() will reject writes to read-only block
> devices. This check can be eliminated and it also prevents leaking
> acb here.
Removed.
Thanks,
Yehuda
diff --git a/block/rbd.c b/block/rbd.c
index a51fc36..575e481 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -51,6 +51,7 @@ typedef struct RBDAIOCB {
int aiocnt;
int error;
struct BDRVRBDState *s;
+ int cancelled;
} RBDAIOCB;
typedef struct RADOSCB {
@@ -325,8 +326,18 @@ static void rbd_complete_aio(RADOSCB *rcb)
int64_t r;
int i;
- r = rcb->ret;
acb->aiocnt--;
+
+ if (acb->cancelled) {
+ if (!acb->aiocnt) {
+ qemu_vfree(acb->bounce);
+ qemu_aio_release(acb);
+ }
+ goto done;
+ }
+
+ r = rcb->ret;
+
if (acb->write) {
if (r < 0) {
acb->ret = r;
@@ -356,6 +367,7 @@ static void rbd_complete_aio(RADOSCB *rcb)
if (!acb->aiocnt && acb->bh) {
qemu_bh_schedule(acb->bh);
}
+done:
qemu_free(rcb);
i = 0;
}
@@ -585,7 +597,7 @@ static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
RBDAIOCB *acb = (RBDAIOCB *) blockacb;
qemu_bh_delete(acb->bh);
acb->bh = NULL;
- qemu_aio_release(acb);
+ acb->cancelled = 1;
}
static AIOPool rbd_aio_pool = {
@@ -606,7 +618,7 @@ static void rbd_finish_aiocb(rados_completion_t c,
RADOSCB *rcb)
{
rcb->ret = rados_aio_get_return_value(c);
rados_aio_release(c);
- if (write(rcb->s->fds[RBD_FD_WRITE], (void *)&rcb, sizeof(&rcb)) < 0) {
+ if (write(rcb->s->fds[RBD_FD_WRITE], (void *)&rcb, sizeof(rcb)) < 0) {
error_report("failed writing to acb->s->fds\n");
qemu_free(rcb);
}
@@ -654,10 +666,8 @@ static BlockDriverAIOCB
*rbd_aio_rw_vector(BlockDriverState *bs,
acb->ret = 0;
acb->error = 0;
acb->s = s;
-
- if (!acb->bh) {
- acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
- }
+ acb->cancelled = 0;
+ acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
if (write) {
qemu_iovec_to_buffer(acb->qiov, acb->bounce);
@@ -676,11 +686,6 @@ static BlockDriverAIOCB
*rbd_aio_rw_vector(BlockDriverState *bs,
s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */
- if (write && s->read_only) {
- acb->ret = -EROFS;
- return NULL;
- }
-
while (size > 0) {
if (size < segsize) {
segsize = size;