qemu-devel

Re: [Qemu-devel] [RFC PATCH v2] Replication agent module


From: Stefan Hajnoczi
Subject: Re: [Qemu-devel] [RFC PATCH v2] Replication agent module
Date: Mon, 2 Apr 2012 12:22:40 +0100
User-agent: Mutt/1.5.21 (2010-09-15)

On Sun, Apr 01, 2012 at 03:05:48PM +0300, Ori Mamluk wrote:

Feedback on specific points below.  The main thing to think about is how
to integrate with QEMU's event loop.  You have used threads in places
but are also using qemu_set_fd_handler().  Most of QEMU's functions
(including the block layer) cannot be executed from arbitrary threads
because they are not thread-safe.  So it probably makes sense to run all
repagent network code in fd handler functions as part of the QEMU event
loop.

> By default the build and use of this module is disabled. To activate it
> you need to use a flag in ./configure and a commandline switch to qemu.

If the code is portable and doesn't have specific external library
dependencies then it's good to enable it by default.  Distros and users
can decide to disable it but I think building everything by default
makes sense - it reduces bitrot because people will be building your
code on different host platforms all the time.

> Missing features:
> * Dirty bitmap at the protected side
>       The protected volume side needs to persistently keep track of
> 'pending' IOs.
>       I want to add a bitmap (can hook like another 'filter' with the
> repagent), that will synchronously track IOs. And allow getting a list of
> such IOs.
>       Without such a bitmap a failure of any component (rephub or agent)
> will require reading the whole protected volume.

To double-check that I understand the point of this:

If the rephub is unavailable for a period of time and then comes up
again, it will be able to request only the regions of the protected
volume that were modified in the meantime.

If the host or QEMU crashes, does the dirty bitmap come into play?
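
To make the first scenario concrete, here is a minimal in-memory sketch
of such a bitmap.  It is not code from the patch: the names, the 64 KiB
chunk size and the 16-chunk volume are all assumptions for illustration,
and a real implementation would also have to persist the bits before
completing the guest write.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical geometry: one bit per 64 KiB chunk, 16 chunks in total. */
#define CHUNK_SECTORS 128u   /* 128 sectors * 512 bytes = 64 KiB */
#define NUM_CHUNKS    16u

typedef struct DirtyBitmap {
    uint8_t bits[(NUM_CHUNKS + 7) / 8];
} DirtyBitmap;

/* Mark every chunk touched by [sector_num, sector_num + nb_sectors). */
static void dirty_mark(DirtyBitmap *bm, uint64_t sector_num,
                       uint32_t nb_sectors)
{
    if (nb_sectors == 0) {
        return;
    }
    uint64_t first = sector_num / CHUNK_SECTORS;
    uint64_t last = (sector_num + nb_sectors - 1) / CHUNK_SECTORS;
    for (uint64_t c = first; c <= last && c < NUM_CHUNKS; c++) {
        bm->bits[c / 8] |= (uint8_t)(1u << (c % 8));
    }
}

/* True if the given chunk has been written since the bitmap was cleared. */
static bool dirty_test(const DirtyBitmap *bm, uint64_t chunk)
{
    if (chunk >= NUM_CHUNKS) {
        return false;
    }
    return ((bm->bits[chunk / 8] >> (chunk % 8)) & 1u) != 0;
}

/* Number of chunks the hub would have to re-read after reconnecting. */
static unsigned dirty_count(const DirtyBitmap *bm)
{
    unsigned n = 0;
    for (uint64_t c = 0; c < NUM_CHUNKS; c++) {
        if (dirty_test(bm, c)) {
            n++;
        }
    }
    return n;
}
```

After a reconnect the hub would only need to read the chunks for which
dirty_test() returns true, rather than the whole volume.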

> * Capture IOs at the recovery side
>       During fail-over or fail-over test, the repagent at the recovery
> side needs to capture all IOs (reads and writes) done by the fail-over VM
>       and answer them by passing them synchronously to the VRA.

Does fail-over mean that some management stack functionality will spawn
a new "recovery" VM if a VM disappears?  The recovery VM will have
access to the replicated volumes and will itself run repagent.

Can you explain this step-by-step?  What is a "VRA"?

> * I wrote a stand-alone Rephub for actively reading the protected volume
> and copying it - for test purposes.

Cool, would be great to see a rephub implementation.

> @@ -685,6 +693,12 @@ int bdrv_open(BlockDriverState *bs, const char
> *filename, int flags,
>      int ret;
>      char tmp_filename[PATH_MAX];
> 
> +    if (use_repagent && bs->device_name[0] != '\0' && strcmp(
> +            bs->device_name, "pflash0") != 0) {
> +        BlockDriver *bdrv_repagent = bdrv_find_format("repagent");
> +        drv = bdrv_repagent;
> +    }
> +
>      if (flags & BDRV_O_SNAPSHOT) {
>          BlockDriverState *bs1;
>          int64_t total_size;

You could add a -drive option.  See blockdev.c:drive_init().

  -drive ...,repagent=on|off  (default "off")

There still needs to be a way to configure the rephub network connection
details.  Is it ever useful to connect one volume to rephub A and
another volume to rephub B?  If so, the connection details would have to
be per-drive.

> @@ -1842,6 +1866,18 @@ static int coroutine_fn
> bdrv_co_do_writev(BlockDriverState *bs,
>          ret = drv->bdrv_co_writev(bs, sector_num, nb_sectors, qiov);
>      }
> 
> +
> +#ifdef CONFIG_REPAGENT
> +    if (bs->device_name[0] != '\0') {
> +        /* We split the IO only at the highest stack driver layer.
> +           Currently we know that by checking device_name - only
> +           highest level (closest to the guest) has that name.
> +           */
> +/*           repagent_handle_protected_write(bs, sector_num,
> +                nb_sectors, qiov, ret);

The top-level BlockDriverState requirement can be handled if you do
something like this in blockdev.c:drive_init():

  if (use_repagent) {
      repagent_start(bs);
  }

Only the top-level BlockDriverState (bs) would be tracked.

> diff --git a/block/repagent/qemu-repagent.txt
> b/block/repagent/qemu-repagent.txt
> new file mode 100644
> index 0000000..f8def3f
> --- /dev/null
> +++ b/block/repagent/qemu-repagent.txt
> @@ -0,0 +1,73 @@
> +    repagent - replication agent - a Qemu module for enabling continuous
> async replication of VM volumes
> +
> +Introduction
> +    This document describes a feature in Qemu - a replication agent
> (named Repagent).
> +    The Repagent is a new module that exposes an API to an external
> replication system (AKA Rephub).
> +    This API allows a Rephub to communicate with a Qemu VM and
> continuously replicate its volumes.
> +    The imlementation of a Rephub is outside of the scope of this
> document. There may be several various Rephub

s/imlementation/implementation/

> +    implenetations using the same repagent in Qemu.

s/implenetations/implementations/

> +    The Repagent is storage driver that acts like a filter driver.

The Repagent driver acts like a filter driver.

> +    It can be regarded as a 'plugin' that is activated when the
> management system enables replication.
> +
> +Main feature of Repagent
> +    Repagent has the following main features:
> +    * Report volumes - report a list of all volumes in a VM to the
> Rephub.
> +    * Mirror writes - Report writes to a volume - send all writes made to
> a protected volume to the Rephub.
> +        The reporting of an IO is asyncronuous - i.e. the IO is not

s/asyncronuous/asynchronous/

> delayed by the Repagent to get any acknowledgement from the Rephub.
> +        It is only copied to the Rephub.
> +    * Remote IO - Read/write a  volume - allows the Rephub to read a
> protected volume, to enable the protected hub to syncronize

s/syncronize/synchronize/

> +       the content of a protected volume.
> +       Also used to read/write to a recovery volume - the replica of a
> protected volume.

It sounds like the write operation will be used to populate a recovery
volume?

> +
> +Description of the Repagent module
> +
> +Build and run options
> +    New configure option: --enable-repagent
> +    New command line option:
> +    -repagent [hub IP/name]
> +                    Enable replication support for disks
> +                    hub is the ip or name of the machine running the
> replication hub.
> +
> +Module APIs
> +    The Repagent module interfaces two main components:
> +    1. The Rephub - An external API based on socket messages
> +           See detailed comments about each message in rephub_cmds.h
> +    2. The generic block layer- block.c
> +        Repagent is a block driver. Most of the block driver functions
> are just a pass-through
> +        to the next driver.
> +        Writes are mirrors to the hub for replication
> +        Open function is used for registering each volume in Repagent.
> +
> +
> +General description of a Rephub  - a replication system the repagent
> connects to
> +    This section describes in high level a sample Rephub - a replication
> system that uses the repagent API
> +    to replicate disks.
> +    It describes a simple Rephub that comntinuously maintains a mirror of
> the volumes of a VM.
> +
> +    Say we have a VM we want to protect - call it PVM, say it has 2
> volumes - V1, V2.
> +    Our Rephub is called SingleRephub - a Rephub protecting a single VM.
> +
> +    Preparations
> +    1. The user chooses a host to rub SingleRephub - a different host

s/rub/run/

> than PVM, call it Host2
> +    2. The user creates two volumes on Host2 - same sizes of V1 and V2,
> call them V1R (V1 recovery) and V2R.
> +    3. The user runs SingleRephub process on Host2, and gives V1R and V2R
> as command line arguments.
> +        From now on SingleRephub waits for the protected VM repagent to
> connect.
> +    4. The user runs the protected VM PVM - and uses the switch -repagent
> <Host2 IP>.
> +
> +    Runtime
> +    1. The repagent module connects to SingleRephub on startup.
> +    2. repagent reports V1 and V2 to SingleRephub.
> +    3. SingleRephub starts to perform an initial synchronization of the
> protected volumes-
> +        it reads each protected volume (V1 and V2) - using read volume
> requests - and copies the data into the
> +        recovery volume V1R and V2R.
> +    4. SingleRephub enters 'protection' mode - each write to the
> protected volume is sent by the repagent to the Rephub,
> +        and the Rephub performs the write on the matching recovery
> volume.
> +
> +    * Note that during stage 3 writes to the protected volumes are not
> ignored - they're kept in a bitmap,
> +        and will be read again when stage 3 ends, in an interative
> convergin process.

s/interative convergin/iterative convergence/

> diff --git a/block/repagent/repagent.c b/block/repagent/repagent.c
> new file mode 100644
> index 0000000..bdc0117
> --- /dev/null
> +++ b/block/repagent/repagent.c
> @@ -0,0 +1,355 @@

This file needs a license header.

> +#include <string.h>
> +#include <stdlib.h>
> +#include <stdio.h>
> +#include <pthread.h>
> +#include <stdint.h>
> +
> +#include "block.h"
> +#include "rephub_defs.h"
> +#include "block_int.h"
> +#include "repagent_client.h"
> +#include "repagent.h"
> +#include "rephub_cmds.h"
> +
> +#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj))
> +#define REPAGENT_MAX_NUM_VOLUMES (64)
> +#define REPAGENT_VOLUME_ID_NONE (0)
> +
> +typedef struct RepagentVolume {
> +    uint64_t vol_id;
> +    const char *vol_path;
> +    BlockDriverState *driver_ptr;
> +} RepagentVolume;
> +
> +struct RepAgentState {
> +    int is_init;

Please use bool.  We can use C99 and <stdbool.h>.

> +    int num_volumes;
> +    RepagentVolume *volumes[REPAGENT_MAX_NUM_VOLUMES];

Or use the qemu-queue.h linked lists.  That way there's no arbitrary
compile-time limit.
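
A rough sketch of both suggestions together, under one loud assumption:
it uses the BSD <sys/queue.h> LIST_* macros as a stand-in, since
qemu-queue.h is not available outside the QEMU tree and its QLIST_*
macros follow the same pattern.  The struct and function names are
hypothetical, not taken from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/queue.h>

typedef struct Volume {
    uint64_t vol_id;
    LIST_ENTRY(Volume) next;      /* list linkage embedded in the element */
} Volume;

typedef struct AgentState {
    bool is_init;                 /* bool instead of int */
    LIST_HEAD(, Volume) volumes;  /* no fixed-size array, no MAX constant */
} AgentState;

static void agent_init(AgentState *s)
{
    LIST_INIT(&s->volumes);
    s->is_init = true;
}

/* Allocate a volume and link it in; returns NULL on allocation failure. */
static Volume *agent_add_volume(AgentState *s, uint64_t vol_id)
{
    Volume *v = calloc(1, sizeof(*v));
    if (v == NULL) {
        return NULL;
    }
    v->vol_id = vol_id;
    LIST_INSERT_HEAD(&s->volumes, v, next);
    return v;
}

/* Linear search by id; returns NULL if no such volume is registered. */
static Volume *agent_find_volume(AgentState *s, uint64_t vol_id)
{
    Volume *v;
    LIST_FOREACH(v, &s->volumes, next) {
        if (v->vol_id == vol_id) {
            return v;
        }
    }
    return NULL;
}

/* Unlink and free; no need to move the last array element into the gap. */
static void agent_remove_volume(Volume *v)
{
    LIST_REMOVE(v, next);
    free(v);
}
```

Removal becomes a constant-time unlink, so the "move the last volume
into the freed cell" logic in repagent_deregister_drive() goes away.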

> +};
> +
> +typedef struct RepagentReadVolIo {
> +    QEMUIOVector qiov;
> +    RepCmdRemoteIoReq rep_cmd;
> +    uint8_t *buf;
> +    struct timeval start_time;
> +} RepagentReadVolIo;
> +
> +static int repagent_get_volume_by_driver(
> +        BlockDriverState *bs);
> +static int repagent_get_volume_by_name(const char *name);
> +static void repagent_report_volumes_to_hub(void);
> +static void repagent_remote_io_done(void *opaque, int ret);
> +static struct timeval tsub(struct timeval t1, struct timeval t2);
> +
> +RepAgentState g_rep_agent = { 0 };
> +
> +void repagent_init(const char *hubname, int port)
> +{
> +    /* It is the responsibility of the thread to free this struct */
> +    rephub_params *pParams = (rephub_params
> *)g_malloc(sizeof(rephub_params));

No need to cast void*.  In C it converts to any pointer type without a
compiler warning.
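
For illustration only, with plain malloc() standing in for g_malloc()
(both return a void pointer) and a hypothetical HubParams struct in
place of rephub_params:

```c
#include <assert.h>
#include <stdlib.h>

typedef struct HubParams {
    char *name;
    int port;
} HubParams;

/* The void * returned by malloc() converts implicitly to HubParams *. */
static HubParams *hub_params_new(int port)
{
    HubParams *p = malloc(sizeof(*p));   /* no cast needed in C */
    if (p != NULL) {
        p->name = NULL;
        p->port = port;
    }
    return p;
}
```

Using sizeof(*p) instead of sizeof(HubParams) also keeps the allocation
size correct if the pointer's type changes later.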

> +    if (hubname == NULL) {
> +        hubname = "127.0.0.1";
> +    }
> +    if (port == 0) {
> +        port = 9010;
> +    }
> +
> +    printf("repagent_init %s\n", hubname);
> +
> +    pParams->port = port;
> +    pParams->name = g_strdup(hubname);
> +
> +    pthread_t thread_id = 0;
> +
> +    /* Create the repagent client listener thread */
> +    pthread_create(&thread_id, 0, repagent_listen, (void *) pParams);
> +    pthread_detach(thread_id);

Please use qemu-thread.h which includes signal handling necessary for
QEMU threads.

> +}
> +
> +void repagent_register_drive(const char *drive_path,
> +        BlockDriverState *driver_ptr)
> +{
> +    /* Assert that the volume is not registered yet */
> +    int i = repagent_get_volume_by_name(drive_path);
> +    assert(i == -1);

Most of the code uses negative errno int return values.  So here it
would return -EEXIST or -EBUSY.

For more detailed error handling support see error.h.  This function
would take an Error **errp argument and use error_set().
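
A small standalone sketch of the negative-errno convention, replacing
the assert() with return values the caller can check.  The table, its
limits and the function name are made up for the example, and it does
not attempt to show the Error **errp interface from error.h.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>

#define MAX_DRIVES 4     /* hypothetical limits for the example */
#define PATH_LEN   64

static char drives[MAX_DRIVES][PATH_LEN];
static int num_drives;

/* Returns 0 on success or a negative errno value on failure. */
static int register_drive(const char *path)
{
    if (strlen(path) >= PATH_LEN) {
        return -ENAMETOOLONG;
    }
    for (int i = 0; i < num_drives; i++) {
        if (strcmp(drives[i], path) == 0) {
            return -EEXIST;          /* already registered */
        }
    }
    if (num_drives == MAX_DRIVES) {
        return -ENOSPC;              /* table is full */
    }
    strcpy(drives[num_drives++], path);
    return 0;
}
```

A caller can then do "ret = register_drive(path); if (ret < 0) ..." and
report strerror(-ret) instead of aborting the whole process.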

> +
> +    /*Add the volume at the last place*/
> +    assert(g_rep_agent.num_volumes < REPAGENT_MAX_NUM_VOLUMES);
> +
> +    i = g_rep_agent.num_volumes;
> +    g_rep_agent.num_volumes++;
> +
> +    printf("zerto repagent: Registering drive. Num drives %d, path %s\n",
> +            g_rep_agent.num_volumes, drive_path);
> +    g_rep_agent.volumes[i] =
> +            (RepagentVolume *)g_malloc(sizeof(RepagentVolume));
> +    g_rep_agent.volumes[i]->driver_ptr = driver_ptr;
> +    /* orim todo strcpy? */
> +    g_rep_agent.volumes[i]->vol_path = drive_path;

g_strdup() would make the life-cycle easier than keeping a reference to
the caller's string.
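
Sketched in plain C, with a hand-written dup_string() standing in for
g_strdup() and a hypothetical Volume struct: the volume owns its copy of
the path and frees it itself, so it no longer matters what the caller
does with the original buffer.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef struct Volume {
    char *vol_path;    /* owned by the Volume, freed in volume_free() */
} Volume;

/* Plain-C stand-in for g_strdup(): returns a heap copy of s, or NULL. */
static char *dup_string(const char *s)
{
    size_t len = strlen(s) + 1;
    char *copy = malloc(len);
    if (copy != NULL) {
        memcpy(copy, s, len);
    }
    return copy;
}

/* Create a volume that keeps its own copy of path. */
static Volume *volume_new(const char *path)
{
    Volume *v = malloc(sizeof(*v));
    if (v == NULL) {
        return NULL;
    }
    v->vol_path = dup_string(path);
    if (v->vol_path == NULL) {
        free(v);
        return NULL;
    }
    return v;
}

/* Release the volume together with the string it owns. */
static void volume_free(Volume *v)
{
    if (v != NULL) {
        free(v->vol_path);
        free(v);
    }
}
```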

> +
> +    /* Orim todo thread-safety? */
> +    g_rep_agent.num_volumes++;
> +
> +    repagent_report_volumes_to_hub();
> +}
> +
> +void repagent_deregister_drive(const char *drive_path,

drive_path is unused.

> +        BlockDriverState *driver_ptr)
> +{
> +    /* Orim todo thread-safety? */
> +    int i = repagent_get_volume_by_driver(driver_ptr);
> +    assert(i != -1);
> +
> +    RepagentVolume *vol = g_rep_agent.volumes[i];
> +    /* Put the last volume in the cell of the removed volume to maintain
> a
> +     * contiguous array */
> +    g_rep_agent.volumes[i] = g_rep_agent.volumes[g_rep_agent.num_volumes
> - 1];
> +    g_rep_agent.volumes[g_rep_agent.num_volumes - 1] = NULL;
> +    g_rep_agent.num_volumes--;
> +    g_free(vol);
> +
> +}
> +/* orim todo destruction? */
> +
> +static int repagent_get_volume_by_driver(
> +        BlockDriverState *bs)
> +{
> +    /* orim todo optimize search */
> +    int i = 0;
> +    for (i = 0; i < g_rep_agent.num_volumes ; i++) {
> +        RepagentVolume *p_vol = g_rep_agent.volumes[i];
> +        if (p_vol != NULL && p_vol->driver_ptr == (void *) bs) {
> +            return i;
> +        }
> +    }
> +    return -1;
> +}
> +
> +void repagent_handle_protected_write(BlockDriverState *bs, int64_t
> sector_num,
> +        int nb_sectors, QEMUIOVector *qiov, int ret_status)
> +{
> +    printf("zerto Protected write offset %lld, size %d, IO return status
> %d",
> +            (long long int) sector_num, nb_sectors, ret_status);
> +    if (bs->filename != NULL) {
> +        printf(", filename %s", bs->filename);
> +    }
> +
> +    printf("\n");
> +
> +    /* orim todo thread safety? */
> +    int i = repagent_get_volume_by_driver(bs);
> +    if (i == -1 || g_rep_agent.volumes[i]->vol_id ==
> REPAGENT_VOLUME_ID_NONE) {
> +        /* Unprotected */
> +        printf("Got a write to an unprotected volume.\n");
> +        return;
> +    }
> +
> +    RepagentVolume *p_vol = g_rep_agent.volumes[i];
> +
> +    /* Report IO to rephub */
> +
> +    int data_size = qiov->size;
> +    if (ret_status < 0) {
> +        /* On failed ios we don't send the data to the hub */
> +        data_size = 0;
> +    }
> +    uint8_t *pdata = NULL;
> +    RepCmdProtectedWrite *p_cmd = (RepCmdProtectedWrite *) repcmd_new(
> +            REPHUB_CMD_PROTECTED_WRITE, data_size, (uint8_t **) &pdata);
> +
> +    if (ret_status >= 0) {
> +        qemu_iovec_to_buffer(qiov, pdata);
> +    }
> +
> +    p_cmd->volume_id = p_vol->vol_id;
> +    p_cmd->offset_sectors = sector_num;
> +    p_cmd->size_sectors = nb_sectors;
> +    p_cmd->ret_status = ret_status;
> +
> +    if (repagent_client_send((RepCmd *) p_cmd) != 0) {
> +        printf("Error sending command\n");
> +    }
> +}
> +
> +static void repagent_report_volumes_to_hub(void)
> +{
> +    /* Report IO to rephub */
> +    int i;
> +    RepCmdDataReportVmVolumes *p_cmd_data = NULL;
> +    RepCmdReportVmVolumes *p_cmd = (RepCmdReportVmVolumes *) repcmd_new(
> +            REPHUB_CMD_REPORT_VM_VOLUMES,
> +            g_rep_agent.num_volumes * sizeof(RepVmVolumeInfo),
> +            (uint8_t **) &p_cmd_data);
> +    p_cmd->num_volumes = g_rep_agent.num_volumes;
> +    printf("reporting %u volumes\n", g_rep_agent.num_volumes);
> +    for (i = 0; i < g_rep_agent.num_volumes ; i++) {
> +        assert(g_rep_agent.volumes[i] != NULL);
> +        printf("reporting volume %s size %u\n",
> +                g_rep_agent.volumes[i]->vol_path,
> +                (uint32_t) sizeof(p_cmd_data->volumes[i].name));
> +        strncpy((char *) p_cmd_data->volumes[i].name,
> +                g_rep_agent.volumes[i]->vol_path,
> +                sizeof(p_cmd_data->volumes[i].name));
> +        p_cmd_data->volumes[i].volume_id =
> g_rep_agent.volumes[i]->vol_id;
> +    }
> +    if (repagent_client_send((RepCmd *) p_cmd) != 0) {
> +        printf("Error sending command\n");
> +    }
> +}
> +
> +int repaget_start_protect(RepCmdStartProtect *pcmd,
> +        RepCmdDataStartProtect *pcmd_data)
> +{
> +    printf("Start protect vol %s, ID %llu\n", pcmd_data->volume_name,
> +            (unsigned long long) pcmd->volume_id);
> +    int vol_index = repagent_get_volume_by_name(pcmd_data->volume_name);
> +    if (g_rep_agent.num_volumes > 0
> +            && strcmp(pcmd_data->volume_name, "stam") == 0) {
> +        /* Choose the first one for rephub */
> +        vol_index = 0;
> +    }
> +    if (vol_index < 0) {
> +        printf("The volume doesn't exist\n");
> +        return TRUE;
> +    }
> +    /* orim todo protect */
> +    g_rep_agent.volumes[vol_index]->vol_id = pcmd->volume_id;
> +
> +    return TRUE;
> +}
> +
> +static int repagent_get_volume_by_name(const char *name)
> +{
> +    int i = 0;
> +    for (i = 0; i < g_rep_agent.num_volumes ; i++) {
> +        if (g_rep_agent.volumes[i] != NULL
> +                && strcmp(name, g_rep_agent.volumes[i]->vol_path) == 0) {
> +            return i;
> +        }
> +    }
> +    return -1;
> +}
> +
> +static int repagent_get_volume_by_id(uint64_t vol_id)
> +{
> +    int i = 0;
> +    for (i = 0; i < g_rep_agent.num_volumes ; i++) {
> +        if (g_rep_agent.volumes[i] != NULL
> +                && g_rep_agent.volumes[i]->vol_id == vol_id) {
> +            return i;
> +        }
> +    }
> +    return -1;
> +}
> +
> +int repagent_remote_io(RepCmdRemoteIoReq *pcmd, uint8_t *pdata)
> +{
> +    int index = repagent_get_volume_by_id(pcmd->volume_id);
> +    int size_bytes = pcmd->size_sectors * 512;
> +    if (index < 0) {
> +        printf("Vol read - Could not find vol id %llx\n",
> +                (unsigned long long int) pcmd->volume_id);
> +        RepCmdRemoteIoRes *p_res_cmd = (RepCmdRemoteIoRes *) repcmd_new(
> +                REPHUB_CMD_REMOTE_IO_RES, 0, NULL);
> +        p_res_cmd->req_id = pcmd->req_id;
> +        p_res_cmd->volume_id = pcmd->volume_id;
> +        p_res_cmd->io_status = -1;
> +        repagent_client_send((RepCmd *) p_res_cmd);
> +        return TRUE;
> +    }
> +
> +    printf("Vol read - driver %p, volId %llu, offset %llu, size %u\n",
> +            g_rep_agent.volumes[index]->driver_ptr,
> +            (unsigned long long int) pcmd->volume_id,
> +            (unsigned long long int) pcmd->offset_sectors,
> pcmd->size_sectors);
> +
> +    {
> +        RepagentReadVolIo *io_xaction = calloc(1,
> sizeof(RepagentReadVolIo));
> +
> +/*        BlockDriverAIOCB *acb; */
> +
> +        ZERO_MEM_OBJ(io_xaction);
> +
> +        qemu_iovec_init(&io_xaction->qiov, 1);
> +
> +        /*read_xact->buf =
> +        qemu_blockalign(g_rep_agent.volumes[index]->driver_ptr,
> size_bytes); */
> +        io_xaction->buf = (uint8_t *) g_malloc(size_bytes);
> +        io_xaction->rep_cmd = *pcmd;
> +        qemu_iovec_add(&io_xaction->qiov, io_xaction->buf, size_bytes);
> +
> +        gettimeofday(&io_xaction->start_time, NULL);
> +        /* orim TODO - use the returned acb to cancel the request on
> shutdown */
> +        /*acb = */
> +        if (pcmd->is_read) {
> +            bdrv_aio_readv(g_rep_agent.volumes[index]->driver_ptr,
> +                    io_xaction->rep_cmd.offset_sectors,
> &io_xaction->qiov,
> +                    io_xaction->rep_cmd.size_sectors,
> repagent_remote_io_done,
> +                    io_xaction);
> +        } else {
> +            bdrv_aio_writev(g_rep_agent.volumes[index]->driver_ptr,
> +                    io_xaction->rep_cmd.offset_sectors,
> &io_xaction->qiov,
> +                    io_xaction->rep_cmd.size_sectors,
> repagent_remote_io_done,
> +                    io_xaction);
> +        }
> +    }
> +
> +    return TRUE;
> +}
> +
> +static void repagent_remote_io_done(void *opaque, int ret)
> +{
> +    struct timeval t2;
> +    RepagentReadVolIo *io_xaction = (RepagentReadVolIo *) opaque;
> +    uint8_t *pdata = NULL;
> +    RepCmdRemoteIoRes *pcmd = (RepCmdRemoteIoRes *) repcmd_new(
> +            REPHUB_CMD_REMOTE_IO_RES, io_xaction->rep_cmd.size_sectors *
> 512,
> +            &pdata);
> +    pcmd->req_id = io_xaction->rep_cmd.req_id;
> +    pcmd->volume_id = io_xaction->rep_cmd.volume_id;
> +    pcmd->io_status = -1;
> +
> +    printf("Remote IO request - volId %llu, offset %llu, size %u, is_read
> %u\n",
> +            (unsigned long long int) io_xaction->rep_cmd.volume_id,
> +            (unsigned long long int) io_xaction->rep_cmd.offset_sectors,
> +            io_xaction->rep_cmd.size_sectors,
> +            io_xaction->rep_cmd.is_read);
> +    gettimeofday(&t2, NULL);
> +
> +    if (ret >= 0) {
> +        /* Read response - send the data to the hub */
> +        t2 = tsub(t2, io_xaction->start_time);
> +        printf("Remote IO done. Took %u seconds, %u us.",
> +                (uint32_t) t2.tv_sec, (uint32_t) t2.tv_usec);
> +
> +        pcmd->io_status = 0;    /* Success */
> +        /* orim todo optimize - don't copy, use the qiov buffer */
> +        qemu_iovec_to_buffer(&io_xaction->qiov, pdata);
> +    } else {
> +        printf("readv failed: %s\n", strerror(-ret));
> +    }
> +
> +    repagent_client_send((RepCmd *) pcmd);
> +
> +    /*qemu_vfree(read_xact->buf); */
> +    g_free(io_xaction->buf);
> +
> +    g_free(io_xaction);
> +}
> +
> +static struct timeval tsub(struct timeval t1, struct timeval t2)
> +{
> +    t1.tv_usec -= t2.tv_usec;
> +    if (t1.tv_usec < 0) {
> +        t1.tv_usec += 1000000;
> +        t1.tv_sec--;
> +    }
> +    t1.tv_sec -= t2.tv_sec;
> +    return t1;
> +}
> +
> +void repagent_client_connected(void)
> +{
> +    /* orim todo thread protection */
> +    repagent_report_volumes_to_hub();
> +}
> diff --git a/block/repagent/repagent.h b/block/repagent/repagent.h
> new file mode 100644
> index 0000000..0f69820
> --- /dev/null
> +++ b/block/repagent/repagent.h
> @@ -0,0 +1,52 @@
> +/*
> + * QEMU System Emulator
> + *
> + * Copyright (c) 2003-2008 Fabrice Bellard

This is all your code so please update this with your name and 2012 as
the year.

> + *
> + * Permission is hereby granted, free of charge, to any person obtaining
> a copy
> + * of this software and associated documentation files (the "Software"),
> to deal
> + * in the Software without restriction, including without limitation the
> rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or
> sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be
> included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
> EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
> MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
> SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
> OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
> ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
> IN
> + * THE SOFTWARE.
> + */
> +#ifndef REPAGENT_H
> +#define REPAGENT_H
> +#include <stdint.h>
> +
> +#include "qemu-common.h"
> +
> +typedef struct RepAgentState RepAgentState;
> +typedef struct RepCmdStartProtect RepCmdStartProtect;
> +typedef struct RepCmdDataStartProtect RepCmdDataStartProtect;
> +struct RepCmdRemoteIoReq;
> +
> +/* orim temporary */
> +extern int use_repagent;
> +
> +
> +void repagent_init(const char *hubname, int port);
> +void repagent_handle_protected_write(BlockDriverState *bs,
> +        int64_t sector_num, int nb_sectors, QEMUIOVector *qiov, int
> ret_status);
> +void repagent_register_drive(const char *drive_path,
> +        BlockDriverState *driver_ptr);
> +void repagent_deregister_drive(const char *drive_path,
> +        BlockDriverState *driver_ptr);
> +int repaget_start_protect(RepCmdStartProtect *pcmd,

s/repaget/repagent/


> +        RepCmdDataStartProtect *pcmd_data);
> +int repagent_remote_io(struct RepCmdRemoteIoReq *pcmd, uint8_t *pdata);
> +void repagent_client_connected(void);
> +
> +
> +#endif /* REPAGENT_H */
> diff --git a/block/repagent/repagent_client.c
> b/block/repagent/repagent_client.c
> new file mode 100644
> index 0000000..ee4aeb7
> --- /dev/null
> +++ b/block/repagent/repagent_client.c
> @@ -0,0 +1,162 @@
> +#include "repcmd.h"
> +#include "rephub_cmds.h"
> +#include "repcmd_listener.h"
> +#include "repagent_client.h"
> +#include "repagent.h"
> +#include "main-loop.h"
> +
> +#include <string.h>
> +#include <stdlib.h>
> +#include <errno.h>
> +#include <stdio.h>
> +#include <resolv.h>
> +#include <sys/socket.h>
> +#include <arpa/inet.h>
> +#include <netinet/in.h>
> +#include <unistd.h>
> +
> +#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj))
> +
> +static void repagent_process_cmd(RepCmd *pCmd, uint8_t *pData, void
> *clientPtr);
> +
> +typedef struct repagent_client_state {
> +    int is_connected;
> +    int is_terminate_receive;
> +    int hsock;
> +} repagent_client_state;
> +
> +static repagent_client_state g_client_state = { 0 };
> +
> +static void repagent_client_read(void *opaque)
> +{
> +    printf("repagent_client_read\n");
> +    int bytes_read =
> repcmd_listener_socket_read_next_buf(g_client_state.hsock);

Did I miss where the socket is set to nonblocking?  This function should
not block if used as an fd handler in QEMU's main loop.
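
For reference, a generic POSIX sketch of what "nonblocking" means here.
The helper names are hypothetical; inside QEMU the portable helper from
qemu_socket.h would be the natural choice rather than raw fcntl().

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Put fd into non-blocking mode; returns 0 or a negative errno value. */
static int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) {
        return -errno;
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        return -errno;
    }
    return 0;
}

/*
 * Read whatever is available without blocking.  Returns the number of
 * bytes read, 0 on EOF, -EAGAIN if no data is ready yet, or another
 * negative errno value on error.
 */
static ssize_t read_nonblocking(int fd, void *buf, size_t len)
{
    ssize_t n;
    do {
        n = read(fd, buf, len);
    } while (n == -1 && errno == EINTR);
    if (n == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return -EAGAIN;
        }
        return -errno;
    }
    return n;
}
```

An fd handler built on this returns immediately on -EAGAIN and waits to
be called again, so a partial message never stalls the main loop.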

> +    if (bytes_read <= 0) {
> +        printf("repagent_client_read failed (%d), errno=%d\n",
> +        bytes_read, errno);
> +        g_client_state.is_connected = 0;
> +    }
> +}
> +
> +void *repagent_listen(void *pParam)
> +{
> +    rephub_params *pServerParams = (rephub_params *) pParam;
> +    int host_port = pServerParams->port;
> +    const char *host_name = pServerParams->name;
> +
> +    printf("Creating repagent listener thread...\n");
> +    g_free(pServerParams);
> +
> +    struct sockaddr_in my_addr;
> +
> +    int err;
> +    int retries = 0;
> +
> +    g_client_state.hsock = socket(AF_INET, SOCK_STREAM, 0);

For portability there is qemu_socket.h.

> +    if (g_client_state.hsock == -1) {
> +        printf("Error initializing socket %d\n", errno);
> +        return (void *) -1;
> +    }
> +
> +    int param = 1;
> +
> +    if ((setsockopt(g_client_state.hsock, SOL_SOCKET, SO_REUSEADDR,
> +            (char *) &param, sizeof(int)) == -1)
> +            || (setsockopt(g_client_state.hsock, SOL_SOCKET,
> SO_KEEPALIVE,
> +                    (char *) &param, sizeof(int)) == -1)) {
> +        printf("Error setting options %d\n", errno);
> +        return (void *) -1;
> +    }
> +
> +    my_addr.sin_family = AF_INET;
> +    my_addr.sin_port = htons(host_port);
> +    memset(&(my_addr.sin_zero), 0, 8);
> +
> +    my_addr.sin_addr.s_addr = inet_addr(host_name);
> +
> +    /* Reconnect loop */
> +    while (!g_client_state.is_terminate_receive) {
> +
> +        if (connect(g_client_state.hsock, (struct sockaddr *) &my_addr,
> +                sizeof(my_addr)) == -1) {
> +            err = errno;
> +            if (err != EINPROGRESS) {
> +                retries++;
> +                fprintf(
> +                        stderr,
> +                        "Error connecting socket %d. Host %s, port %u.
> Retry count %d\n",
> +                        errno, host_name, host_port, retries);
> +                usleep(5 * 1000 * 1000);
> +                continue;
> +            }
> +        }
> +        retries = 0;
> +
> +        printf("After connect\n");
> +        g_client_state.is_connected = 1;
> +        repagent_client_connected();
> +        repcmd_listener_init(repagent_process_cmd, NULL);
> +        static int c;
> +        /* repcmd_listener_socket_thread_listener(g_client_state.hsock);
> */
> +        qemu_set_fd_handler(g_client_state.hsock, repagent_client_read,
> NULL,
> +                NULL);
> +        while (g_client_state.is_connected) {
> +            printf("Connected (%d)...\n", c++);
> +            usleep(1 * 1000 * 1000);
> +        }

This thread isn't actually doing anything!  I think this function could
be integrated into the QEMU event loop instead of having a thread.
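
To illustrate the pattern outside QEMU, here is a toy poll()-based loop.
Everything in it is an assumption for the example: loop_set_fd_handler()
only imitates the role of qemu_set_fd_handler(), and Conn/conn_read()
are placeholders for the repagent connection state and read handler.

```c
#include <assert.h>
#include <poll.h>
#include <stddef.h>
#include <unistd.h>

typedef void FdHandler(void *opaque);

#define MAX_HANDLERS 8

static struct {
    int fd;
    FdHandler *on_read;
    void *opaque;
} handlers[MAX_HANDLERS];
static int num_handlers;

/* Toy stand-in for qemu_set_fd_handler(): register a read callback. */
static int loop_set_fd_handler(int fd, FdHandler *on_read, void *opaque)
{
    if (num_handlers == MAX_HANDLERS) {
        return -1;
    }
    handlers[num_handlers].fd = fd;
    handlers[num_handlers].on_read = on_read;
    handlers[num_handlers].opaque = opaque;
    num_handlers++;
    return 0;
}

/*
 * One loop iteration: wait up to timeout_ms, then call the handler of
 * every readable fd.  Returns the number of handlers called, or -1.
 */
static int loop_run_once(int timeout_ms)
{
    struct pollfd pfds[MAX_HANDLERS];
    int dispatched = 0;

    for (int i = 0; i < num_handlers; i++) {
        pfds[i].fd = handlers[i].fd;
        pfds[i].events = POLLIN;
        pfds[i].revents = 0;
    }
    if (poll(pfds, (nfds_t)num_handlers, timeout_ms) < 0) {
        return -1;
    }
    for (int i = 0; i < num_handlers; i++) {
        if (pfds[i].revents & (POLLIN | POLLHUP)) {
            handlers[i].on_read(handlers[i].opaque);
            dispatched++;
        }
    }
    return dispatched;
}

/* Placeholder connection state and read handler. */
typedef struct Conn {
    int fd;
    int reads;
} Conn;

static void conn_read(void *opaque)
{
    Conn *c = opaque;
    char byte;
    if (read(c->fd, &byte, 1) == 1) {
        c->reads++;
    }
}
```

All work happens in callbacks on the loop's own thread, so there is no
sleeping "Connected..." loop and no cross-thread access to shared state.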

> +        /* Unregister */
> +        qemu_set_fd_handler(g_client_state.hsock, NULL, NULL, NULL);
> +
> +        printf("Disconnected\n");
> +        g_client_state.is_connected = 0;
> +        close(g_client_state.hsock);
> +
> +    }
> +    return 0;
> +}
> +
> +void repagent_process_cmd(RepCmd *pcmd, uint8_t *pdata, void *clientPtr)
> +{
> +    int is_free_data = 1;
> +    printf("Repagent got cmd %d\n", pcmd->hdr.cmdid);
> +    switch (pcmd->hdr.cmdid) {
> +    case REPHUB_CMD_START_PROTECT: {
> +        is_free_data = repaget_start_protect((RepCmdStartProtect *) pcmd,
> +                (RepCmdDataStartProtect *) pdata);
> +    }
> +        break;
> +    case REPHUB_CMD_REMOTE_IO_REQ: {
> +        is_free_data = repagent_remote_io((RepCmdRemoteIoReq *) pcmd,
> pdata);
> +    }
> +        break;
> +    default:
> +        assert(0);
> +        break;
> +
> +    }
> +
> +    if (is_free_data) {
> +        g_free(pdata);
> +    }
> +}
> +
> +int repagent_client_send(RepCmd *p_cmd)
> +{
> +    int bytecount = 0;
> +    printf("Send cmd %u, data size %u\n", p_cmd->hdr.cmdid,
> +            p_cmd->hdr.data_size_bytes);
> +    if (!g_client_state.is_connected) {
> +        printf("Not connected to hub\n");
> +        return -1;
> +    }
> +
> +    bytecount = send(g_client_state.hsock, p_cmd,
> +            sizeof(RepCmd) + p_cmd->hdr.data_size_bytes, 0);
> +    if (bytecount < sizeof(RepCmd) + p_cmd->hdr.data_size_bytes) {
> +        printf("Bad send %d, errno %d\n", bytecount, errno);
> +        return bytecount;
> +    }
> +
> +    /* Success */
> +    return 0;
> +}
> diff --git a/block/repagent/repagent_client.h
> b/block/repagent/repagent_client.h
> new file mode 100644
> index 0000000..62a5377
> --- /dev/null
> +++ b/block/repagent/repagent_client.h
> @@ -0,0 +1,36 @@
> +/*
> + * QEMU System Emulator
> + *
> + * Copyright (c) 2003-2008 Fabrice Bellard
> + *
> + * Permission is hereby granted, free of charge, to any person obtaining
> a copy
> + * of this software and associated documentation files (the "Software"),
> to deal
> + * in the Software without restriction, including without limitation the
> rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or
> sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be
> included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
> EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
> MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
> SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
> OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
> ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
> IN
> + * THE SOFTWARE.
> + */
> +#ifndef REPAGENT_CLIENT_H
> +#define REPAGENT_CLIENT_H
> +#include "repcmd.h"
> +
> +typedef struct rephub_params {
> +    char *name;
> +    int port;
> +} rephub_params;
> +
> +void *repagent_listen(void *pParam);

If you name this repagent_listen_thread then it's clearer why you need
the void* return value and argument.

> +int repagent_client_send(RepCmd *p_cmd);
> +
> +#endif /* REPAGENT_CLIENT_H */
> diff --git a/block/repagent/repagent_drv.c b/block/repagent/repagent_drv.c
> new file mode 100644
> index 0000000..8c9f2b6
> --- /dev/null
> +++ b/block/repagent/repagent_drv.c
> @@ -0,0 +1,177 @@
> +#include <string.h>
> +#include <stdlib.h>
> +#include <stdio.h>
> +#include <stdint.h>
> +
> +#include "block.h"
> +#include "rephub_defs.h"
> +#include "block_int.h"
> +#include "repagent_client.h"
> +#include "repagent.h"
> +#include "rephub_cmds.h"
> +#include "module.h"
> +
> +#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj))
> +
> +static int repagent_open(BlockDriverState *bs, int flags)
> +{
> +    repagent_register_drive(bs->filename,  bs);
> +    printf("%s\n", __func__);
> +    bs->sg = bs->file->sg;
> +    return 0;
> +}
> +
> +static int coroutine_fn repagent_co_readv(BlockDriverState *bs,
> +        int64_t sector_num, int nb_sectors, QEMUIOVector *qiov)
> +{
> +    printf("%s\n", __func__);
> +    return bdrv_co_readv(bs->file, sector_num, nb_sectors, qiov);
> +}
> +
> +static int coroutine_fn repagent_co_writev(BlockDriverState *bs,
> +        int64_t sector_num, int nb_sectors, QEMUIOVector *qiov)
> +{
> +    printf("%s\n", __func__);
> +    int ret = 0;
> +    repagent_handle_protected_write(bs, sector_num,
> +            nb_sectors, qiov, ret);
> +    printf("Before write off %lld, size %d\n", (long long int) sector_num,
> +            nb_sectors);
> +    ret = bdrv_co_writev(bs->file, sector_num, nb_sectors, qiov);
> +    printf("After write ret %d\n", ret);
> +    return ret;
> +}
> +
> +static void repagent_close(BlockDriverState *bs)
> +{
> +    printf("%s\n", __func__);
> +}
> +
> +static int coroutine_fn repagent_co_flush(BlockDriverState *bs)
> +{
> +    printf("%s\n", __func__);
> +    return bdrv_co_flush(bs->file);
> +}
> +
> +static int64_t repagent_getlength(BlockDriverState *bs)
> +{
> +    printf("%s\n", __func__);
> +    return bdrv_getlength(bs->file);
> +}
> +
> +static int repagent_truncate(BlockDriverState *bs, int64_t offset)
> +{
> +    printf("%s\n", __func__);
> +    return bdrv_truncate(bs->file, offset);
> +}
> +
> +static int repagent_probe(const uint8_t *buf, int buf_size,
> +        const char *filename)
> +{
> +    printf("%s\n", __func__);
> +    return 1; /* everything can be opened as raw image */
> +}
> +
> +static int coroutine_fn repagent_co_discard(BlockDriverState *bs,
> +                                       int64_t sector_num, int nb_sectors)
> +{
> +    return bdrv_co_discard(bs->file, sector_num, nb_sectors);
> +}
> +
> +static int repagent_is_inserted(BlockDriverState *bs)
> +{
> +    printf("%s\n", __func__);
> +    return bdrv_is_inserted(bs->file);
> +}
> +
> +static int repagent_media_changed(BlockDriverState *bs)
> +{
> +    printf("%s\n", __func__);
> +    return bdrv_media_changed(bs->file);
> +}
> +
> +static void repagent_eject(BlockDriverState *bs, bool eject_flag)
> +{
> +    printf("%s\n", __func__);
> +    bdrv_eject(bs->file, eject_flag);
> +}
> +
> +static void repagent_lock_medium(BlockDriverState *bs, bool locked)
> +{
> +    printf("%s\n", __func__);
> +    bdrv_lock_medium(bs->file, locked);
> +}
> +
> +static int repagent_ioctl(BlockDriverState *bs, unsigned long int req,
> +        void *buf)
> +{
> +    printf("%s\n", __func__);
> +   return bdrv_ioctl(bs->file, req, buf);
> +}
> +
> +static BlockDriverAIOCB *repagent_aio_ioctl(BlockDriverState *bs,
> +        unsigned long int req, void *buf,
> +        BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    printf("%s\n", __func__);
> +   return bdrv_aio_ioctl(bs->file, req, buf, cb, opaque);
> +}
> +
> +static int repagent_create(const char *filename, QEMUOptionParameter *options)
> +{
> +    printf("%s\n", __func__);
> +    return bdrv_create_file(filename, options);
> +}
> +
> +static QEMUOptionParameter repagent_create_options[] = {
> +    {
> +        .name = BLOCK_OPT_SIZE,
> +        .type = OPT_SIZE,
> +        .help = "Virtual disk size"
> +    },
> +    { NULL }
> +};
> +
> +static int repagent_has_zero_init(BlockDriverState *bs)
> +{
> +    printf("%s\n", __func__);
> +    return bdrv_has_zero_init(bs->file);
> +}
> +
> +static BlockDriver bdrv_repagent = {
> +    .format_name        = "repagent",
> +
> +    /* It's really 0, but we need to make g_malloc() happy */
> +    .instance_size      = 1,
> +
> +    .bdrv_open          = repagent_open,
> +    .bdrv_close         = repagent_close,
> +
> +    .bdrv_co_readv          = repagent_co_readv,
> +    .bdrv_co_writev         = repagent_co_writev,
> +    .bdrv_co_flush_to_disk  = repagent_co_flush,
> +    .bdrv_co_discard        = repagent_co_discard,
> +
> +    .bdrv_probe         = repagent_probe,
> +    .bdrv_getlength     = repagent_getlength,
> +    .bdrv_truncate      = repagent_truncate,
> +
> +    .bdrv_is_inserted   = repagent_is_inserted,
> +    .bdrv_media_changed = repagent_media_changed,
> +    .bdrv_eject         = repagent_eject,
> +    .bdrv_lock_medium   = repagent_lock_medium,
> +
> +    .bdrv_ioctl         = repagent_ioctl,
> +    .bdrv_aio_ioctl     = repagent_aio_ioctl,
> +
> +    .bdrv_create        = repagent_create,
> +    .create_options     = repagent_create_options,
> +    .bdrv_has_zero_init = repagent_has_zero_init,
> +};
> +
> +static void bdrv_repagent_init(void)
> +{
> +    bdrv_register(&bdrv_repagent);
> +}
> +
> +block_init(bdrv_repagent_init);
> diff --git a/block/repagent/repcmd.h b/block/repagent/repcmd.h
> new file mode 100644
> index 0000000..e32acf5
> --- /dev/null
> +++ b/block/repagent/repcmd.h
> @@ -0,0 +1,59 @@
> +/*
> + * QEMU System Emulator
> + *
> + * Copyright (c) 2003-2008 Fabrice Bellard
> + *
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
> + * of this software and associated documentation files (the "Software"), to deal
> + * in the Software without restriction, including without limitation the rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
> + * THE SOFTWARE.
> + */
> +#ifndef REPCMD_H
> +#define REPCMD_H
> +
> +#include <stdint.h>
> +
> +#define REPCMD_MAGIC1 (0x1122)
> +#define REPCMD_MAGIC2 (0x3344)
> +#define REPCMD_NUM_U32_PARAMS (11)
> +
> +enum RepCmds {
> +    REPCMD_FIRST_INVALID                    = 0,
> +    REPCMD_FIRST_HUBCMD                     = 1,
> +    REPHUB_CMD_PROTECTED_WRITE              = 2,
> +    REPHUB_CMD_REPORT_VM_VOLUMES            = 3,
> +    REPHUB_CMD_START_PROTECT                = 4,
> +    REPHUB_CMD_REMOTE_IO_REQ                 = 5,
> +    REPHUB_CMD_REMOTE_IO_RES                                 = 6,
> +    REPHUB_CMD_AGENT_SHUTDOWN               = 7,
> +};
> +
> +typedef struct RepCmdHdr {
> +    uint16_t magic1;
> +    uint16_t cmdid;
> +    uint32_t data_size_bytes;
> +} RepCmdHdr;
> +
> +typedef struct RepCmd {
> +    RepCmdHdr hdr;
> +    unsigned int parameters[REPCMD_NUM_U32_PARAMS];

If it's a U32 then please use uint32_t.

> +    unsigned int magic2;

uint32_t

> +    uint8_t data[0];
> +} RepCmd;
> +
> +RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata);

Please make the arguments consistent with the packet structs:
uint16_t cmdid
uint32_t data_size_bytes

It's confusing and error-prone to change the spelling of the fields and
their types.
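For illustration, here is a minimal sketch of what a consistent prototype could look like. The structs are copied from the patch with the uint32_t changes suggested above. Plain malloc() stands in for REPCMD_MALLOC, and returning NULL on allocation failure is my assumption rather than something the patch does.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define REPCMD_MAGIC1 (0x1122)
#define REPCMD_MAGIC2 (0x3344)
#define REPCMD_NUM_U32_PARAMS (11)

typedef struct RepCmdHdr {
    uint16_t magic1;
    uint16_t cmdid;
    uint32_t data_size_bytes;
} RepCmdHdr;

typedef struct RepCmd {
    RepCmdHdr hdr;
    uint32_t parameters[REPCMD_NUM_U32_PARAMS];
    uint32_t magic2;
    uint8_t data[];             /* C99 flexible array member */
} RepCmd;

/* Argument names and types match the RepCmdHdr fields they fill in. */
RepCmd *repcmd_new(uint16_t cmdid, uint32_t data_size_bytes,
                   uint8_t **p_out_pdata)
{
    /* malloc() stands in for REPCMD_MALLOC in this sketch */
    RepCmd *p_cmd = malloc(sizeof(*p_cmd) + data_size_bytes);
    if (p_cmd == NULL) {
        return NULL;            /* assumption: report failure to the caller */
    }

    /* Zero the command, not the payload */
    memset(p_cmd, 0, sizeof(*p_cmd));
    p_cmd->hdr.magic1 = REPCMD_MAGIC1;
    p_cmd->hdr.cmdid = cmdid;
    p_cmd->hdr.data_size_bytes = data_size_bytes;
    p_cmd->magic2 = REPCMD_MAGIC2;

    if (p_out_pdata != NULL) {
        *p_out_pdata = p_cmd->data;
    }
    return p_cmd;
}
```

With matching names and types, a caller can pass hdr fields straight through without any narrowing conversions.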

> +
> +#endif /* REPCMD_H */
> diff --git a/block/repagent/repcmd_listener.c b/block/repagent/repcmd_listener.c
> new file mode 100644
> index 0000000..54d3f60
> --- /dev/null
> +++ b/block/repagent/repcmd_listener.c
> @@ -0,0 +1,173 @@
> +#include <fcntl.h>
> +#include <string.h>
> +#include <stdlib.h>
> +#include <errno.h>
> +#include <stdio.h>
> +#include <netinet/in.h>
> +#include <resolv.h>
> +#include <sys/socket.h>
> +#include <arpa/inet.h>
> +#include <unistd.h>
> +#include <pthread.h>
> +#include <assert.h>
> +
> +/* Use the CONFIG_REPAGENT flag to determine whether
> + * we're under qemu build or a hub When under
> + * qemu use g_malloc */
> +#ifdef CONFIG_REPAGENT
> +#include <glib.h>
> +#define REPCMD_MALLOC g_malloc
> +#else
> +#define REPCMD_MALLOC malloc
> +#endif
> +
> +#include "repcmd.h"
> +#include "repcmd_listener.h"
> +
> +#define ZERO_MEM_OBJ(pObj) memset((void *)pObj, 0, sizeof(*pObj))
> +
> +
> +typedef struct RepCmdRxCmdState {
> +    RepCmd curCmd;
> +    uint8_t *pReadBuf;
> +    int bytesToGet;
> +    int bytesGotten;

Byte counts and memory sizes are best expressed as size_t: it is
unsigned, so you eliminate the possibility of negative-value bugs.

> +    int isGotHeader;

bool

> +    uint8_t *pdata;
> +} RepCmdRxCmdState;
> +
> +typedef struct RepCmdListenerState {
> +    int is_terminate_receive;

bool

> +    pfn_received_cmd_cb  receive_cb;
> +    void *opaque;
> +    int hsock;
> +    RepCmdRxCmdState cur_cmd;
> +} RepCmdListenerState;
> +
> +static RepCmdListenerState g_listenerState = { 0 };

Global variables are zeroed on startup, so there's no need for a zero
initializer.

> +
> +static int repcmd_listener_process_rx(int bytecount);
> +
> +void repcmd_listener_init(pfn_received_cmd_cb callback, void *opaque)
> +{
> +    ZERO_MEM_OBJ(&g_listenerState);
> +    g_listenerState.receive_cb = callback;
> +    g_listenerState.opaque = opaque;
> +
> +    g_listenerState.cur_cmd.bytesToGet = sizeof(RepCmd);

This is a hint that bytesToGet should be size_t, since sizeof evaluates
to a size_t.
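To spell out why mixing int and size_t is risky, here is a small sketch. The function names are hypothetical and not part of the patch. On the usual platforms, comparing an int against a size_t converts the int to unsigned first, so a -1 error return never looks "short". The explicit cast in the first function shows what the implicit conversion amounts to.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <sys/types.h>   /* ssize_t */

/* Mimics an int compared directly against a size_t: the int is
 * converted to unsigned, so -1 becomes a very large value. */
bool short_count_mixed(int bytecount, size_t expected)
{
    return (size_t)bytecount < expected;
}

/* Handle the error case while the value is still signed, then compare
 * unsigned against unsigned. */
bool short_count_checked(ssize_t bytecount, size_t expected)
{
    if (bytecount < 0) {
        return true;
    }
    return (size_t)bytecount < expected;
}
```

Keeping the raw recv()/send() result in an ssize_t and the running totals in size_t keeps these two cases apart.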

> +    g_listenerState.cur_cmd.pReadBuf =
> +            (uint8_t *) &g_listenerState.cur_cmd.curCmd;
> +}
> +
> +int repcmd_listener_socket_read_next_buf(int hsock)
> +{
> +    RepCmdRxCmdState *cmd_state = &g_listenerState.cur_cmd;
> +    int bytecount = recv(hsock, cmd_state->pReadBuf + cmd_state->bytesGotten,
> +            cmd_state->bytesToGet - cmd_state->bytesGotten, 0);
> +    return repcmd_listener_process_rx(bytecount);
> +}
> +
> +/* Returns 0 for initiated termination or socket error value on error */
> +int repcmd_listener_socket_thread_listener(int hsock)
> +{
> +    int ret = 0;
> +    /* receive loop */
> +    while (!g_listenerState.is_terminate_receive) {
> +        ret = repcmd_listener_socket_read_next_buf(hsock);
> +        if (ret <= 0) {
> +            return ret;
> +        }
> +    }
> +    return 0;
> +}
> +
> +static int repcmd_listener_process_rx(int bytecount)
> +{
> +    RepCmdRxCmdState *cmd_state = &g_listenerState.cur_cmd;
> +    if (bytecount == -1) {
> +        fprintf(stderr, "Error receiving data %d\n", errno);
> +        return errno;

return -errno.  Positive values are already used for valid byte counts
below.
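A minimal sketch of that convention, using a hypothetical helper name: negative errno on failure, 0 on disconnect, and the byte count otherwise. It also saves errno before calling fprintf(), since library calls are allowed to overwrite it.

```c
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <sys/types.h>   /* ssize_t */

/* Hypothetical helper: map a recv() result to a single int status. */
int rx_status(ssize_t bytecount)
{
    if (bytecount < 0) {
        int saved_errno = errno;    /* fprintf() may overwrite errno */
        fprintf(stderr, "Error receiving data %d\n", saved_errno);
        return -saved_errno;
    }
    return (int)bytecount;          /* 0 means the peer disconnected */
}
```

Callers can then treat any negative value as an error without confusing it with a valid byte count.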

> +    }
> +
> +    if (bytecount == 0) {
> +        printf("Disconnected\n");
> +        return 0;

What happens if we get disconnected?  It looks like this thread will
continue running with nothing to do.

> +    }
> +    cmd_state->bytesGotten += bytecount;
> +/*     printf("Recieved bytes %d, got %d/%d\n",
> +            bytecount, cmd_state->bytesGotten, cmd_state->bytesToGet); */
> +    /* print content */
> +    if (0) {
> +        int i;
> +        for (i = 0; i < bytecount ; i += 4) {
> +            /*printf("%d/%d", i, bytecount/4); */
> +            printf(
> +                    "%#x ",
> +                    *(int *) (&cmd_state->pReadBuf[cmd_state->bytesGotten
> +                            - bytecount + i]));
> +
> +        }
> +        printf("\n");
> +    }
> +    assert(cmd_state->bytesGotten <= cmd_state->bytesToGet);
> +    if (cmd_state->bytesGotten == cmd_state->bytesToGet) {
> +        int isGotData = 0;
> +        cmd_state->bytesGotten = 0;
> +        if (!cmd_state->isGotHeader) {
> +            /* We just got the header */
> +            cmd_state->isGotHeader = 1;
> +
> +            assert(cmd_state->curCmd.hdr.magic1 == REPCMD_MAGIC1);
> +            assert(cmd_state->curCmd.magic2 == REPCMD_MAGIC2);

Is there a more robust error handling strategy than aborting the QEMU
process?
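One possible shape for this, as a sketch only: validate the received command and return an error that the caller can use to drop the connection. The function name, the size limit, and the choice of error codes are my assumptions. The struct is a trimmed copy from the patch with the payload member left out.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>

#define REPCMD_MAGIC1 (0x1122)
#define REPCMD_MAGIC2 (0x3344)
#define REPCMD_NUM_U32_PARAMS (11)
#define REPCMD_MAX_DATA_SIZE (16u * 1024 * 1024)   /* hypothetical limit */

typedef struct RepCmdHdr {
    uint16_t magic1;
    uint16_t cmdid;
    uint32_t data_size_bytes;
} RepCmdHdr;

typedef struct RepCmd {             /* payload member omitted in this sketch */
    RepCmdHdr hdr;
    uint32_t parameters[REPCMD_NUM_U32_PARAMS];
    uint32_t magic2;
} RepCmd;

/* Returns 0 if the command looks sane, a negative errno value otherwise. */
int repcmd_check_header(const RepCmd *cmd)
{
    if (cmd->hdr.magic1 != REPCMD_MAGIC1 || cmd->magic2 != REPCMD_MAGIC2) {
        return -EINVAL;             /* corrupted or foreign stream */
    }
    if (cmd->hdr.data_size_bytes > REPCMD_MAX_DATA_SIZE) {
        return -EMSGSIZE;           /* refuse unbounded allocations */
    }
    return 0;
}
```

On a non-zero result the receive path could close the socket and report the failure, leaving the guest running.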

> +            if (cmd_state->curCmd.hdr.data_size_bytes > 0) {
> +                cmd_state->pdata = (uint8_t *)REPCMD_MALLOC(
> +                        cmd_state->curCmd.hdr.data_size_bytes);
> +/*                    printf("malloc %p\n", cmd_state->pdata); */
> +                cmd_state->pReadBuf = cmd_state->pdata;
> +            } else {
> +                /* no data */
> +                isGotData = 1;
> +                cmd_state->pdata = NULL;
> +            }
> +            cmd_state->bytesToGet = cmd_state->curCmd.hdr.data_size_bytes;
> +        } else {
> +            isGotData = 1;
> +        }
> +
> +        if (isGotData) {
> +            /* Got command and data */
> +            (*g_listenerState.receive_cb)(&cmd_state->curCmd, cmd_state->pdata,
> +                    g_listenerState.opaque);
> +
> +            /* It's the callee responsibility to free cmd_state->pdata */
> +            cmd_state->pdata = NULL;
> +            ZERO_MEM_OBJ(&cmd_state->curCmd);
> +            cmd_state->pReadBuf = (uint8_t *) &cmd_state->curCmd;
> +            cmd_state->bytesGotten = 0;
> +            cmd_state->bytesToGet = sizeof(RepCmd);
> +            cmd_state->isGotHeader = 0;
> +        }
> +    }

Since we're running in a dedicated thread, wouldn't sequential code be
simpler than this state-machine approach with isGotHeader and isGotData?
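A rough sketch of the sequential version, assuming a blocking socket. recv_all() and recv_one_cmd() are hypothetical names, and for brevity the sketch reads only a RepCmdHdr before the payload, whereas the patch reads a whole RepCmd.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>

typedef struct RepCmdHdr {
    uint16_t magic1;
    uint16_t cmdid;
    uint32_t data_size_bytes;
} RepCmdHdr;

/* Read exactly len bytes. Returns 0 on success, -ECONNRESET if the peer
 * closed the connection early, or a negative errno value on error. */
int recv_all(int fd, void *buf, size_t len)
{
    uint8_t *p = buf;

    while (len > 0) {
        ssize_t n = recv(fd, p, len, 0);
        if (n < 0) {
            if (errno == EINTR) {
                continue;           /* interrupted, retry */
            }
            return -errno;
        }
        if (n == 0) {
            return -ECONNRESET;     /* disconnected mid-message */
        }
        p += n;
        len -= (size_t)n;
    }
    return 0;
}

/* Header first, then the payload if there is one. The caller frees *p_data. */
int recv_one_cmd(int fd, RepCmdHdr *hdr, uint8_t **p_data)
{
    int ret = recv_all(fd, hdr, sizeof(*hdr));

    *p_data = NULL;
    if (ret < 0) {
        return ret;
    }
    if (hdr->data_size_bytes > 0) {
        *p_data = malloc(hdr->data_size_bytes);
        if (*p_data == NULL) {
            return -ENOMEM;
        }
        ret = recv_all(fd, *p_data, hdr->data_size_bytes);
        if (ret < 0) {
            free(*p_data);
            *p_data = NULL;
        }
    }
    return ret;
}
```

The thread's loop then reduces to calling recv_one_cmd() and invoking the callback until it returns an error. This only applies to the dedicated-thread design. An fd handler in the event loop must not block, so it would still need to track partial reads.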

> +    return bytecount;
> +}
> +
> +RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata)
> +{
> +    RepCmd *p_cmd = (RepCmd *)REPCMD_MALLOC(sizeof(RepCmd) + data_size);
> +    assert(p_cmd != NULL);
> +
> +    /* Zero the CMD (not the data) */
> +    ZERO_MEM_OBJ(p_cmd);
> +
> +    p_cmd->hdr.cmdid = cmd_id;
> +    p_cmd->hdr.magic1 = REPCMD_MAGIC1;
> +    p_cmd->magic2 = REPCMD_MAGIC2;
> +    p_cmd->hdr.data_size_bytes = data_size;
> +
> +    if (p_out_pdata != NULL) {
> +        *p_out_pdata = p_cmd->data;
> +    }
> +
> +    return p_cmd;
> +}
> +
> diff --git a/block/repagent/repcmd_listener.h b/block/repagent/repcmd_listener.h
> new file mode 100644
> index 0000000..19b9ea9
> --- /dev/null
> +++ b/block/repagent/repcmd_listener.h
> @@ -0,0 +1,34 @@
> +/*
> + * QEMU System Emulator
> + *
> + * Copyright (c) 2003-2008 Fabrice Bellard
> + *
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
> + * of this software and associated documentation files (the "Software"), to deal
> + * in the Software without restriction, including without limitation the rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
> + * THE SOFTWARE.
> + */
> +#ifndef REPCMD_LISTENER_H
> +#define REPCMD_LISTENER_H
> +#include <stdint.h>
> +typedef void (*pfn_received_cmd_cb)(RepCmd *pcmd,
> +                uint8_t *pdata, void *opaque);
> +
> +void repcmd_listener_init(pfn_received_cmd_cb callback, void *opaque);
> +int repcmd_listener_socket_read_next_buf(int hsock);
> +int repcmd_listener_socket_thread_listener(int hsock);
> +
> +#endif /* REPCMD_LISTENER_H */
> diff --git a/block/repagent/rephub_cmds.h b/block/repagent/rephub_cmds.h
> new file mode 100644
> index 0000000..cb737e6
> --- /dev/null
> +++ b/block/repagent/rephub_cmds.h
> @@ -0,0 +1,154 @@
> +/*
> + * QEMU System Emulator
> + *
> + * Copyright (c) 2003-2008 Fabrice Bellard
> + *
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
> + * of this software and associated documentation files (the "Software"), to deal
> + * in the Software without restriction, including without limitation the rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
> + * THE SOFTWARE.
> + */
> +#ifndef REPHUB_CMDS_H
> +#define REPHUB_CMDS_H
> +
> +#include <stdint.h>
> +#include "repcmd.h"
> +#include "rephub_defs.h"
> +
> +/*********************************************************
> + * RepCmd Report a protected IO
> + *
> + * REPHUB_CMD_PROTECTED_WRITE
> + * Direction: agent->hub
> + *
> + * Any write of a protected volume is send with this
> + * message to the hub, with its status.
> + * When case the status is bad no data is sent
> + *********************************************************/
> +typedef struct RepCmdProtectedWrite {
> +    RepCmdHdr hdr;
> +    uint64_t volume_id;
> +    uint64_t offset_sectors;
> +    /* The size field duplicates the RepCmd size,
> +     * but it is needed for reporting failed IOs' sizes */
> +    uint32_t size_sectors;
> +    int ret_status;
> +} RepCmdProtectedWrite;
> +
> +/*********************************************************
> + * RepCmd Report VM volumes
> + *
> + * REPHUB_CMD_REPORT_VM_VOLUMES
> + * Direction: agent->hub
> + *
> + * The agent reports all the volumes of the VM
> + * to the hub.
> + *********************************************************/
> +typedef struct RepVmVolumeInfo {
> +    char name[REPHUB_MAX_VOL_NAME_LEN];
> +    uint64_t volume_id;
> +    uint32_t size_mb;
> +    uint32_t padding;
> +} RepVmVolumeInfo;
> +
> +typedef struct RepCmdReportVmVolumes {
> +    RepCmdHdr hdr;
> +    int num_volumes;
> +} RepCmdReportVmVolumes;
> +
> +typedef struct RepCmdDataReportVmVolumes {
> +    RepVmVolumeInfo volumes[0];
> +} RepCmdDataReportVmVolumes;
> +
> +
> +/*********************************************************
> + * RepCmd Start protect
> + *
> + * REPHUB_CMD_START_PROTECT
> + * Direction: hub->agent
> + *
> + * The hub instructs the agent to start protecting
> + * a volume. When a volume is protected all its writes
> + * are sent to to the hub.
> + * With this command the hub also assigns a volume ID to
> + * the given volume name.
> + *********************************************************/
> +typedef struct RepCmdStartProtect {
> +    RepCmdHdr hdr;
> +    uint64_t volume_id;
> +} RepCmdStartProtect;
> +
> +typedef struct RepCmdDataStartProtect {
> +    char volume_name[REPHUB_MAX_VOL_NAME_LEN];
> +} RepCmdDataStartProtect;
> +
> +
> +/*********************************************************
> + * RepCmd Remote IO Request
> + *
> + * REPHUB_CMD_REMOTE_IO_REQ
> + * Direction: hub->agent
> + *
> + * The hub issues an IO to a volume.
> + * This command is used for:
> + * - Reading protected volume during sync
> + * - Read/write to a recovery volume during
> + *     protect and failover or failover test
> + * This command is a request, the read data is returned
> + * by the response command REPHUB_CMD_REMOTE_IO_RES
> + *********************************************************/
> +typedef struct RepCmdRemoteIoReq {
> +    RepCmdHdr hdr;
> +    int req_id;
> +    int size_sectors;
> +    uint64_t volume_id;
> +    uint64_t offset_sectors;
> +    int is_read;
> +} RepCmdRemoteIoReq;
> +
> +/*********************************************************
> + * RepCmd Read Volume Response
> + *
> + * REPHUB_CMD_REMOTE_IO_RES
> + * Direction: agent->hub
> + *
> + * A response to REPHUB_CMD_REMOTE_IO_REQ.
> + * Sends the data read from a protected volume
> + *********************************************************/
> +typedef struct RepCmdRemoteIoRes {
> +    RepCmdHdr hdr;
> +    int req_id;
> +    int io_status;
> +    uint64_t volume_id;
> +} RepCmdRemoteIoRes;
> +
> +/*********************************************************
> + * RepCmd Agent shutdown
> + *
> + * REPHUB_CMD_AGENT_SHUTDOWN
> + * Direction: agent->hub
> + *
> + * Notifies the hub that the agent is about to shutdown.
> + * This allows a graceful shutdown. Any disconnection
> + * of an agent without sending this command will result
> + * in a full sync of the VM volumes.
> + *********************************************************/
> +typedef struct RepCmdAgentShutdown {
> +    RepCmdHdr hdr;
> +} RepCmdAgentShutdown;
> +
> +
> +#endif /* REPHUB_CMDS_H */
> diff --git a/block/repagent/rephub_defs.h b/block/repagent/rephub_defs.h
> new file mode 100644
> index 0000000..e34e0ce
> --- /dev/null
> +++ b/block/repagent/rephub_defs.h
> @@ -0,0 +1,40 @@
> +/*
> + * QEMU System Emulator
> + *
> + * Copyright (c) 2003-2008 Fabrice Bellard
> + *
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
> + * of this software and associated documentation files (the "Software"), to deal
> + * in the Software without restriction, including without limitation the rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
> + * THE SOFTWARE.
> + */
> +#ifndef REP_HUB_DEFS_H
> +#define REP_HUB_DEFS_H
> +
> +#include <stdint.h>
> +
> +#define REPHUB_MAX_VOL_NAME_LEN (1024)
> +#define REPHUB_MAX_NUM_VOLUMES (512)
> +
> +#ifndef TRUE
> +    #define TRUE (1)
> +#endif
> +
> +#ifndef FALSE
> +    #define FALSE (0)
> +#endif

Use C99 <stdbool.h>
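As a small sketch of how the flags read with <stdbool.h>, here is a hypothetical struct trimmed down from RepCmdRxCmdState. RxProgress and rx_buffer_complete() are illustrative names only.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, trimmed from RepCmdRxCmdState in the patch. */
typedef struct RxProgress {
    size_t bytesToGet;
    size_t bytesGotten;
    bool isGotHeader;           /* true/false instead of TRUE/FALSE macros */
} RxProgress;

/* True once the current buffer has been filled completely. */
bool rx_buffer_complete(const RxProgress *s)
{
    return s->bytesGotten == s->bytesToGet;
}
```

With bool, true and false available from the standard header, the TRUE and FALSE macros in rephub_defs.h are no longer needed.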

> +
> +#endif /* REP_HUB_DEFS_H */


