
Re: [PATCH 5/7] block/rbd: migrate from aio to coroutines


From: Peter Lieven
Subject: Re: [PATCH 5/7] block/rbd: migrate from aio to coroutines
Date: Thu, 14 Jan 2021 20:39:09 +0100
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:68.0) Gecko/20100101 Thunderbird/68.10.0

On 14.01.21 at 20:19, Jason Dillaman wrote:
> On Sun, Dec 27, 2020 at 11:42 AM Peter Lieven <pl@kamp.de> wrote:
>> Signed-off-by: Peter Lieven <pl@kamp.de>
>> ---
>>  block/rbd.c | 247 ++++++++++++++++++----------------------------------
>>  1 file changed, 84 insertions(+), 163 deletions(-)
>>
>> diff --git a/block/rbd.c b/block/rbd.c
>> index 27b232f4d8..2d77d0007f 100644
>> --- a/block/rbd.c
>> +++ b/block/rbd.c
>> @@ -66,22 +66,6 @@ typedef enum {
>>      RBD_AIO_FLUSH
>>  } RBDAIOCmd;
>>
>> -typedef struct RBDAIOCB {
>> -    BlockAIOCB common;
>> -    int64_t ret;
>> -    QEMUIOVector *qiov;
>> -    RBDAIOCmd cmd;
>> -    int error;
>> -    struct BDRVRBDState *s;
>> -} RBDAIOCB;
>> -
>> -typedef struct RADOSCB {
>> -    RBDAIOCB *acb;
>> -    struct BDRVRBDState *s;
>> -    int64_t size;
>> -    int64_t ret;
>> -} RADOSCB;
>> -
>>  typedef struct BDRVRBDState {
>>      rados_t cluster;
>>      rados_ioctx_t io_ctx;
>> @@ -94,6 +78,13 @@ typedef struct BDRVRBDState {
>>      AioContext *aio_context;
>>  } BDRVRBDState;
>>
>> +typedef struct RBDTask {
>> +    BDRVRBDState *s;
>> +    Coroutine *co;
>> +    bool complete;
>> +    int64_t ret;
>> +} RBDTask;
>> +
>>  static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
>>                              BlockdevOptionsRbd *opts, bool cache,
>>                              const char *keypairs, const char *secretid,
>> @@ -316,13 +307,6 @@ static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
>>      return ret;
>>  }
>>
>> -static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
>> -{
>> -    RBDAIOCB *acb = rcb->acb;
>> -    iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
>> -               acb->qiov->size - offs);
>> -}
>> -
>>  /* FIXME Deprecate and remove keypairs or make it available in QMP. */
>>  static int qemu_rbd_do_create(BlockdevCreateOptions *options,
>>                                const char *keypairs, const char *password_secret,
>> @@ -440,46 +424,6 @@ exit:
>>      return ret;
>>  }
>>
>> -/*
>> - * This aio completion is being called from rbd_finish_bh() and runs in qemu
>> - * BH context.
>> - */
>> -static void qemu_rbd_complete_aio(RADOSCB *rcb)
>> -{
>> -    RBDAIOCB *acb = rcb->acb;
>> -    int64_t r;
>> -
>> -    r = rcb->ret;
>> -
>> -    if (acb->cmd != RBD_AIO_READ) {
>> -        if (r < 0) {
>> -            acb->ret = r;
>> -            acb->error = 1;
>> -        } else if (!acb->error) {
>> -            acb->ret = rcb->size;
>> -        }
>> -    } else {
>> -        if (r < 0) {
>> -            qemu_rbd_memset(rcb, 0);
>> -            acb->ret = r;
>> -            acb->error = 1;
>> -        } else if (r < rcb->size) {
>> -            qemu_rbd_memset(rcb, r);
>> -            if (!acb->error) {
>> -                acb->ret = rcb->size;
>> -            }
>> -        } else if (!acb->error) {
>> -            acb->ret = r;
>> -        }
>> -    }
>> -
>> -    g_free(rcb);
>> -
>> -    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>> -
>> -    qemu_aio_unref(acb);
>> -}
>> -
>>  static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
>>  {
>>      const char **vals;
>> @@ -817,88 +761,49 @@ static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
>>      return 0;
>>  }
>>
>> -static const AIOCBInfo rbd_aiocb_info = {
>> -    .aiocb_size = sizeof(RBDAIOCB),
>> -};
>> -
>> -static void rbd_finish_bh(void *opaque)
>> +static void qemu_rbd_finish_bh(void *opaque)
>>  {
>> -    RADOSCB *rcb = opaque;
>> -    qemu_rbd_complete_aio(rcb);
>> +    RBDTask *task = opaque;
>> +    task->complete = 1;
>> +    aio_co_wake(task->co);
>>  }
>>
>> -/*
>> - * This is the callback function for rbd_aio_read and _write
>> - *
>> - * Note: this function is being called from a non qemu thread so
>> - * we need to be careful about what we do here. Generally we only
>> - * schedule a BH, and do the rest of the io completion handling
>> - * from rbd_finish_bh() which runs in a qemu context.
>> - */
>> -static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
>> +static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
>>  {
>> -    RBDAIOCB *acb = rcb->acb;
>> -
>> -    rcb->ret = rbd_aio_get_return_value(c);
>> +    task->ret = rbd_aio_get_return_value(c);
>>      rbd_aio_release(c);
>> -
>> -    replay_bh_schedule_oneshot_event(acb->s->aio_context, rbd_finish_bh, rcb);
>> +    aio_bh_schedule_oneshot(task->s->aio_context, qemu_rbd_finish_bh, task);
>>  }
>>
>> -static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
>> -                                 int64_t off,
>> -                                 QEMUIOVector *qiov,
>> -                                 int64_t size,
>> -                                 BlockCompletionFunc *cb,
>> -                                 void *opaque,
>> -                                 RBDAIOCmd cmd)
>> +static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
>> +                                          uint64_t offset,
>> +                                          uint64_t bytes,
>> +                                          QEMUIOVector *qiov,
>> +                                          int flags,
>> +                                          RBDAIOCmd cmd)
>>  {
>> -    RBDAIOCB *acb;
>> -    RADOSCB *rcb = NULL;
>> -    rbd_completion_t c;
>> -    int r;
>> -
>>      BDRVRBDState *s = bs->opaque;
>> +    RBDTask task = { .s = s, .co = qemu_coroutine_self() };
>> +    rbd_completion_t c;
>> +    int r, ret = -EIO;
>>
>> -    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
>> -    acb->cmd = cmd;
>> -    acb->qiov = qiov;
>> -    assert(!qiov || qiov->size == size);
>> -
>> -    rcb = g_new(RADOSCB, 1);
>> -
>> -    acb->ret = 0;
>> -    acb->error = 0;
>> -    acb->s = s;
>> +    assert(!qiov || qiov->size == bytes);
>>
>> -    rcb->acb = acb;
>> -    rcb->s = acb->s;
>> -    rcb->size = size;
>> -    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
>> +    r = rbd_aio_create_completion(&task,
>> +                                  (rbd_callback_t) qemu_rbd_completion_cb, &c);
>>      if (r < 0) {
>>          goto failed;
>>      }
>>
>>      switch (cmd) {
>> -    case RBD_AIO_WRITE:
>> -        /*
>> -         * RBD APIs don't allow us to write more than actual size, so in order
>> -         * to support growing images, we resize the image before write
>> -         * operations that exceed the current size.
>> -         */
>> -        if (off + size > s->image_size) {
>> -            r = qemu_rbd_resize(bs, off + size);
> I realize this is inherited code that is being refactored, but is it
> really possible for QEMU to issue IOs outside the block extents? This
> also has the same issue as the previous commit comment where the
> "image_size" is only initialized at image open time and therefore this
> path will keep getting hit if IOs are issued beyond the original block
> device extents.


This is for the case where someone gets the idea of putting e.g. a qcow2 file on rbd, which can actually grow during runtime.

If you put a raw image on an rbd image, the only way to grow it is with block_resize (which invokes truncate) or with qemu-img resize...


I personally was unsure whether it makes sense at all to support anything except raw on rbd, but it was supported before, so I left this in.
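
For reference, keeping that behaviour in the new coroutine path means the write case needs the same guard the old rbd_start_aio() had. This is only an untested sketch, reusing the existing qemu_rbd_resize() helper with the new offset/bytes parameters; whether s->image_size then stays in sync is exactly your point above and not addressed here:

    case RBD_AIO_WRITE:
        /*
         * RBD APIs don't allow us to write more than the actual size, so
         * to keep supporting a growing format (e.g. qcow2) on top of rbd
         * we resize the image before a write that would exceed it.
         */
        if (offset + bytes > s->image_size) {
            r = qemu_rbd_resize(bs, offset + bytes);
            if (r < 0) {
                /* completion was created but never submitted, release it */
                goto failed_completion;
            }
        }
        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
        break;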


>
>> -            if (r < 0) {
>> -                goto failed_completion;
>> -            }
>> -        }
>> -        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
>> -        break;
>>      case RBD_AIO_READ:
>> -        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
>> +        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
>> +        break;
>> +    case RBD_AIO_WRITE:
>> +        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
>>          break;
>>      case RBD_AIO_DISCARD:
>> -        r = rbd_aio_discard(s->image, off, size, c);
>> +        r = rbd_aio_discard(s->image, offset, bytes, c);
>>          break;
>>      case RBD_AIO_FLUSH:
>>          r = rbd_aio_flush(s->image, c);
>> @@ -908,44 +813,71 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
>>      }
>>
>>      if (r < 0) {
>> +        task.complete = 1;
>> +        task.ret = r;
>> +    }
>> +
>> +    while (!task.complete) {
>> +        qemu_coroutine_yield();
>> +    }
>> +
>> +    if (task.ret < 0) {
>> +        error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
>> +                     PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
>> +                     bytes, flags, task.ret, strerror(-task.ret));
>>          goto failed_completion;
>>      }
>> -    return &acb->common;
>> +
>> +    /* zero pad short reads */
>> +    if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
>> +        qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
>> +    }
>> +
>> +    return 0;
>>
>>  failed_completion:
>>      rbd_aio_release(c);
> Wasn't this already released in "qemu_rbd_completion_cb"?


You are right, in the case of task.ret < 0 we need to go to "failed", not "failed_completion".

If the I/O is successful, we leave the coroutine with "return 0" just above the label.
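
In other words, the tail of qemu_rbd_start_co() would look roughly like this (just a sketch of the fix I have in mind, assuming the failed: label simply returns ret):

    if (task.ret < 0) {
        error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
                     PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
                     bytes, flags, task.ret, strerror(-task.ret));
        /* the completion was already released in qemu_rbd_completion_cb() */
        ret = task.ret;
        goto failed;
    }

    /* zero pad short reads */
    if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
        qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
    }

    return 0;

failed_completion:
    /* only for errors before the request was handed to librbd */
    rbd_aio_release(c);
failed:
    return ret;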


Peter



