[Qemu-devel] [RFC PATCH v2] Replication agent module


From: Ori Mamluk
Subject: [Qemu-devel] [RFC PATCH v2] Replication agent module
Date: Sun, 1 Apr 2012 15:05:48 +0300

Repagent is a new block driver that allows an external replication system
to hook into the Qemu storage stack and replicate a volume of a VM.
This RFC patch adds the repagent client module to Qemu.

Documentation of the module's role and API is in the patch, at
block/repagent/qemu-repagent.txt.
At a high level, the API is shaped like a filter driver for the block layer,
following the discussion on the mailing list triggered by the first
version submitted.
The idea is to keep specific applications out of the Qemu storage
logic by treating the module as a filter that hooks into the storage layer,
rather than as a feature of that layer.
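
To make the filter idea concrete, here is a minimal, self-contained sketch of
a pass-through layer that forwards each write to the layer below and then
hands a copy to a mirror hook. It is only an illustration: the Layer,
MemDisk, FilterState and MirrorStats types and all function names are
hypothetical stand-ins, not QEMU's BlockDriver API and not the code in this
patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SECTOR_SIZE 512

/* Hypothetical, simplified stand-in for one layer of a storage stack. */
typedef struct Layer Layer;
struct Layer {
    int (*write)(Layer *self, uint64_t sector, const uint8_t *buf,
                 size_t nb_sectors);
    Layer *next;    /* layer below this one, NULL at the bottom */
    void *opaque;   /* per-layer state */
};

/* Bottom layer: a volume held in memory. */
typedef struct MemDisk {
    uint8_t *data;
    uint64_t nb_sectors;
} MemDisk;

static int memdisk_write(Layer *self, uint64_t sector, const uint8_t *buf,
                         size_t nb_sectors)
{
    MemDisk *d = self->opaque;
    if (sector > d->nb_sectors || nb_sectors > d->nb_sectors - sector) {
        return -1;  /* out of range */
    }
    memcpy(d->data + sector * SECTOR_SIZE, buf, nb_sectors * SECTOR_SIZE);
    return 0;
}

/* Filter layer: owns no data, only forwards and reports. */
typedef void (*MirrorFn)(uint64_t sector, const uint8_t *buf,
                         size_t nb_sectors, int ret, void *ctx);

typedef struct FilterState {
    MirrorFn mirror;
    void *ctx;
} FilterState;

static int filter_write(Layer *self, uint64_t sector, const uint8_t *buf,
                        size_t nb_sectors)
{
    FilterState *f = self->opaque;
    assert(self->next != NULL);
    /* Pass the write through unchanged. */
    int ret = self->next->write(self->next, sector, buf, nb_sectors);
    /* Report it, including the completion status, without altering it. */
    if (f->mirror) {
        f->mirror(sector, buf, nb_sectors, ret, f->ctx);
    }
    return ret;
}

/* Example hook: counts mirrored writes instead of sending them anywhere. */
typedef struct MirrorStats {
    int writes;
    int failed;
} MirrorStats;

static void count_mirror(uint64_t sector, const uint8_t *buf,
                         size_t nb_sectors, int ret, void *ctx)
{
    MirrorStats *s = ctx;
    (void) sector; (void) buf; (void) nb_sectors;
    s->writes++;
    if (ret < 0) {
        s->failed++;
    }
}
```

A caller would stack a Layer using filter_write on top of one using
memdisk_write and issue writes only to the top layer. The storage logic
below never needs to know that a replication hook exists.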

The main motivation behind the module is to allow replication of VMs in a
virtualization environment like RhevM.
To achieve this we need basic replication support in Qemu.

By default, the build and use of this module are disabled. To activate it
you need to pass a flag to ./configure and a command-line switch to qemu.

The module is not feature complete. I wanted to get approval for the basic
approach and interface first, and then complete the features.

Missing features:
* Dirty bitmap at the protected side
        The protected volume side needs to persistently keep track of
        'pending' IOs.
        I want to add a bitmap (it can hook in as another 'filter' alongside
        the repagent) that synchronously tracks IOs and allows getting a
        list of such IOs.
        Without such a bitmap, a failure of any component (rephub or agent)
        requires reading the whole protected volume.
* Capture IOs at the recovery side
        During fail-over or a fail-over test, the repagent at the recovery
        side needs to capture all IOs (reads and writes) done by the
        fail-over VM and answer them by passing them synchronously to the
        VRA.
* Reporting of IO failures
        TBD
* Sample Rephub
        An application that, together with Repagent, forms a full
        replication solution.
* There is still much debugging code (including printfs) in the code. This
  will naturally go away in more mature versions.
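
As a rough illustration of the dirty bitmap item above, the sketch below
tracks written regions at chunk granularity and lets a caller enumerate the
dirty ranges that would need to be re-read. It is a sketch under stated
assumptions: the DirtyBitmap type, the function names and the chunk size are
all hypothetical, the bitmap lives only in memory, and the persistence the
feature needs is not shown.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical in-memory bitmap: one bit per chunk of chunk_sectors. */
typedef struct DirtyBitmap {
    uint8_t *bits;
    uint64_t nb_chunks;
    uint64_t chunk_sectors;
} DirtyBitmap;

static int dirty_bitmap_init(DirtyBitmap *bm, uint64_t total_sectors,
                             uint64_t chunk_sectors)
{
    assert(chunk_sectors > 0);
    bm->chunk_sectors = chunk_sectors;
    bm->nb_chunks = (total_sectors + chunk_sectors - 1) / chunk_sectors;
    /* Always allocate at least one byte so an empty volume is not an error. */
    bm->bits = calloc(bm->nb_chunks / 8 + 1, 1);
    return bm->bits ? 0 : -1;
}

/* Mark every chunk touched by a write; call this before the write is acked. */
static void dirty_bitmap_mark(DirtyBitmap *bm, uint64_t sector,
                              uint64_t nb_sectors)
{
    if (nb_sectors == 0) {
        return;
    }
    uint64_t first = sector / bm->chunk_sectors;
    uint64_t last = (sector + nb_sectors - 1) / bm->chunk_sectors;
    for (uint64_t c = first; c <= last && c < bm->nb_chunks; c++) {
        bm->bits[c / 8] |= (uint8_t) (1u << (c % 8));
    }
}

static int dirty_bitmap_test(const DirtyBitmap *bm, uint64_t chunk)
{
    return chunk < bm->nb_chunks && ((bm->bits[chunk / 8] >> (chunk % 8)) & 1);
}

/* Find the next run of dirty chunks at or after *pos.
 * Returns 1 and fills the run in sectors, or 0 when no dirty chunk remains.
 * *pos is advanced past the run so the call can be repeated. */
static int dirty_bitmap_next_run(const DirtyBitmap *bm, uint64_t *pos,
                                 uint64_t *start_sector, uint64_t *nb_sectors)
{
    uint64_t c = *pos;
    while (c < bm->nb_chunks && !dirty_bitmap_test(bm, c)) {
        c++;
    }
    if (c >= bm->nb_chunks) {
        *pos = c;
        return 0;
    }
    uint64_t first = c;
    while (c < bm->nb_chunks && dirty_bitmap_test(bm, c)) {
        c++;
    }
    *pos = c;
    *start_sector = first * bm->chunk_sectors;
    *nb_sectors = (c - first) * bm->chunk_sectors;
    return 1;
}
```

After a rephub or agent failure, a caller could loop on
dirty_bitmap_next_run() starting from position 0 and re-read only the
reported ranges instead of the whole protected volume. A larger chunk size
makes the bitmap smaller but re-reads more data per dirty bit.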

This is the second submission of this module.
Changes from the first patch version:
* Changed the module to be a block driver instead of integrating it with
the generic block layer - for a filter-like API.
* Removed dedicated thread for the Rephub socket - now using
qemu_set_fd_handler instead.

Points and open issues:
* It's a lot of code sent in a single patch. I'm not sure how, or whether,
  to split it better. I'll gladly take any direction on how it should be
  done.
* I don't have deep knowledge of the structure of the storage stack. I
  used the "raw" format as an example to make "repagent" a filter driver
  that passes most of the calls through. Is this the right way to do it?
* I didn't find a good way to tell the block layer that the repagent filter
  should be used. I currently use a temporary global flag, 'use_repagent'.
  It is set in main after parsing the command-line options, and checked
  in block.c. I guess there should be a better way to convey this option.

Tests I ran:
* I hooked a Qemu VM with Repagent to a full Zerto replication solution
based on VMware, and was able to replicate and recover the VM on a remote
site.
* I wrote a stand-alone Rephub that actively reads the protected volume
and copies it, for test purposes.

I'd appreciate any feedback or suggestions.
Thanks,
Ori.

From: Ori Mamluk <address@hidden(none)>

Added replication agent module to Qemu

Added build options to ./configure:  --enable-replication
--disable-replicat
Added a commandline option to enable: -repagent <rep hub IP>
Added the module files under block/repagent.
Added the repagent API usage in block.c

Signed-off-by: Ori Mamluk <address@hidden>
---
 Makefile                         |    3 +-
 Makefile.objs                    |    6 +
 block.c                          |   44 +++++-
 block/repagent/qemu-repagent.txt |   73 ++++++++
 block/repagent/repagent.c        |  355 ++++++++++++++++++++++++++++++++++++++
 block/repagent/repagent.h        |   52 ++++++
 block/repagent/repagent_client.c |  162 +++++++++++++++++
 block/repagent/repagent_client.h |   36 ++++
 block/repagent/repagent_drv.c    |  177 +++++++++++++++++++
 block/repagent/repcmd.h          |   59 +++++++
 block/repagent/repcmd_listener.c |  173 ++++++++++++++++++
 block/repagent/repcmd_listener.h |   34 ++++
 block/repagent/rephub_cmds.h     |  154 ++++++++++++++++
 block/repagent/rephub_defs.h     |   40 +++++
 blockdev.c                       |    1 +
 configure                        |   12 ++
 qemu-options.hx                  |    6 +
 vl.c                             |   12 ++
 18 files changed, 1396 insertions(+), 3 deletions(-)
 mode change 100644 => 100755 Makefile.objs
 create mode 100644 block/repagent/qemu-repagent.txt
 create mode 100644 block/repagent/repagent.c
 create mode 100644 block/repagent/repagent.h
 create mode 100644 block/repagent/repagent_client.c
 create mode 100644 block/repagent/repagent_client.h
 create mode 100644 block/repagent/repagent_drv.c
 create mode 100644 block/repagent/repcmd.h
 create mode 100644 block/repagent/repcmd_listener.c
 create mode 100644 block/repagent/repcmd_listener.h
 create mode 100644 block/repagent/rephub_cmds.h
 create mode 100644 block/repagent/rephub_defs.h
 mode change 100644 => 100755 qemu-options.hx

diff --git a/Makefile b/Makefile
index 49c775b..f7966f8 100644
--- a/Makefile
+++ b/Makefile
@@ -224,6 +224,7 @@ clean:
        rm -f $(GENERATED_SOURCES)
        rm -rf $(qapi-dir)
        $(MAKE) -C tests/tcg clean
+       rm -f block/repagent/*.{o,d}
        for d in $(ALL_SUBDIRS) $(QEMULIBS) libcacard; do \
        if test -d $$d; then $(MAKE) -C $$d $@ || exit 1; fi; \
        rm -f $$d/qemu-options.def; \
@@ -390,4 +391,4 @@ tar:
        rm -rf /tmp/$(FILE)

 # Include automatically generated dependency files
--include $(wildcard *.d audio/*.d slirp/*.d block/*.d net/*.d ui/*.d qapi/*.d qga/*.d)
+-include $(wildcard *.d audio/*.d slirp/*.d block/*.d net/*.d ui/*.d qapi/*.d qga/*.d block/repagent/*.d)
diff --git a/Makefile.objs b/Makefile.objs
old mode 100644
new mode 100755
index b39d76c..36165ae
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -30,6 +30,11 @@ block-obj-y += $(coroutine-obj-y) $(qobject-obj-y)
$(version-obj-y)
 block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o

+# Replication agent block driver - repagent
+repagent-nested-y = repagent_client.o  repagent.o  repcmd_listener.o repagent_drv.o
+repagent-obj-y = $(addprefix block/repagent/, $(repagent-nested-y))
+block-obj-y += $(repagent-obj-y)
+
 block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o
vpc.o vvfat.o
 block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o
qcow2-snapshot.o qcow2-cache.o
 block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o
qed-cluster.o
@@ -75,6 +80,7 @@ fsdev-obj-$(CONFIG_VIRTFS) += $(addprefix fsdev/,
$(fsdev-nested-y))
 # single QEMU executable should support all CPUs and machines.

 common-obj-y = $(block-obj-y) blockdev.o
+common-obj-y += $(repagent-obj-$(CONFIG_REPAGENT))
 common-obj-y += $(net-obj-y)
 common-obj-y += $(qobject-obj-y)
 common-obj-$(CONFIG_LINUX) += $(fsdev-obj-$(CONFIG_LINUX))
diff --git a/block.c b/block.c
index 52ffe14..8e11c03 100644
--- a/block.c
+++ b/block.c
@@ -32,6 +32,10 @@
 #include "qmp-commands.h"
 #include "qemu-timer.h"

+#ifdef CONFIG_REPAGENT
+#include "block/repagent/repagent.h"
+#endif
+
 #ifdef CONFIG_BSD
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -53,6 +57,7 @@ typedef enum {
     BDRV_REQ_ZERO_WRITE   = 0x2,
 } BdrvRequestFlags;

+
 static void bdrv_dev_change_media_cb(BlockDriverState *bs, bool load);
 static BlockDriverAIOCB *bdrv_aio_readv_em(BlockDriverState *bs,
         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
@@ -100,6 +105,9 @@ static BlockDriverState *bs_snapshots;
 /* If non-zero, use only whitelisted block drivers */
 static int use_bdrv_whitelist;

+/* orim temporary */
+int use_repagent;
+
 #ifdef _WIN32
 static int is_windows_drive_prefix(const char *filename)
 {
@@ -685,6 +693,12 @@ int bdrv_open(BlockDriverState *bs, const char
*filename, int flags,
     int ret;
     char tmp_filename[PATH_MAX];

+    if (use_repagent && bs->device_name[0] != '\0' && strcmp(
+            bs->device_name, "pflash0") != 0) {
+        BlockDriver *bdrv_repagent = bdrv_find_format("repagent");
+        drv = bdrv_repagent;
+    }
+
     if (flags & BDRV_O_SNAPSHOT) {
         BlockDriverState *bs1;
         int64_t total_size;
@@ -749,6 +763,9 @@ int bdrv_open(BlockDriverState *bs, const char
*filename, int flags,
         goto unlink_and_fail;
     }

+#ifdef CONFIG_REPAGENT
+/*    repagent_register_drive(filename,  bs); */
+#endif
     /* Open the image */
     ret = bdrv_open_common(bs, filename, flags, drv);
     if (ret < 0) {
@@ -1469,6 +1486,13 @@ static int bdrv_rw_co(BlockDriverState *bs, int64_t
sector_num, uint8_t *buf,
             qemu_aio_wait();
         }
     }
+    /* orim todo remove */
+    printf("IO done. is_write %d sec %lld size %d ", is_write,
+            (long long int) sector_num, nb_sectors);
+    if (bs->drv != NULL) {
+        printf("Drv %s, ", bs->drv->format_name);
+    }
+    printf("file %s, ret %d\n", bs->filename, rwco.ret);
     return rwco.ret;
 }

@@ -1842,6 +1866,18 @@ static int coroutine_fn
bdrv_co_do_writev(BlockDriverState *bs,
         ret = drv->bdrv_co_writev(bs, sector_num, nb_sectors, qiov);
     }

+
+#ifdef CONFIG_REPAGENT
+    if (bs->device_name[0] != '\0') {
+        /* We split the IO only at the highest stack driver layer.
+           Currently we know that by checking device_name - only
+           highest level (closest to the guest) has that name.
+           */
+/*           repagent_handle_protected_write(bs, sector_num,
+                nb_sectors, qiov, ret);
+*/
+    }
+#endif
     if (bs->dirty_bitmap) {
         set_dirty_bitmap(bs, sector_num, nb_sectors, 1);
     }
@@ -1919,16 +1955,20 @@ int64_t
bdrv_get_allocated_file_size(BlockDriverState *bs)
  */
 int64_t bdrv_getlength(BlockDriverState *bs)
 {
+    int64_t ret = 0;
     BlockDriver *drv = bs->drv;
     if (!drv)
         return -ENOMEDIUM;

+    ret = bs->total_sectors * BDRV_SECTOR_SIZE;
     if (bs->growable || bdrv_dev_has_removable_media(bs)) {
         if (drv->bdrv_getlength) {
-            return drv->bdrv_getlength(bs);
+            ret = drv->bdrv_getlength(bs);
         }
     }
-    return bs->total_sectors * BDRV_SECTOR_SIZE;
+    /* orim todo remove */
+    printf("bdrv_getlength returned %lld\n", (long long int) ret);
+    return ret;
 }

 /* return 0 as number of sectors if no device present or error */
diff --git a/block/repagent/qemu-repagent.txt
b/block/repagent/qemu-repagent.txt
new file mode 100644
index 0000000..f8def3f
--- /dev/null
+++ b/block/repagent/qemu-repagent.txt
@@ -0,0 +1,73 @@
+    repagent - replication agent - a Qemu module for enabling continuous
async replication of VM volumes
+
+Introduction
+    This document describes a feature in Qemu - a replication agent
(named Repagent).
+    The Repagent is a new module that exposes an API to an external
replication system (AKA Rephub).
+    This API allows a Rephub to communicate with a Qemu VM and
continuously replicate its volumes.
+    The implementation of a Rephub is outside the scope of this document. There may be several different Rephub
+    implementations using the same repagent in Qemu.
+    The Repagent is a storage driver that acts like a filter driver.
+    It can be regarded as a 'plugin' that is activated when the
management system enables replication.
+
+Main features of Repagent
+    Repagent has the following main features:
+    * Report volumes - report a list of all volumes in a VM to the
Rephub.
+    * Mirror writes - Report writes to a volume - send all writes made to a protected volume to the Rephub.
+        The reporting of an IO is asynchronous - i.e. the IO is not delayed by the Repagent to get any acknowledgement from the Rephub.
+        It is only copied to the Rephub.
+    * Remote IO - Read/write a volume - allows the Rephub to read a protected volume, to enable the protected hub to synchronize
+       the content of a protected volume.
+       Also used to read/write to a recovery volume - the replica of a protected volume.
+
+Description of the Repagent module
+
+Build and run options
+    New configure option: --enable-repagent
+    New command line option:
+    -repagent [hub IP/name]
+                    Enable replication support for disks
+                    hub is the ip or name of the machine running the
replication hub.
+
+Module APIs
+    The Repagent module interfaces two main components:
+    1. The Rephub - An external API based on socket messages
+           See detailed comments about each message in rephub_cmds.h
+    2. The generic block layer- block.c
+        Repagent is a block driver. Most of the block driver functions are just a pass-through
+        to the next driver.
+        Writes are mirrored to the hub for replication.
+        The open function is used for registering each volume in Repagent.
+
+
+General description of a Rephub  - a replication system the repagent
connects to
+    This section describes at a high level a sample Rephub - a replication system that uses the repagent API
+    to replicate disks.
+    It describes a simple Rephub that continuously maintains a mirror of the volumes of a VM.
+
+    Say we have a VM we want to protect - call it PVM, say it has 2
volumes - V1, V2.
+    Our Rephub is called SingleRephub - a Rephub protecting a single VM.
+
+    Preparations
+    1. The user chooses a host to run SingleRephub - a different host than PVM, call it Host2
+    2. The user creates two volumes on Host2 - the same sizes as V1 and V2, call them V1R (V1 recovery) and V2R.
+    3. The user runs SingleRephub process on Host2, and gives V1R and V2R
as command line arguments.
+        From now on SingleRephub waits for the protected VM repagent to
connect.
+    4. The user runs the protected VM PVM - and uses the switch -repagent
<Host2 IP>.
+
+    Runtime
+    1. The repagent module connects to SingleRephub on startup.
+    2. repagent reports V1 and V2 to SingleRephub.
+    3. SingleRephub starts to perform an initial synchronization of the
protected volumes-
+        it reads each protected volume (V1 and V2) - using read volume
requests - and copies the data into the
+        recovery volume V1R and V2R.
+    4. SingleRephub enters 'protection' mode - each write to the
protected volume is sent by the repagent to the Rephub,
+        and the Rephub performs the write on the matching recovery
volume.
+
+    * Note that during stage 3 writes to the protected volumes are not ignored - they're kept in a bitmap,
+        and will be read again when stage 3 ends, in an iterative converging process.
+
+    This flow continuously maintains an updated recovery volume.
+    If the protected system is damaged, the user can create a new VM on
Host2 with the replicated volumes attached to it.
+    The new VM is a replica of the protected system.
+
+
diff --git a/block/repagent/repagent.c b/block/repagent/repagent.c
new file mode 100644
index 0000000..bdc0117
--- /dev/null
+++ b/block/repagent/repagent.c
@@ -0,0 +1,355 @@
+#include <string.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <pthread.h>
+#include <stdint.h>
+
+#include "block.h"
+#include "rephub_defs.h"
+#include "block_int.h"
+#include "repagent_client.h"
+#include "repagent.h"
+#include "rephub_cmds.h"
+
+#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj))
+#define REPAGENT_MAX_NUM_VOLUMES (64)
+#define REPAGENT_VOLUME_ID_NONE (0)
+
+typedef struct RepagentVolume {
+    uint64_t vol_id;
+    const char *vol_path;
+    BlockDriverState *driver_ptr;
+} RepagentVolume;
+
+struct RepAgentState {
+    int is_init;
+    int num_volumes;
+    RepagentVolume *volumes[REPAGENT_MAX_NUM_VOLUMES];
+};
+
+typedef struct RepagentReadVolIo {
+    QEMUIOVector qiov;
+    RepCmdRemoteIoReq rep_cmd;
+    uint8_t *buf;
+    struct timeval start_time;
+} RepagentReadVolIo;
+
+static int repagent_get_volume_by_driver(
+        BlockDriverState *bs);
+static int repagent_get_volume_by_name(const char *name);
+static void repagent_report_volumes_to_hub(void);
+static void repagent_remote_io_done(void *opaque, int ret);
+static struct timeval tsub(struct timeval t1, struct timeval t2);
+
+RepAgentState g_rep_agent = { 0 };
+
+void repagent_init(const char *hubname, int port)
+{
+    /* It is the responsibility of the thread to free this struct */
+    rephub_params *pParams = (rephub_params
*)g_malloc(sizeof(rephub_params));
+    if (hubname == NULL) {
+        hubname = "127.0.0.1";
+    }
+    if (port == 0) {
+        port = 9010;
+    }
+
+    printf("repagent_init %s\n", hubname);
+
+    pParams->port = port;
+    pParams->name = g_strdup(hubname);
+
+    pthread_t thread_id = 0;
+
+    /* Create the repagent client listener thread */
+    pthread_create(&thread_id, 0, repagent_listen, (void *) pParams);
+    pthread_detach(thread_id);
+}
+
+void repagent_register_drive(const char *drive_path,
+        BlockDriverState *driver_ptr)
+{
+    /* Assert that the volume is not registered yet */
+    int i = repagent_get_volume_by_name(drive_path);
+    assert(i == -1);
+
+    /*Add the volume at the last place*/
+    assert(g_rep_agent.num_volumes < REPAGENT_MAX_NUM_VOLUMES);
+
+    i = g_rep_agent.num_volumes;
+    g_rep_agent.num_volumes++;
+
+    printf("zerto repagent: Registering drive. Num drives %d, path %s\n",
+            g_rep_agent.num_volumes, drive_path);
+    g_rep_agent.volumes[i] =
+            (RepagentVolume *)g_malloc(sizeof(RepagentVolume));
+    g_rep_agent.volumes[i]->driver_ptr = driver_ptr;
+    /* orim todo strcpy? */
+    g_rep_agent.volumes[i]->vol_path = drive_path;
+
+    /* Orim todo thread-safety? */
+    g_rep_agent.volumes[i]->vol_id = REPAGENT_VOLUME_ID_NONE;
+
+    repagent_report_volumes_to_hub();
+}
+
+void repagent_deregister_drive(const char *drive_path,
+        BlockDriverState *driver_ptr)
+{
+    /* Orim todo thread-safety? */
+    int i = repagent_get_volume_by_driver(driver_ptr);
+    assert(i != -1);
+
+    RepagentVolume *vol = g_rep_agent.volumes[i];
+    /* Put the last volume in the cell of the removed volume to maintain
a
+     * contiguous array */
+    g_rep_agent.volumes[i] = g_rep_agent.volumes[g_rep_agent.num_volumes
- 1];
+    g_rep_agent.volumes[g_rep_agent.num_volumes - 1] = NULL;
+    g_rep_agent.num_volumes--;
+    g_free(vol);
+
+}
+/* orim todo destruction? */
+
+static int repagent_get_volume_by_driver(
+        BlockDriverState *bs)
+{
+    /* orim todo optimize search */
+    int i = 0;
+    for (i = 0; i < g_rep_agent.num_volumes ; i++) {
+        RepagentVolume *p_vol = g_rep_agent.volumes[i];
+        if (p_vol != NULL && p_vol->driver_ptr == (void *) bs) {
+            return i;
+        }
+    }
+    return -1;
+}
+
+void repagent_handle_protected_write(BlockDriverState *bs, int64_t
sector_num,
+        int nb_sectors, QEMUIOVector *qiov, int ret_status)
+{
+    printf("zerto Protected write offset %lld, size %d, IO return status %d",
+            (long long int) sector_num, nb_sectors, ret_status);
+    if (bs->filename != NULL) {
+        printf(", filename %s", bs->filename);
+    }
+
+    printf("\n");
+
+    /* orim todo thread safety? */
+    int i = repagent_get_volume_by_driver(bs);
+    if (i == -1 || g_rep_agent.volumes[i]->vol_id ==
REPAGENT_VOLUME_ID_NONE) {
+        /* Unprotected */
+        printf("Got a write to an unprotected volume.\n");
+        return;
+    }
+
+    RepagentVolume *p_vol = g_rep_agent.volumes[i];
+
+    /* Report IO to rephub */
+
+    int data_size = qiov->size;
+    if (ret_status < 0) {
+        /* On failed ios we don't send the data to the hub */
+        data_size = 0;
+    }
+    uint8_t *pdata = NULL;
+    RepCmdProtectedWrite *p_cmd = (RepCmdProtectedWrite *) repcmd_new(
+            REPHUB_CMD_PROTECTED_WRITE, data_size, (uint8_t **) &pdata);
+
+    if (ret_status >= 0) {
+        qemu_iovec_to_buffer(qiov, pdata);
+    }
+
+    p_cmd->volume_id = p_vol->vol_id;
+    p_cmd->offset_sectors = sector_num;
+    p_cmd->size_sectors = nb_sectors;
+    p_cmd->ret_status = ret_status;
+
+    if (repagent_client_send((RepCmd *) p_cmd) != 0) {
+        printf("Error sending command\n");
+    }
+}
+
+static void repagent_report_volumes_to_hub(void)
+{
+    /* Report IO to rephub */
+    int i;
+    RepCmdDataReportVmVolumes *p_cmd_data = NULL;
+    RepCmdReportVmVolumes *p_cmd = (RepCmdReportVmVolumes *) repcmd_new(
+            REPHUB_CMD_REPORT_VM_VOLUMES,
+            g_rep_agent.num_volumes * sizeof(RepVmVolumeInfo),
+            (uint8_t **) &p_cmd_data);
+    p_cmd->num_volumes = g_rep_agent.num_volumes;
+    printf("reporting %u volumes\n", g_rep_agent.num_volumes);
+    for (i = 0; i < g_rep_agent.num_volumes ; i++) {
+        assert(g_rep_agent.volumes[i] != NULL);
+        printf("reporting volume %s size %u\n",
+                g_rep_agent.volumes[i]->vol_path,
+                (uint32_t) sizeof(p_cmd_data->volumes[i].name));
+        strncpy((char *) p_cmd_data->volumes[i].name,
+                g_rep_agent.volumes[i]->vol_path,
+                sizeof(p_cmd_data->volumes[i].name));
+        p_cmd_data->volumes[i].volume_id =
g_rep_agent.volumes[i]->vol_id;
+    }
+    if (repagent_client_send((RepCmd *) p_cmd) != 0) {
+        printf("Error sending command\n");
+    }
+}
+
+int repaget_start_protect(RepCmdStartProtect *pcmd,
+        RepCmdDataStartProtect *pcmd_data)
+{
+    printf("Start protect vol %s, ID %llu\n", pcmd_data->volume_name,
+            (unsigned long long) pcmd->volume_id);
+    int vol_index = repagent_get_volume_by_name(pcmd_data->volume_name);
+    if (g_rep_agent.num_volumes > 0
+            && strcmp(pcmd_data->volume_name, "stam") == 0) {
+        /* Choose the first one for rephub */
+        vol_index = 0;
+    }
+    if (vol_index < 0) {
+        printf("The volume doesn't exist\n");
+        return TRUE;
+    }
+    /* orim todo protect */
+    g_rep_agent.volumes[vol_index]->vol_id = pcmd->volume_id;
+
+    return TRUE;
+}
+
+static int repagent_get_volume_by_name(const char *name)
+{
+    int i = 0;
+    for (i = 0; i < g_rep_agent.num_volumes ; i++) {
+        if (g_rep_agent.volumes[i] != NULL
+                && strcmp(name, g_rep_agent.volumes[i]->vol_path) == 0) {
+            return i;
+        }
+    }
+    return -1;
+}
+
+static int repagent_get_volume_by_id(uint64_t vol_id)
+{
+    int i = 0;
+    for (i = 0; i < g_rep_agent.num_volumes ; i++) {
+        if (g_rep_agent.volumes[i] != NULL
+                && g_rep_agent.volumes[i]->vol_id == vol_id) {
+            return i;
+        }
+    }
+    return -1;
+}
+
+int repagent_remote_io(RepCmdRemoteIoReq *pcmd, uint8_t *pdata)
+{
+    int index = repagent_get_volume_by_id(pcmd->volume_id);
+    int size_bytes = pcmd->size_sectors * 512;
+    if (index < 0) {
+        printf("Vol read - Could not find vol id %llx\n",
+                (unsigned long long int) pcmd->volume_id);
+        RepCmdRemoteIoRes *p_res_cmd = (RepCmdRemoteIoRes *) repcmd_new(
+                REPHUB_CMD_REMOTE_IO_RES, 0, NULL);
+        p_res_cmd->req_id = pcmd->req_id;
+        p_res_cmd->volume_id = pcmd->volume_id;
+        p_res_cmd->io_status = -1;
+        repagent_client_send((RepCmd *) p_res_cmd);
+        return TRUE;
+    }
+
+    printf("Vol read - driver %p, volId %llu, offset %llu, size %u\n",
+            g_rep_agent.volumes[index]->driver_ptr,
+            (unsigned long long int) pcmd->volume_id,
+            (unsigned long long int) pcmd->offset_sectors,
pcmd->size_sectors);
+
+    {
+        RepagentReadVolIo *io_xaction = g_malloc0(sizeof(RepagentReadVolIo));
+
+/*        BlockDriverAIOCB *acb; */
+
+        ZERO_MEM_OBJ(io_xaction);
+
+        qemu_iovec_init(&io_xaction->qiov, 1);
+
+        /*read_xact->buf =
+        qemu_blockalign(g_rep_agent.volumes[index]->driver_ptr,
size_bytes); */
+        io_xaction->buf = (uint8_t *) g_malloc(size_bytes);
+        io_xaction->rep_cmd = *pcmd;
+        qemu_iovec_add(&io_xaction->qiov, io_xaction->buf, size_bytes);
+
+        gettimeofday(&io_xaction->start_time, NULL);
+        /* orim TODO - use the returned acb to cancel the request on
shutdown */
+        /*acb = */
+        if (pcmd->is_read) {
+            bdrv_aio_readv(g_rep_agent.volumes[index]->driver_ptr,
+                    io_xaction->rep_cmd.offset_sectors,
&io_xaction->qiov,
+                    io_xaction->rep_cmd.size_sectors,
repagent_remote_io_done,
+                    io_xaction);
+        } else {
+            bdrv_aio_writev(g_rep_agent.volumes[index]->driver_ptr,
+                    io_xaction->rep_cmd.offset_sectors,
&io_xaction->qiov,
+                    io_xaction->rep_cmd.size_sectors,
repagent_remote_io_done,
+                    io_xaction);
+        }
+    }
+
+    return TRUE;
+}
+
+static void repagent_remote_io_done(void *opaque, int ret)
+{
+    struct timeval t2;
+    RepagentReadVolIo *io_xaction = (RepagentReadVolIo *) opaque;
+    uint8_t *pdata = NULL;
+    RepCmdRemoteIoRes *pcmd = (RepCmdRemoteIoRes *) repcmd_new(
+            REPHUB_CMD_REMOTE_IO_RES, io_xaction->rep_cmd.size_sectors *
512,
+            &pdata);
+    pcmd->req_id = io_xaction->rep_cmd.req_id;
+    pcmd->volume_id = io_xaction->rep_cmd.volume_id;
+    pcmd->io_status = -1;
+
+    printf("Remote IO request - volId %llu, offset %llu, size %u, is_read %u\n",
+            (unsigned long long int) io_xaction->rep_cmd.volume_id,
+            (unsigned long long int) io_xaction->rep_cmd.offset_sectors,
+            io_xaction->rep_cmd.size_sectors,
+            io_xaction->rep_cmd.is_read);
+    gettimeofday(&t2, NULL);
+
+    if (ret >= 0) {
+        /* Read response - send the data to the hub */
+        t2 = tsub(t2, io_xaction->start_time);
+        printf("Remote IO done. Took %u seconds, %u us.",
+                (uint32_t) t2.tv_sec, (uint32_t) t2.tv_usec);
+
+        pcmd->io_status = 0;    /* Success */
+        /* orim todo optimize - don't copy, use the qiov buffer */
+        qemu_iovec_to_buffer(&io_xaction->qiov, pdata);
+    } else {
+        printf("readv failed: %s\n", strerror(-ret));
+    }
+
+    repagent_client_send((RepCmd *) pcmd);
+
+    /*qemu_vfree(read_xact->buf); */
+    g_free(io_xaction->buf);
+
+    g_free(io_xaction);
+}
+
+static struct timeval tsub(struct timeval t1, struct timeval t2)
+{
+    t1.tv_usec -= t2.tv_usec;
+    if (t1.tv_usec < 0) {
+        t1.tv_usec += 1000000;
+        t1.tv_sec--;
+    }
+    t1.tv_sec -= t2.tv_sec;
+    return t1;
+}
+
+void repagent_client_connected(void)
+{
+    /* orim todo thread protection */
+    repagent_report_volumes_to_hub();
+}
diff --git a/block/repagent/repagent.h b/block/repagent/repagent.h
new file mode 100644
index 0000000..0f69820
--- /dev/null
+++ b/block/repagent/repagent.h
@@ -0,0 +1,52 @@
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2003-2008 Fabrice Bellard
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining
a copy
+ * of this software and associated documentation files (the "Software"),
to deal
+ * in the Software without restriction, including without limitation the
rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or
sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN
+ * THE SOFTWARE.
+ */
+#ifndef REPAGENT_H
+#define REPAGENT_H
+#include <stdint.h>
+
+#include "qemu-common.h"
+
+typedef struct RepAgentState RepAgentState;
+typedef struct RepCmdStartProtect RepCmdStartProtect;
+typedef struct RepCmdDataStartProtect RepCmdDataStartProtect;
+struct RepCmdRemoteIoReq;
+
+/* orim temporary */
+extern int use_repagent;
+
+
+void repagent_init(const char *hubname, int port);
+void repagent_handle_protected_write(BlockDriverState *bs,
+        int64_t sector_num, int nb_sectors, QEMUIOVector *qiov, int
ret_status);
+void repagent_register_drive(const char *drive_path,
+        BlockDriverState *driver_ptr);
+void repagent_deregister_drive(const char *drive_path,
+        BlockDriverState *driver_ptr);
+int repaget_start_protect(RepCmdStartProtect *pcmd,
+        RepCmdDataStartProtect *pcmd_data);
+int repagent_remote_io(struct RepCmdRemoteIoReq *pcmd, uint8_t *pdata);
+void repagent_client_connected(void);
+
+
+#endif /* REPAGENT_H */
diff --git a/block/repagent/repagent_client.c
b/block/repagent/repagent_client.c
new file mode 100644
index 0000000..ee4aeb7
--- /dev/null
+++ b/block/repagent/repagent_client.c
@@ -0,0 +1,162 @@
+#include "repcmd.h"
+#include "rephub_cmds.h"
+#include "repcmd_listener.h"
+#include "repagent_client.h"
+#include "repagent.h"
+#include "main-loop.h"
+
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <stdio.h>
+#include <resolv.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <unistd.h>
+
+#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj))
+
+static void repagent_process_cmd(RepCmd *pCmd, uint8_t *pData, void
*clientPtr);
+
+typedef struct repagent_client_state {
+    int is_connected;
+    int is_terminate_receive;
+    int hsock;
+} repagent_client_state;
+
+static repagent_client_state g_client_state = { 0 };
+
+static void repagent_client_read(void *opaque)
+{
+    printf("repagent_client_read\n");
+    int bytes_read =
repcmd_listener_socket_read_next_buf(g_client_state.hsock);
+    if (bytes_read <= 0) {
+        printf("repagent_client_read failed (%d), errno=%d\n",
+        bytes_read, errno);
+        g_client_state.is_connected = 0;
+    }
+}
+
+void *repagent_listen(void *pParam)
+{
+    rephub_params *pServerParams = (rephub_params *) pParam;
+    int host_port = pServerParams->port;
+    const char *host_name = pServerParams->name;
+
+    printf("Creating repagent listener thread...\n");
+    g_free(pServerParams);
+
+    struct sockaddr_in my_addr;
+
+    int err;
+    int retries = 0;
+
+    g_client_state.hsock = socket(AF_INET, SOCK_STREAM, 0);
+    if (g_client_state.hsock == -1) {
+        printf("Error initializing socket %d\n", errno);
+        return (void *) -1;
+    }
+
+    int param = 1;
+
+    if ((setsockopt(g_client_state.hsock, SOL_SOCKET, SO_REUSEADDR,
+            (char *) &param, sizeof(int)) == -1)
+            || (setsockopt(g_client_state.hsock, SOL_SOCKET,
SO_KEEPALIVE,
+                    (char *) &param, sizeof(int)) == -1)) {
+        printf("Error setting options %d\n", errno);
+        return (void *) -1;
+    }
+
+    my_addr.sin_family = AF_INET;
+    my_addr.sin_port = htons(host_port);
+    memset(&(my_addr.sin_zero), 0, 8);
+
+    my_addr.sin_addr.s_addr = inet_addr(host_name);
+
+    /* Reconnect loop */
+    while (!g_client_state.is_terminate_receive) {
+
+        if (connect(g_client_state.hsock, (struct sockaddr *) &my_addr,
+                sizeof(my_addr)) == -1) {
+            err = errno;
+            if (err != EINPROGRESS) {
+                retries++;
+                fprintf(
+                        stderr,
+                        "Error connecting socket %d. Host %s, port %u. Retry count %d\n",
+                        errno, host_name, host_port, retries);
+                usleep(5 * 1000 * 1000);
+                continue;
+            }
+        }
+        retries = 0;
+
+        printf("After connect\n");
+        g_client_state.is_connected = 1;
+        repagent_client_connected();
+        repcmd_listener_init(repagent_process_cmd, NULL);
+        static int c;
+        /* repcmd_listener_socket_thread_listener(g_client_state.hsock); */
+        qemu_set_fd_handler(g_client_state.hsock, repagent_client_read, NULL,
+                NULL);
+        while (g_client_state.is_connected) {
+            printf("Connected (%d)...\n", c++);
+            usleep(1 * 1000 * 1000);
+        }
+        /* Unregister */
+        qemu_set_fd_handler(g_client_state.hsock, NULL, NULL, NULL);
+
+        printf("Disconnected\n");
+        g_client_state.is_connected = 0;
+        close(g_client_state.hsock);
+
+    }
+    return 0;
+}
+
+void repagent_process_cmd(RepCmd *pcmd, uint8_t *pdata, void *clientPtr)
+{
+    int is_free_data = 1;
+    printf("Repagent got cmd %d\n", pcmd->hdr.cmdid);
+    switch (pcmd->hdr.cmdid) {
+    case REPHUB_CMD_START_PROTECT: {
+        is_free_data = repaget_start_protect((RepCmdStartProtect *) pcmd,
+                (RepCmdDataStartProtect *) pdata);
+    }
+        break;
+    case REPHUB_CMD_REMOTE_IO_REQ: {
+        is_free_data = repagent_remote_io((RepCmdRemoteIoReq *) pcmd, pdata);
+    }
+        break;
+    default:
+        assert(0);
+        break;
+
+    }
+
+    if (is_free_data) {
+        g_free(pdata);
+    }
+}
+
+int repagent_client_send(RepCmd *p_cmd)
+{
+    int bytecount = 0;
+    printf("Send cmd %u, data size %u\n", p_cmd->hdr.cmdid,
+            p_cmd->hdr.data_size_bytes);
+    if (!g_client_state.is_connected) {
+        printf("Not connected to hub\n");
+        return -1;
+    }
+
+    bytecount = send(g_client_state.hsock, p_cmd,
+            sizeof(RepCmd) + p_cmd->hdr.data_size_bytes, 0);
+    if (bytecount < (int) (sizeof(RepCmd) + p_cmd->hdr.data_size_bytes)) {
+        printf("Bad send %d, errno %d\n", bytecount, errno);
+        return bytecount;
+    }
+
+    /* Success */
+    return 0;
+}
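Note on repagent_client_send() above: it issues a single send() call, and a
stream socket may accept only part of the buffer. A minimal sketch of a retry
loop follows, assuming a blocking socket. send_all is a hypothetical helper
used for illustration only; it is not part of this patch.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/socket.h>

/* Hypothetical helper, not part of the patch: keeps calling send() until
 * the whole buffer has been written. Returns 0 on success, -1 on error
 * (errno is left as set by send()). */
static int send_all(int fd, const void *buf, size_t len)
{
    const char *p = buf;

    while (len > 0) {
        ssize_t n = send(fd, p, len, 0);

        if (n < 0) {
            if (errno == EINTR) {
                continue; /* interrupted before any byte was sent; retry */
            }
            return -1;
        }
        /* A short count is not an error on a stream socket: advance and
         * send the remainder on the next iteration. */
        p += n;
        len -= (size_t) n;
    }
    return 0;
}
```

With a helper of this shape, repagent_client_send() could pass p_cmd together
with sizeof(RepCmd) + p_cmd->hdr.data_size_bytes and map a nonzero result to
its existing "Bad send" path.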
diff --git a/block/repagent/repagent_client.h b/block/repagent/repagent_client.h
new file mode 100644
index 0000000..62a5377
--- /dev/null
+++ b/block/repagent/repagent_client.h
@@ -0,0 +1,36 @@
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2003-2008 Fabrice Bellard
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+#ifndef REPAGENT_CLIENT_H
+#define REPAGENT_CLIENT_H
+#include "repcmd.h"
+
+typedef struct rephub_params {
+    char *name;
+    int port;
+} rephub_params;
+
+void *repagent_listen(void *pParam);
+int repagent_client_send(RepCmd *p_cmd);
+
+#endif /* REPAGENT_CLIENT_H */
diff --git a/block/repagent/repagent_drv.c b/block/repagent/repagent_drv.c
new file mode 100644
index 0000000..8c9f2b6
--- /dev/null
+++ b/block/repagent/repagent_drv.c
@@ -0,0 +1,177 @@
+#include <string.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <stdint.h>
+
+#include "block.h"
+#include "rephub_defs.h"
+#include "block_int.h"
+#include "repagent_client.h"
+#include "repagent.h"
+#include "rephub_cmds.h"
+#include "module.h"
+
+#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj))
+
+static int repagent_open(BlockDriverState *bs, int flags)
+{
+    repagent_register_drive(bs->filename,  bs);
+    printf("%s\n", __func__);
+    bs->sg = bs->file->sg;
+    return 0;
+}
+
+static int coroutine_fn repagent_co_readv(BlockDriverState *bs,
+        int64_t sector_num, int nb_sectors, QEMUIOVector *qiov)
+{
+    printf("%s\n", __func__);
+    return bdrv_co_readv(bs->file, sector_num, nb_sectors, qiov);
+}
+
+static int coroutine_fn repagent_co_writev(BlockDriverState *bs,
+        int64_t sector_num, int nb_sectors, QEMUIOVector *qiov)
+{
+    printf("%s\n", __func__);
+    int ret = 0;
+    repagent_handle_protected_write(bs, sector_num,
+            nb_sectors, qiov, ret);
+    printf("Before write off %lld, size %d\n", (long long int) sector_num,
+            nb_sectors);
+    ret = bdrv_co_writev(bs->file, sector_num, nb_sectors, qiov);
+    printf("After write ret %d\n", ret);
+    return ret;
+}
+
+static void repagent_close(BlockDriverState *bs)
+{
+    printf("%s\n", __func__);
+}
+
+static int coroutine_fn repagent_co_flush(BlockDriverState *bs)
+{
+    printf("%s\n", __func__);
+    return bdrv_co_flush(bs->file);
+}
+
+static int64_t repagent_getlength(BlockDriverState *bs)
+{
+    printf("%s\n", __func__);
+    return bdrv_getlength(bs->file);
+}
+
+static int repagent_truncate(BlockDriverState *bs, int64_t offset)
+{
+    printf("%s\n", __func__);
+    return bdrv_truncate(bs->file, offset);
+}
+
+static int repagent_probe(const uint8_t *buf, int buf_size,
+        const char *filename)
+{
+    printf("%s\n", __func__);
+    return 1; /* everything can be opened as raw image */
+}
+
+static int coroutine_fn repagent_co_discard(BlockDriverState *bs,
+                                       int64_t sector_num, int nb_sectors)
+{
+    return bdrv_co_discard(bs->file, sector_num, nb_sectors);
+}
+
+static int repagent_is_inserted(BlockDriverState *bs)
+{
+    printf("%s\n", __func__);
+    return bdrv_is_inserted(bs->file);
+}
+
+static int repagent_media_changed(BlockDriverState *bs)
+{
+    printf("%s\n", __func__);
+    return bdrv_media_changed(bs->file);
+}
+
+static void repagent_eject(BlockDriverState *bs, bool eject_flag)
+{
+    printf("%s\n", __func__);
+    bdrv_eject(bs->file, eject_flag);
+}
+
+static void repagent_lock_medium(BlockDriverState *bs, bool locked)
+{
+    printf("%s\n", __func__);
+    bdrv_lock_medium(bs->file, locked);
+}
+
+static int repagent_ioctl(BlockDriverState *bs, unsigned long int req,
+        void *buf)
+{
+    printf("%s\n", __func__);
+    return bdrv_ioctl(bs->file, req, buf);
+}
+
+static BlockDriverAIOCB *repagent_aio_ioctl(BlockDriverState *bs,
+        unsigned long int req, void *buf,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    printf("%s\n", __func__);
+    return bdrv_aio_ioctl(bs->file, req, buf, cb, opaque);
+}
+
+static int repagent_create(const char *filename, QEMUOptionParameter *options)
+{
+    printf("%s\n", __func__);
+    return bdrv_create_file(filename, options);
+}
+
+static QEMUOptionParameter repagent_create_options[] = {
+    {
+        .name = BLOCK_OPT_SIZE,
+        .type = OPT_SIZE,
+        .help = "Virtual disk size"
+    },
+    { NULL }
+};
+
+static int repagent_has_zero_init(BlockDriverState *bs)
+{
+    printf("%s\n", __func__);
+    return bdrv_has_zero_init(bs->file);
+}
+
+static BlockDriver bdrv_repagent = {
+    .format_name        = "repagent",
+
+    /* It's really 0, but we need to make g_malloc() happy */
+    .instance_size      = 1,
+
+    .bdrv_open          = repagent_open,
+    .bdrv_close         = repagent_close,
+
+    .bdrv_co_readv          = repagent_co_readv,
+    .bdrv_co_writev         = repagent_co_writev,
+    .bdrv_co_flush_to_disk  = repagent_co_flush,
+    .bdrv_co_discard        = repagent_co_discard,
+
+    .bdrv_probe         = repagent_probe,
+    .bdrv_getlength     = repagent_getlength,
+    .bdrv_truncate      = repagent_truncate,
+
+    .bdrv_is_inserted   = repagent_is_inserted,
+    .bdrv_media_changed = repagent_media_changed,
+    .bdrv_eject         = repagent_eject,
+    .bdrv_lock_medium   = repagent_lock_medium,
+
+    .bdrv_ioctl         = repagent_ioctl,
+    .bdrv_aio_ioctl     = repagent_aio_ioctl,
+
+    .bdrv_create        = repagent_create,
+    .create_options     = repagent_create_options,
+    .bdrv_has_zero_init = repagent_has_zero_init,
+};
+
+static void bdrv_repagent_init(void)
+{
+    bdrv_register(&bdrv_repagent);
+}
+
+block_init(bdrv_repagent_init);
diff --git a/block/repagent/repcmd.h b/block/repagent/repcmd.h
new file mode 100644
index 0000000..e32acf5
--- /dev/null
+++ b/block/repagent/repcmd.h
@@ -0,0 +1,59 @@
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2003-2008 Fabrice Bellard
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+#ifndef REPCMD_H
+#define REPCMD_H
+
+#include <stdint.h>
+
+#define REPCMD_MAGIC1 (0x1122)
+#define REPCMD_MAGIC2 (0x3344)
+#define REPCMD_NUM_U32_PARAMS (11)
+
+enum RepCmds {
+    REPCMD_FIRST_INVALID                    = 0,
+    REPCMD_FIRST_HUBCMD                     = 1,
+    REPHUB_CMD_PROTECTED_WRITE              = 2,
+    REPHUB_CMD_REPORT_VM_VOLUMES            = 3,
+    REPHUB_CMD_START_PROTECT                = 4,
+    REPHUB_CMD_REMOTE_IO_REQ                = 5,
+    REPHUB_CMD_REMOTE_IO_RES                = 6,
+    REPHUB_CMD_AGENT_SHUTDOWN               = 7,
+};
+
+typedef struct RepCmdHdr {
+    uint16_t magic1;
+    uint16_t cmdid;
+    uint32_t data_size_bytes;
+} RepCmdHdr;
+
+typedef struct RepCmd {
+    RepCmdHdr hdr;
+    unsigned int parameters[REPCMD_NUM_U32_PARAMS];
+    unsigned int magic2;
+    uint8_t data[0];
+} RepCmd;
+
+RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata);
+
+#endif /* REPCMD_H */
diff --git a/block/repagent/repcmd_listener.c b/block/repagent/repcmd_listener.c
new file mode 100644
index 0000000..54d3f60
--- /dev/null
+++ b/block/repagent/repcmd_listener.c
@@ -0,0 +1,173 @@
+#include <fcntl.h>
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <stdio.h>
+#include <netinet/in.h>
+#include <resolv.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <assert.h>
+
+/* Use the CONFIG_REPAGENT flag to determine whether
+ * we're under a qemu build or a hub build. When under
+ * qemu, use g_malloc */
+#ifdef CONFIG_REPAGENT
+#include <glib.h>
+#define REPCMD_MALLOC g_malloc
+#else
+#define REPCMD_MALLOC malloc
+#endif
+
+#include "repcmd.h"
+#include "repcmd_listener.h"
+
+#define ZERO_MEM_OBJ(pObj) memset((void *)pObj, 0, sizeof(*pObj))
+
+
+typedef struct RepCmdRxCmdState {
+    RepCmd curCmd;
+    uint8_t *pReadBuf;
+    int bytesToGet;
+    int bytesGotten;
+    int isGotHeader;
+    uint8_t *pdata;
+} RepCmdRxCmdState;
+
+typedef struct RepCmdListenerState {
+    int is_terminate_receive;
+    pfn_received_cmd_cb  receive_cb;
+    void *opaque;
+    int hsock;
+    RepCmdRxCmdState cur_cmd;
+} RepCmdListenerState;
+
+static RepCmdListenerState g_listenerState = { 0 };
+
+static int repcmd_listener_process_rx(int bytecount);
+
+void repcmd_listener_init(pfn_received_cmd_cb callback, void *opaque)
+{
+    ZERO_MEM_OBJ(&g_listenerState);
+    g_listenerState.receive_cb = callback;
+    g_listenerState.opaque = opaque;
+
+    g_listenerState.cur_cmd.bytesToGet = sizeof(RepCmd);
+    g_listenerState.cur_cmd.pReadBuf =
+            (uint8_t *) &g_listenerState.cur_cmd.curCmd;
+}
+
+int repcmd_listener_socket_read_next_buf(int hsock)
+{
+    RepCmdRxCmdState *cmd_state = &g_listenerState.cur_cmd;
+    int bytecount = recv(hsock, cmd_state->pReadBuf + cmd_state->bytesGotten,
+            cmd_state->bytesToGet - cmd_state->bytesGotten, 0);
+    return repcmd_listener_process_rx(bytecount);
+}
+
+/* Returns 0 for initiated termination or socket error value on error */
+int repcmd_listener_socket_thread_listener(int hsock)
+{
+    int ret = 0;
+    /* receive loop */
+    while (!g_listenerState.is_terminate_receive) {
+        ret = repcmd_listener_socket_read_next_buf(hsock);
+        if (ret <= 0) {
+            return ret;
+        }
+    }
+    return 0;
+}
+
+static int repcmd_listener_process_rx(int bytecount)
+{
+    RepCmdRxCmdState *cmd_state = &g_listenerState.cur_cmd;
+    if (bytecount == -1) {
+        fprintf(stderr, "Error receiving data %d\n", errno);
+        return -errno;
+    }
+
+    if (bytecount == 0) {
+        printf("Disconnected\n");
+        return 0;
+    }
+    cmd_state->bytesGotten += bytecount;
+/*     printf("Received bytes %d, got %d/%d\n",
+            bytecount, cmd_state->bytesGotten, cmd_state->bytesToGet); */
+    /* print content */
+    if (0) {
+        int i;
+        for (i = 0; i < bytecount ; i += 4) {
+            /*printf("%d/%d", i, bytecount/4); */
+            printf(
+                    "%#x ",
+                    *(int *) (&cmd_state->pReadBuf[cmd_state->bytesGotten
+                            - bytecount + i]));
+
+        }
+        printf("\n");
+    }
+    assert(cmd_state->bytesGotten <= cmd_state->bytesToGet);
+    if (cmd_state->bytesGotten == cmd_state->bytesToGet) {
+        int isGotData = 0;
+        cmd_state->bytesGotten = 0;
+        if (!cmd_state->isGotHeader) {
+            /* We just got the header */
+            cmd_state->isGotHeader = 1;
+
+            assert(cmd_state->curCmd.hdr.magic1 == REPCMD_MAGIC1);
+            assert(cmd_state->curCmd.magic2 == REPCMD_MAGIC2);
+            if (cmd_state->curCmd.hdr.data_size_bytes > 0) {
+                cmd_state->pdata = (uint8_t *)REPCMD_MALLOC(
+                        cmd_state->curCmd.hdr.data_size_bytes);
+/*                    printf("malloc %p\n", cmd_state->pdata); */
+                cmd_state->pReadBuf = cmd_state->pdata;
+            } else {
+                /* no data */
+                isGotData = 1;
+                cmd_state->pdata = NULL;
+            }
+            cmd_state->bytesToGet = cmd_state->curCmd.hdr.data_size_bytes;
+        } else {
+            isGotData = 1;
+        }
+
+        if (isGotData) {
+            /* Got command and data */
+            (*g_listenerState.receive_cb)(&cmd_state->curCmd, cmd_state->pdata,
+                    g_listenerState.opaque);
+
+            /* It's the callee's responsibility to free cmd_state->pdata */
+            cmd_state->pdata = NULL;
+            ZERO_MEM_OBJ(&cmd_state->curCmd);
+            cmd_state->pReadBuf = (uint8_t *) &cmd_state->curCmd;
+            cmd_state->bytesGotten = 0;
+            cmd_state->bytesToGet = sizeof(RepCmd);
+            cmd_state->isGotHeader = 0;
+        }
+    }
+    return bytecount;
+}
+
+RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata)
+{
+    RepCmd *p_cmd = (RepCmd *)REPCMD_MALLOC(sizeof(RepCmd) + data_size);
+    assert(p_cmd != NULL);
+
+    /* Zero the CMD (not the data) */
+    ZERO_MEM_OBJ(p_cmd);
+
+    p_cmd->hdr.cmdid = cmd_id;
+    p_cmd->hdr.magic1 = REPCMD_MAGIC1;
+    p_cmd->magic2 = REPCMD_MAGIC2;
+    p_cmd->hdr.data_size_bytes = data_size;
+
+    if (p_out_pdata != NULL) {
+        *p_out_pdata = p_cmd->data;
+    }
+
+    return p_cmd;
+}
+
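Note on the listener above: it reads each message in two phases, first a
fixed-size RepCmd and then hdr.data_size_bytes of payload. The sketch below
shows the same two-phase accumulation in isolation, fed from a memory buffer
instead of recv(). All Demo* names, the reduced header and the payload cap are
assumptions made for this illustration. Unlike the patch, the sketch rejects a
bad magic with an error return rather than assert(), and it frees the payload
itself once the callback returns.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define DEMO_MAGIC 0x1122
#define DEMO_MAX_PAYLOAD (1u << 20) /* arbitrary cap chosen for this sketch */

/* Reduced stand-in for RepCmdHdr: only what framing needs. */
typedef struct DemoHdr {
    uint16_t magic;
    uint16_t cmdid;
    uint32_t data_size_bytes;
} DemoHdr;

typedef void (*demo_cmd_cb)(const DemoHdr *hdr, const uint8_t *data,
                            void *opaque);

typedef struct DemoFramer {
    DemoHdr hdr;
    uint8_t *payload;
    size_t got;      /* bytes collected so far in the current phase */
    int have_hdr;    /* 0: collecting header, 1: collecting payload */
    demo_cmd_cb cb;
    void *opaque;
} DemoFramer;

/* Example callback state: counts commands and payload bytes. */
typedef struct DemoStats {
    unsigned cmds;
    unsigned last_cmdid;
    size_t payload_bytes;
} DemoStats;

static void demo_stats_cb(const DemoHdr *hdr, const uint8_t *data,
                          void *opaque)
{
    DemoStats *s = opaque;

    (void) data;
    s->cmds++;
    s->last_cmdid = hdr->cmdid;
    s->payload_bytes += hdr->data_size_bytes;
}

static void demo_framer_init(DemoFramer *f, demo_cmd_cb cb, void *opaque)
{
    memset(f, 0, sizeof(*f));
    f->cb = cb;
    f->opaque = opaque;
}

/* Hand a complete message to the callback and reset for the next header. */
static void demo_deliver(DemoFramer *f)
{
    f->cb(&f->hdr, f->payload, f->opaque);
    free(f->payload);
    f->payload = NULL;
    f->have_hdr = 0;
    f->got = 0;
}

/* Feed any number of bytes. Returns 0 on success, -1 on a bad header
 * or allocation failure. */
static int demo_framer_feed(DemoFramer *f, const uint8_t *buf, size_t len)
{
    while (len > 0) {
        uint8_t *dst = f->have_hdr ? f->payload : (uint8_t *) &f->hdr;
        size_t want = f->have_hdr ? f->hdr.data_size_bytes : sizeof(f->hdr);
        size_t n = want - f->got;

        if (n > len) {
            n = len;
        }
        memcpy(dst + f->got, buf, n);
        f->got += n;
        buf += n;
        len -= n;
        if (f->got < want) {
            break; /* input exhausted mid-phase; wait for more bytes */
        }
        if (f->have_hdr) {
            demo_deliver(f);
            continue;
        }
        /* Header complete: validate before trusting the size field. */
        if (f->hdr.magic != DEMO_MAGIC ||
            f->hdr.data_size_bytes > DEMO_MAX_PAYLOAD) {
            return -1;
        }
        f->have_hdr = 1;
        f->got = 0;
        if (f->hdr.data_size_bytes == 0) {
            demo_deliver(f); /* command without payload */
            continue;
        }
        f->payload = malloc(f->hdr.data_size_bytes);
        if (f->payload == NULL) {
            return -1;
        }
    }
    return 0;
}
```

Because the framer keeps its position between calls, the caller may feed it
whatever recv() returns, one byte or several messages at a time, and the
callback still fires exactly once per complete message.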
diff --git a/block/repagent/repcmd_listener.h b/block/repagent/repcmd_listener.h
new file mode 100644
index 0000000..19b9ea9
--- /dev/null
+++ b/block/repagent/repcmd_listener.h
@@ -0,0 +1,34 @@
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2003-2008 Fabrice Bellard
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+#ifndef REPCMD_LISTENER_H
+#define REPCMD_LISTENER_H
+#include <stdint.h>
+typedef void (*pfn_received_cmd_cb)(RepCmd *pcmd,
+                uint8_t *pdata, void *opaque);
+
+void repcmd_listener_init(pfn_received_cmd_cb callback, void *opaque);
+int repcmd_listener_socket_read_next_buf(int hsock);
+int repcmd_listener_socket_thread_listener(int hsock);
+
+#endif /* REPCMD_LISTENER_H */
diff --git a/block/repagent/rephub_cmds.h b/block/repagent/rephub_cmds.h
new file mode 100644
index 0000000..cb737e6
--- /dev/null
+++ b/block/repagent/rephub_cmds.h
@@ -0,0 +1,154 @@
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2003-2008 Fabrice Bellard
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+#ifndef REPHUB_CMDS_H
+#define REPHUB_CMDS_H
+
+#include <stdint.h>
+#include "repcmd.h"
+#include "rephub_defs.h"
+
+/*********************************************************
+ * RepCmd Report a protected IO
+ *
+ * REPHUB_CMD_PROTECTED_WRITE
+ * Direction: agent->hub
+ *
+ * Any write to a protected volume is sent with this
+ * message to the hub, with its status.
+ * When the status is bad, no data is sent.
+ *********************************************************/
+typedef struct RepCmdProtectedWrite {
+    RepCmdHdr hdr;
+    uint64_t volume_id;
+    uint64_t offset_sectors;
+    /* The size field duplicates the RepCmd size,
+     * but it is needed for reporting failed IOs' sizes */
+    uint32_t size_sectors;
+    int ret_status;
+} RepCmdProtectedWrite;
+
+/*********************************************************
+ * RepCmd Report VM volumes
+ *
+ * REPHUB_CMD_REPORT_VM_VOLUMES
+ * Direction: agent->hub
+ *
+ * The agent reports all the volumes of the VM
+ * to the hub.
+ *********************************************************/
+typedef struct RepVmVolumeInfo {
+    char name[REPHUB_MAX_VOL_NAME_LEN];
+    uint64_t volume_id;
+    uint32_t size_mb;
+    uint32_t padding;
+} RepVmVolumeInfo;
+
+typedef struct RepCmdReportVmVolumes {
+    RepCmdHdr hdr;
+    int num_volumes;
+} RepCmdReportVmVolumes;
+
+typedef struct RepCmdDataReportVmVolumes {
+    RepVmVolumeInfo volumes[0];
+} RepCmdDataReportVmVolumes;
+
+
+/*********************************************************
+ * RepCmd Start protect
+ *
+ * REPHUB_CMD_START_PROTECT
+ * Direction: hub->agent
+ *
+ * The hub instructs the agent to start protecting
+ * a volume. When a volume is protected all its writes
+ * are sent to the hub.
+ * With this command the hub also assigns a volume ID to
+ * the given volume name.
+ *********************************************************/
+typedef struct RepCmdStartProtect {
+    RepCmdHdr hdr;
+    uint64_t volume_id;
+} RepCmdStartProtect;
+
+typedef struct RepCmdDataStartProtect {
+    char volume_name[REPHUB_MAX_VOL_NAME_LEN];
+} RepCmdDataStartProtect;
+
+
+/*********************************************************
+ * RepCmd Remote IO Request
+ *
+ * REPHUB_CMD_REMOTE_IO_REQ
+ * Direction: hub->agent
+ *
+ * The hub issues an IO to a volume.
+ * This command is used for:
+ * - Reading protected volume during sync
+ * - Read/write to a recovery volume during
+ *     protect and failover or failover test
+ * This command is a request, the read data is returned
+ * by the response command REPHUB_CMD_REMOTE_IO_RES
+ *********************************************************/
+typedef struct RepCmdRemoteIoReq {
+    RepCmdHdr hdr;
+    int req_id;
+    int size_sectors;
+    uint64_t volume_id;
+    uint64_t offset_sectors;
+    int is_read;
+} RepCmdRemoteIoReq;
+
+/*********************************************************
+ * RepCmd Remote IO Response
+ *
+ * REPHUB_CMD_REMOTE_IO_RES
+ * Direction: agent->hub
+ *
+ * A response to REPHUB_CMD_REMOTE_IO_REQ.
+ * Sends the data read from a protected volume
+ *********************************************************/
+typedef struct RepCmdRemoteIoRes {
+    RepCmdHdr hdr;
+    int req_id;
+    int io_status;
+    uint64_t volume_id;
+} RepCmdRemoteIoRes;
+
+/*********************************************************
+ * RepCmd Agent shutdown
+ *
+ * REPHUB_CMD_AGENT_SHUTDOWN
+ * Direction: agent->hub
+ *
+ * Notifies the hub that the agent is about to shut down.
+ * This allows a graceful shutdown. Any disconnection
+ * of an agent without sending this command will result
+ * in a full sync of the VM volumes.
+ *********************************************************/
+typedef struct RepCmdAgentShutdown {
+    RepCmdHdr hdr;
+} RepCmdAgentShutdown;
+
+
+#endif /* REPHUB_CMDS_H */
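Note on the command structs above: they are typed views over the generic
RepCmd, which is how repagent_process_cmd() uses them when it casts a RepCmd
pointer to RepCmdStartProtect or RepCmdRemoteIoReq. A minimal sketch of
filling one such view follows. demo_fill_protected_write, DEMO_SECTOR_SIZE and
the convention that ret_status == 0 means success are assumptions made for
illustration, not something the patch defines.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define REPCMD_MAGIC1 (0x1122)
#define REPCMD_MAGIC2 (0x3344)
#define REPCMD_NUM_U32_PARAMS (11)
#define DEMO_SECTOR_SIZE 512 /* assumption: 512-byte sectors */

enum { REPHUB_CMD_PROTECTED_WRITE = 2 };

/* Definitions repeated from repcmd.h and rephub_cmds.h so that the sketch
 * compiles alone; the trailing data[0] member of RepCmd is omitted. */
typedef struct RepCmdHdr {
    uint16_t magic1;
    uint16_t cmdid;
    uint32_t data_size_bytes;
} RepCmdHdr;

typedef struct RepCmd {
    RepCmdHdr hdr;
    unsigned int parameters[REPCMD_NUM_U32_PARAMS];
    unsigned int magic2;
} RepCmd;

typedef struct RepCmdProtectedWrite {
    RepCmdHdr hdr;
    uint64_t volume_id;
    uint64_t offset_sectors;
    uint32_t size_sectors;
    int ret_status;
} RepCmdProtectedWrite;

/* The typed view has to end before magic2, otherwise filling it would
 * overwrite the trailing magic of the generic command. */
_Static_assert(sizeof(RepCmdProtectedWrite) <= offsetof(RepCmd, magic2),
               "typed command overlaps magic2");

/* Hypothetical helper: builds a REPHUB_CMD_PROTECTED_WRITE command. */
static void demo_fill_protected_write(RepCmd *cmd, uint64_t volume_id,
                                      uint64_t offset_sectors,
                                      uint32_t size_sectors, int ret_status)
{
    RepCmdProtectedWrite w;

    memset(&w, 0, sizeof(w));
    w.hdr.magic1 = REPCMD_MAGIC1;
    w.hdr.cmdid = REPHUB_CMD_PROTECTED_WRITE;
    /* No data follows a failed write (see the header comment above);
     * treating 0 as success is an assumption of this sketch. */
    w.hdr.data_size_bytes =
        (ret_status == 0) ? size_sectors * DEMO_SECTOR_SIZE : 0;
    w.volume_id = volume_id;
    w.offset_sectors = offset_sectors;
    w.size_sectors = size_sectors;
    w.ret_status = ret_status;

    memset(cmd, 0, sizeof(*cmd));
    /* Copy instead of casting, so the alignment of *cmd does not matter. */
    memcpy(cmd, &w, sizeof(w));
    cmd->magic2 = REPCMD_MAGIC2;
}
```

The compile-time check is the part most worth carrying over: a command struct
that grows past the parameter area would otherwise corrupt magic2 without any
warning.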
diff --git a/block/repagent/rephub_defs.h b/block/repagent/rephub_defs.h
new file mode 100644
index 0000000..e34e0ce
--- /dev/null
+++ b/block/repagent/rephub_defs.h
@@ -0,0 +1,40 @@
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2003-2008 Fabrice Bellard
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+#ifndef REP_HUB_DEFS_H
+#define REP_HUB_DEFS_H
+
+#include <stdint.h>
+
+#define REPHUB_MAX_VOL_NAME_LEN (1024)
+#define REPHUB_MAX_NUM_VOLUMES (512)
+
+#ifndef TRUE
+    #define TRUE (1)
+#endif
+
+#ifndef FALSE
+    #define FALSE (0)
+#endif
+
+#endif /* REP_HUB_DEFS_H */
diff --git a/blockdev.c b/blockdev.c
index d78aa51..3c3afbc 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -20,6 +20,7 @@
 #include "trace.h"
 #include "arch_init.h"

+
 static QTAILQ_HEAD(drivelist, DriveInfo) drives = QTAILQ_HEAD_INITIALIZER(drives);

 static const char *const if_name[IF_COUNT] = {
diff --git a/configure b/configure
index 66a65d9..3d17ae5 100755
--- a/configure
+++ b/configure
@@ -189,6 +189,7 @@ spice=""
 rbd=""
 smartcard=""
 smartcard_nss=""
+repagent=""
 usb_redir=""
 opengl=""
 zlib="yes"
@@ -806,6 +807,10 @@ for opt do
   ;;
   --enable-smartcard-nss) smartcard_nss="yes"
   ;;
+  --disable-repagent) repagent="no"
+  ;;
+  --enable-repagent) repagent="yes"
+  ;;
   --disable-usb-redir) usb_redir="no"
   ;;
   --enable-usb-redir) usb_redir="yes"
@@ -1104,6 +1109,7 @@ echo "  --disable-usb-redir      disable usb network redirection support"
 echo "  --enable-usb-redir       enable usb network redirection support"
 echo "  --disable-guest-agent    disable building of the QEMU Guest Agent"
 echo "  --enable-guest-agent     enable building of the QEMU Guest Agent"
+echo "  --enable-repagent        enable replication support"
 echo ""
 echo "NOTE: The object files are built at the place where configure is launched"
 exit 1
@@ -3214,6 +3220,10 @@ if test "$smartcard_nss" = "yes" ; then
   echo "libcacard_cflags=$libcacard_cflags" >> $config_host_mak
 fi

+if test "$repagent" = "yes" ; then
+  echo "CONFIG_REPAGENT=y" >> $config_host_mak
+fi
+
 if test "$usb_redir" = "yes" ; then
   echo "CONFIG_USB_REDIR=y" >> $config_host_mak
 fi
@@ -3893,3 +3903,5 @@ symlink $source_path/Makefile.user $d/Makefile
 if test "$docs" = "yes" ; then
   mkdir -p QMP
 fi
+
+echo "Repagent         $repagent"
diff --git a/qemu-options.hx b/qemu-options.hx
old mode 100644
new mode 100755
index daefce3..15fb938
--- a/qemu-options.hx
+++ b/qemu-options.hx
@@ -2711,3 +2711,9 @@ HXCOMM This is the last statement. Insert new options before this line!
 STEXI
 @end table
 ETEXI
+
+DEF("repagent", HAS_ARG, QEMU_OPTION_repagent,
+    "-repagent [hub IP/name]\n"
+    "                Enable replication support for disks\n"
+    "                hub is the ip or name of the machine running the replication hub.\n",
+    QEMU_ARCH_ALL)
diff --git a/vl.c b/vl.c
index 97ab2b9..0fe2cff 100644
--- a/vl.c
+++ b/vl.c
@@ -167,6 +167,7 @@ int main(int argc, char **argv)

 #include "ui/qemu-spice.h"

+#include "block/repagent/repagent.h"
 //#define DEBUG_NET
 //#define DEBUG_SLIRP

@@ -2411,6 +2412,17 @@ int main(int argc, char **argv, char **envp)
                 drive_add(IF_DEFAULT, popt->index - QEMU_OPTION_hda, optarg,
                           HD_OPTS);
                 break;
+            case QEMU_OPTION_repagent:
+#ifdef CONFIG_REPAGENT
+                use_repagent = 1;
+                repagent_init(optarg, 0);
+                printf("Repagent enabled\n");
+#else
+                fprintf(stderr, "Repagent support is disabled. "
+                    "Don't use -repagent option.\n");
+                exit(1);
+#endif
+                break;
             case QEMU_OPTION_drive:
                 if (drive_def(optarg) == NULL) {
                     exit(1);
-- 
1.7.6.5


