[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [RFC PATCH 1/4] qemu-char: add the "1-server-N-client"
From: |
Marc-André Lureau |
Subject: |
Re: [Qemu-devel] [RFC PATCH 1/4] qemu-char: add the "1-server-N-client" support |
Date: |
Thu, 10 Nov 2016 11:38:02 +0000 |
Hi
On Thu, Nov 10, 2016 at 6:47 AM Wei Wang <address@hidden> wrote:
> This patch enables a qemu server socket to be connected by multiple
> client sockets.
>
Thanks for sharing this early version of the series; I hope some early
feedback will help you. I'll be waiting for a more complete implementation
before doing a detailed review.
Is this patch necessary as a first step? I would rather start with a
vhost-pci 1-1 Master-Slave series. Keep 1-n for a following improvement.
This would also probably postpone the discussion regarding the connection-id
or uuid.
In short, I think it would help if you could break your proposal into smaller,
independent steps.
> Signed-off-by: Wei Wang <address@hidden>
> ---
> include/sysemu/char.h | 64 ++++++-
> qapi-schema.json | 3 +-
> qemu-char.c | 512
> ++++++++++++++++++++++++++++++++++++++------------
> 3 files changed, 456 insertions(+), 123 deletions(-)
>
> diff --git a/include/sysemu/char.h b/include/sysemu/char.h
> index ee7e554..ff5dda6 100644
> --- a/include/sysemu/char.h
> +++ b/include/sysemu/char.h
> @@ -58,17 +58,24 @@ struct ParallelIOArg {
>
> typedef void IOEventHandler(void *opaque, int event);
>
> +#define MAX_CLIENTS 256
> +#define ANONYMOUS_CLIENT (~((uint64_t)0))
> struct CharDriverState {
> QemuMutex chr_write_lock;
> void (*init)(struct CharDriverState *s);
> int (*chr_write)(struct CharDriverState *s, const uint8_t *buf, int
> len);
> + int (*chr_write_n)(struct CharDriverState *s, uint64_t id, const
> uint8_t *buf, int len);
> int (*chr_sync_read)(struct CharDriverState *s,
> const uint8_t *buf, int len);
> + int (*chr_sync_read_n)(struct CharDriverState *s, uint64_t id,
> + const uint8_t *buf, int len);
> GSource *(*chr_add_watch)(struct CharDriverState *s, GIOCondition
> cond);
> void (*chr_update_read_handler)(struct CharDriverState *s);
> int (*chr_ioctl)(struct CharDriverState *s, int cmd, void *arg);
> int (*get_msgfds)(struct CharDriverState *s, int* fds, int num);
> + int (*get_msgfds_n)(struct CharDriverState *s, uint64_t id, int* fds,
> int num);
> int (*set_msgfds)(struct CharDriverState *s, int *fds, int num);
> + int (*set_msgfds_n)(struct CharDriverState *s, uint64_t id, int *fds,
> int num);
> int (*chr_add_client)(struct CharDriverState *chr, int fd);
> int (*chr_wait_connected)(struct CharDriverState *chr, Error **errp);
> IOEventHandler *chr_event;
> @@ -77,6 +84,7 @@ struct CharDriverState {
> void *handler_opaque;
> void (*chr_close)(struct CharDriverState *chr);
> void (*chr_disconnect)(struct CharDriverState *chr);
> + void (*chr_disconnect_n)(struct CharDriverState *chr, uint64_t id);
> void (*chr_accept_input)(struct CharDriverState *chr);
> void (*chr_set_echo)(struct CharDriverState *chr, bool echo);
> void (*chr_set_fe_open)(struct CharDriverState *chr, int fe_open);
> @@ -91,7 +99,10 @@ struct CharDriverState {
> int explicit_be_open;
> int avail_connections;
> int is_mux;
> - guint fd_in_tag;
> + guint fd_in_tag[MAX_CLIENTS];
> + uint64_t max_connections;
> + unsigned long *conn_bitmap;
> + uint64_t conn_id;
> QemuOpts *opts;
> bool replay;
> QTAILQ_ENTRY(CharDriverState) next;
> @@ -281,6 +292,20 @@ int qemu_chr_fe_write(CharDriverState *s, const
> uint8_t *buf, int len);
> int qemu_chr_fe_write_all(CharDriverState *s, const uint8_t *buf, int
> len);
>
> /**
> + * @qemu_chr_fe_write_all_n:
> + *
> + * Write data to the selected character backend from the front end.
> + *
> + * @id the connection id of the character backend
> + * @buf the data
> + * @len the number of bytes to send
> + *
> + * Returns: the number of bytes consumed
> + */
> +int qemu_chr_fe_write_all_n(CharDriverState *s, uint64_t id,
> + const uint8_t *buf, int len);
> +
> +/**
> * @qemu_chr_fe_read_all:
> *
> * Read data to a buffer from the back end.
> @@ -293,6 +318,20 @@ int qemu_chr_fe_write_all(CharDriverState *s, const
> uint8_t *buf, int len);
> int qemu_chr_fe_read_all(CharDriverState *s, uint8_t *buf, int len);
>
> /**
> + * @qemu_chr_fe_read_all_n:
> + *
> + * Read data to a buffer from the selected back end.
> + *
> + * @id the connection id
> + * @buf the data buffer
> + * @len the number of bytes to read
> + *
> + * Returns: the number of bytes read
> + */
> +int qemu_chr_fe_read_all_n(CharDriverState *s, uint64_t id,
> + uint8_t *buf, int len);
> +
> +/**
> * @qemu_chr_fe_ioctl:
> *
> * Issue a device specific ioctl to a backend. This function is
> thread-safe.
> @@ -331,6 +370,19 @@ int qemu_chr_fe_get_msgfd(CharDriverState *s);
> */
> int qemu_chr_fe_get_msgfds(CharDriverState *s, int *fds, int num);
>
> +
> +/**
> + * @qemu_chr_fe_get_msgfds_n:
> + *
> + * The multi-client version of @qemu_chr_fe_get_msgfds.
> + *
> + * Returns: -1 if fd passing isn't supported or there are no pending file
> + * descriptors. If file descriptors are returned, subsequent
> calls to
> + * this function will return -1 until a client sends a new set
> of file
> + * descriptors.
> + */
> +int qemu_chr_fe_get_msgfds_n(CharDriverState *s, uint64_t id, int *fds,
> int num);
> +
> /**
> * @qemu_chr_fe_set_msgfds:
> *
> @@ -345,6 +397,16 @@ int qemu_chr_fe_get_msgfds(CharDriverState *s, int
> *fds, int num);
> int qemu_chr_fe_set_msgfds(CharDriverState *s, int *fds, int num);
>
> /**
> + * @qemu_chr_fe_set_msgfds_n:
> + *
> + * The multi-client version of @qemu_chr_fe_set_msgfds.
> + *
> + * Returns: -1 if fd passing isn't supported.
> + */
> +int qemu_chr_fe_set_msgfds_n(CharDriverState *s, uint64_t id, int *fds,
> int num);
> +
> +
> +/**
> * @qemu_chr_fe_claim:
> *
> * Claim a backend before using it, should be called before calling
> diff --git a/qapi-schema.json b/qapi-schema.json
> index 5658723..9bb5d7d 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -3327,7 +3327,8 @@
> '*wait' : 'bool',
> '*nodelay' : 'bool',
> '*telnet' : 'bool',
> - '*reconnect' : 'int' },
> + '*reconnect' : 'int' ,
> + '*connections' : 'uint64' },
> 'base': 'ChardevCommon' }
>
> ##
> diff --git a/qemu-char.c b/qemu-char.c
> index 5f82ebb..dfad6d1 100644
> --- a/qemu-char.c
> +++ b/qemu-char.c
> @@ -265,6 +265,35 @@ static int qemu_chr_fe_write_buffer(CharDriverState
> *s, const uint8_t *buf, int
> return res;
> }
>
> +static int qemu_chr_fe_write_buffer_n(CharDriverState *s, uint64_t id,
> + const uint8_t *buf, int len, int
> *offset)
> +{
> + int res = 0;
> + *offset = 0;
> +
> + qemu_mutex_lock(&s->chr_write_lock);
> + while (*offset < len) {
> + retry:
> + res = s->chr_write_n(s, id, buf + *offset, len - *offset);
> + if (res < 0 && errno == EAGAIN) {
> + g_usleep(100);
> + goto retry;
> + }
> +
> + if (res <= 0) {
> + break;
> + }
> +
> + *offset += res;
> + }
> + if (*offset > 0) {
> + qemu_chr_fe_write_log(s, buf, *offset);
> + }
> + qemu_mutex_unlock(&s->chr_write_lock);
> +
> + return res;
> +}
> +
> int qemu_chr_fe_write(CharDriverState *s, const uint8_t *buf, int len)
> {
> int ret;
> @@ -317,6 +346,31 @@ int qemu_chr_fe_write_all(CharDriverState *s, const
> uint8_t *buf, int len)
> return offset;
> }
>
> +int qemu_chr_fe_write_all_n(CharDriverState *s, uint64_t id,
> + const uint8_t *buf, int len)
> +{
> + int offset;
> + int res;
> +
> + if (s->replay && replay_mode == REPLAY_MODE_PLAY) {
> + replay_char_write_event_load(&res, &offset);
> + assert(offset <= len);
> + qemu_chr_fe_write_buffer_n(s, id, buf, offset, &offset);
> + return res;
> + }
> +
> + res = qemu_chr_fe_write_buffer_n(s, id, buf, len, &offset);
> +
> + if (s->replay && replay_mode == REPLAY_MODE_RECORD) {
> + replay_char_write_event_save(res, offset);
> + }
> +
> + if (res < 0) {
> + return res;
> + }
> + return offset;
> +}
> +
> int qemu_chr_fe_read_all(CharDriverState *s, uint8_t *buf, int len)
> {
> int offset = 0, counter = 10;
> @@ -325,7 +379,7 @@ int qemu_chr_fe_read_all(CharDriverState *s, uint8_t
> *buf, int len)
> if (!s->chr_sync_read) {
> return 0;
> }
> -
> +
> if (s->replay && replay_mode == REPLAY_MODE_PLAY) {
> return replay_char_read_all_load(buf);
> }
> @@ -362,6 +416,52 @@ int qemu_chr_fe_read_all(CharDriverState *s, uint8_t
> *buf, int len)
> return offset;
> }
>
> +int qemu_chr_fe_read_all_n(CharDriverState *s, uint64_t id,
> + uint8_t *buf, int len)
> +{
> + int offset = 0, counter = 10;
> + int res;
> +
> + if (!s->chr_sync_read_n) {
> + return 0;
> + }
> +
> + if (s->replay && replay_mode == REPLAY_MODE_PLAY) {
> + return replay_char_read_all_load(buf);
> + }
> +
> + while (offset < len) {
> + retry:
> + res = s->chr_sync_read_n(s, id, buf + offset, len - offset);
> + if (res == -1 && errno == EAGAIN) {
> + g_usleep(100);
> + goto retry;
> + }
> +
> + if (res == 0) {
> + break;
> + }
> +
> + if (res < 0) {
> + if (s->replay && replay_mode == REPLAY_MODE_RECORD) {
> + replay_char_read_all_save_error(res);
> + }
> + return res;
> + }
> +
> + offset += res;
> +
> + if (!counter--) {
> + break;
> + }
> + }
> +
> + if (s->replay && replay_mode == REPLAY_MODE_RECORD) {
> + replay_char_read_all_save_buf(buf, offset);
> + }
> + return offset;
> +}
> +
> int qemu_chr_fe_ioctl(CharDriverState *s, int cmd, void *arg)
> {
> int res;
> @@ -417,11 +517,23 @@ int qemu_chr_fe_get_msgfds(CharDriverState *s, int
> *fds, int len)
> return s->get_msgfds ? s->get_msgfds(s, fds, len) : -1;
> }
>
> +int qemu_chr_fe_get_msgfds_n(CharDriverState *s,
> + uint64_t id, int *fds, int len)
> +{
> + return s->get_msgfds_n ? s->get_msgfds_n(s, id, fds, len) : -1;
> +}
> +
> int qemu_chr_fe_set_msgfds(CharDriverState *s, int *fds, int num)
> {
> return s->set_msgfds ? s->set_msgfds(s, fds, num) : -1;
> }
>
> +int qemu_chr_fe_set_msgfds_n(CharDriverState *s,
> + uint64_t id, int *fds, int num)
> +{
> + return s->set_msgfds_n ? s->set_msgfds_n(s, id, fds, num) : -1;
> +}
> +
> int qemu_chr_add_client(CharDriverState *s, int fd)
> {
> return s->chr_add_client ? s->chr_add_client(s, fd) : -1;
> @@ -951,12 +1063,19 @@ static void io_remove_watch_poll(guint tag)
>
> static void remove_fd_in_watch(CharDriverState *chr)
> {
> - if (chr->fd_in_tag) {
> - io_remove_watch_poll(chr->fd_in_tag);
> - chr->fd_in_tag = 0;
> + if (chr->fd_in_tag[0]) {
> + io_remove_watch_poll(chr->fd_in_tag[0]);
> + chr->fd_in_tag[0] = 0;
> }
> }
>
> +static void remove_fd_in_watch_n(CharDriverState *chr, uint64_t id)
> +{
> + if (chr->fd_in_tag[id]) {
> + io_remove_watch_poll(chr->fd_in_tag[id]);
> + chr->fd_in_tag[id] = 0;
> + }
> +}
>
> static int io_channel_send_full(QIOChannel *ioc,
> const void *buf, size_t len,
> @@ -1063,7 +1182,7 @@ static void
> fd_chr_update_read_handler(CharDriverState *chr)
>
> remove_fd_in_watch(chr);
> if (s->ioc_in) {
> - chr->fd_in_tag = io_add_watch_poll(s->ioc_in,
> + chr->fd_in_tag[0] = io_add_watch_poll(s->ioc_in,
> fd_chr_read_poll,
> fd_chr_read, chr);
> }
> @@ -1410,8 +1529,8 @@ static void pty_chr_state(CharDriverState *chr, int
> connected)
> s->connected = 1;
> s->open_tag = g_idle_add(qemu_chr_be_generic_open_func, chr);
> }
> - if (!chr->fd_in_tag) {
> - chr->fd_in_tag = io_add_watch_poll(s->ioc,
> + if (!chr->fd_in_tag[0]) {
> + chr->fd_in_tag[0] = io_add_watch_poll(s->ioc,
> pty_chr_read_poll,
> pty_chr_read, chr);
> }
> @@ -2558,7 +2677,7 @@ static void
> udp_chr_update_read_handler(CharDriverState *chr)
>
> remove_fd_in_watch(chr);
> if (s->ioc) {
> - chr->fd_in_tag = io_add_watch_poll(s->ioc,
> + chr->fd_in_tag[0] = io_add_watch_poll(s->ioc,
> udp_chr_read_poll,
> udp_chr_read, chr);
> }
> @@ -2605,20 +2724,21 @@ static CharDriverState
> *qemu_chr_open_udp(QIOChannelSocket *sioc,
> /* TCP Net console */
>
> typedef struct {
> - QIOChannel *ioc; /* Client I/O channel */
> - QIOChannelSocket *sioc; /* Client master channel */
> + QIOChannel *ioc[MAX_CLIENTS]; /* Client I/O channels */
> + QIOChannelSocket *sioc[MAX_CLIENTS]; /* Client master channels */
> QIOChannelSocket *listen_ioc;
> guint listen_tag;
> QCryptoTLSCreds *tls_creds;
> - int connected;
> + int connected[MAX_CLIENTS];
> int max_size;
> int do_telnetopt;
> int do_nodelay;
> int is_unix;
> - int *read_msgfds;
> - size_t read_msgfds_num;
> - int *write_msgfds;
> - size_t write_msgfds_num;
> + int *read_msgfds[MAX_CLIENTS];
> + size_t read_msgfds_num[MAX_CLIENTS];
> + int *write_msgfds[MAX_CLIENTS];
> + size_t write_msgfds_num[MAX_CLIENTS];
> + uint64_t connections;
>
> SocketAddress *addr;
> bool is_listen;
> @@ -2634,7 +2754,7 @@ static gboolean socket_reconnect_timeout(gpointer
> opaque);
> static void qemu_chr_socket_restart_timer(CharDriverState *chr)
> {
> TCPCharDriver *s = chr->opaque;
> - assert(s->connected == 0);
> + assert(s->connected[0] == 0);
> s->reconnect_timer = g_timeout_add_seconds(s->reconnect_time,
> socket_reconnect_timeout,
> chr);
> }
> @@ -2660,16 +2780,16 @@ static gboolean tcp_chr_accept(QIOChannel *chan,
> static int tcp_chr_write(CharDriverState *chr, const uint8_t *buf, int
> len)
> {
> TCPCharDriver *s = chr->opaque;
> - if (s->connected) {
> - int ret = io_channel_send_full(s->ioc, buf, len,
> - s->write_msgfds,
> - s->write_msgfds_num);
> + if (s->connected[0]) {
> + int ret = io_channel_send_full(s->ioc[0], buf, len,
> + s->write_msgfds[0],
> + s->write_msgfds_num[0]);
>
> /* free the written msgfds, no matter what */
> - if (s->write_msgfds_num) {
> - g_free(s->write_msgfds);
> - s->write_msgfds = 0;
> - s->write_msgfds_num = 0;
> + if (s->write_msgfds_num[0]) {
> + g_free(s->write_msgfds[0]);
> + s->write_msgfds[0] = 0;
> + s->write_msgfds_num[0] = 0;
> }
>
> return ret;
> @@ -2679,11 +2799,41 @@ static int tcp_chr_write(CharDriverState *chr,
> const uint8_t *buf, int len)
> }
> }
>
> +/* Called with chr_write_lock held. */
> +static int tcp_chr_write_n(CharDriverState *chr, uint64_t id,
> + const uint8_t *buf, int len)
> +{
> + TCPCharDriver *s = chr->opaque;
> + if (s->connected[id]) {
> + int ret = io_channel_send_full(s->ioc[id], buf, len,
> + s->write_msgfds[id],
> + s->write_msgfds_num[id]);
> +
> + /* free the written msgfds, no matter what */
> + if (s->write_msgfds_num[id]) {
> + g_free(s->write_msgfds[id]);
> + s->write_msgfds[id] = 0;
> + s->write_msgfds_num[id] = 0;
> + }
> +
> + return ret;
> + } else {
> + /* XXX: indicate an error ? */
> + return len;
> + }
> +}
> +
> static int tcp_chr_read_poll(void *opaque)
> {
> CharDriverState *chr = opaque;
> TCPCharDriver *s = chr->opaque;
> - if (!s->connected)
> + uint64_t id;
> +
> + for (id = 0; id < s->connections; id++) {
> + if (s->connected[id])
> + break;
> + }
> + if (id == s->connections)
> return 0;
> s->max_size = qemu_chr_be_can_write(chr);
> return s->max_size;
> @@ -2742,54 +2892,107 @@ static void
> tcp_chr_process_IAC_bytes(CharDriverState *chr,
> static int tcp_get_msgfds(CharDriverState *chr, int *fds, int num)
> {
> TCPCharDriver *s = chr->opaque;
> - int to_copy = (s->read_msgfds_num < num) ? s->read_msgfds_num : num;
> + int to_copy = (s->read_msgfds_num[0] < num) ? s->read_msgfds_num[0] :
> num;
>
> assert(num <= TCP_MAX_FDS);
>
> if (to_copy) {
> int i;
>
> - memcpy(fds, s->read_msgfds, to_copy * sizeof(int));
> + memcpy(fds, s->read_msgfds[0], to_copy * sizeof(int));
>
> /* Close unused fds */
> - for (i = to_copy; i < s->read_msgfds_num; i++) {
> - close(s->read_msgfds[i]);
> + for (i = to_copy; i < s->read_msgfds_num[0]; i++) {
> + close(s->read_msgfds[0][i]);
> }
>
> - g_free(s->read_msgfds);
> - s->read_msgfds = 0;
> - s->read_msgfds_num = 0;
> + g_free(s->read_msgfds[0]);
> + s->read_msgfds[0] = 0;
> + s->read_msgfds_num[0] = 0;
> }
>
> return to_copy;
> }
>
> +static int tcp_get_msgfds_n(CharDriverState *chr, uint64_t id,
> + int *fds, int num)
> +{
> + TCPCharDriver *s = chr->opaque;
> + int to_copy = (s->read_msgfds_num[id] < num) ? s->read_msgfds_num[id]
> : num;
> +
> + assert(num <= TCP_MAX_FDS);
> +
> + if (to_copy) {
> + int i;
> +
> + memcpy(fds, s->read_msgfds[id], to_copy * sizeof(int));
> +
> + /* Close unused fds */
> + for (i = to_copy; i < s->read_msgfds_num[id]; i++) {
> + close(s->read_msgfds[id][i]);
> + }
> +
> + g_free(s->read_msgfds[id]);
> + s->read_msgfds[id] = 0;
> + s->read_msgfds_num[id] = 0;
> + }
> +
> + return to_copy;
> +}
> +
> static int tcp_set_msgfds(CharDriverState *chr, int *fds, int num)
> {
> TCPCharDriver *s = chr->opaque;
>
> /* clear old pending fd array */
> - g_free(s->write_msgfds);
> - s->write_msgfds = NULL;
> - s->write_msgfds_num = 0;
> + g_free(s->write_msgfds[0]);
> + s->write_msgfds[0] = NULL;
> + s->write_msgfds_num[0] = 0;
>
> - if (!s->connected ||
> - !qio_channel_has_feature(s->ioc,
> + if (!s->connected[0] ||
> + !qio_channel_has_feature(s->ioc[0],
> QIO_CHANNEL_FEATURE_FD_PASS)) {
> return -1;
> }
>
> if (num) {
> - s->write_msgfds = g_new(int, num);
> - memcpy(s->write_msgfds, fds, num * sizeof(int));
> + s->write_msgfds[0] = g_new(int, num);
> + memcpy(s->write_msgfds[0], fds, num * sizeof(int));
> }
>
> - s->write_msgfds_num = num;
> + s->write_msgfds_num[0] = num;
>
> return 0;
> }
>
> -static ssize_t tcp_chr_recv(CharDriverState *chr, char *buf, size_t len)
> +static int tcp_set_msgfds_n(CharDriverState *chr, uint64_t id,
> + int *fds, int num)
> +{
> + TCPCharDriver *s = chr->opaque;
> +
> + /* clear old pending fd array */
> + g_free(s->write_msgfds[id]);
> + s->write_msgfds[id] = NULL;
> + s->write_msgfds_num[id] = 0;
> +
> + if (!s->connected[id] ||
> + !qio_channel_has_feature(s->ioc[id],
> + QIO_CHANNEL_FEATURE_FD_PASS)) {
> + return -1;
> + }
> +
> + if (num) {
> + s->write_msgfds[id] = g_new(int, num);
> + memcpy(s->write_msgfds[id], fds, num * sizeof(int));
> + }
> +
> + s->write_msgfds_num[id] = num;
> +
> + return 0;
> +}
> +
> +static ssize_t tcp_chr_recv(CharDriverState *chr, uint64_t id,
> + char *buf, size_t len)
> {
> TCPCharDriver *s = chr->opaque;
> struct iovec iov = { .iov_base = buf, .iov_len = len };
> @@ -2798,12 +3001,12 @@ static ssize_t tcp_chr_recv(CharDriverState *chr,
> char *buf, size_t len)
> int *msgfds = NULL;
> size_t msgfds_num = 0;
>
> - if (qio_channel_has_feature(s->ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
> - ret = qio_channel_readv_full(s->ioc, &iov, 1,
> + if (qio_channel_has_feature(s->ioc[id], QIO_CHANNEL_FEATURE_FD_PASS))
> {
> + ret = qio_channel_readv_full(s->ioc[id], &iov, 1,
> &msgfds, &msgfds_num,
> NULL);
> } else {
> - ret = qio_channel_readv_full(s->ioc, &iov, 1,
> + ret = qio_channel_readv_full(s->ioc[id], &iov, 1,
> NULL, NULL,
> NULL);
> }
> @@ -2817,20 +3020,20 @@ static ssize_t tcp_chr_recv(CharDriverState *chr,
> char *buf, size_t len)
>
> if (msgfds_num) {
> /* close and clean read_msgfds */
> - for (i = 0; i < s->read_msgfds_num; i++) {
> - close(s->read_msgfds[i]);
> + for (i = 0; i < s->read_msgfds_num[id]; i++) {
> + close(s->read_msgfds[id][i]);
> }
>
> - if (s->read_msgfds_num) {
> - g_free(s->read_msgfds);
> + if (s->read_msgfds_num[id]) {
> + g_free(s->read_msgfds[id]);
> }
>
> - s->read_msgfds = msgfds;
> - s->read_msgfds_num = msgfds_num;
> + s->read_msgfds[id] = msgfds;
> + s->read_msgfds_num[id] = msgfds_num;
> }
>
> - for (i = 0; i < s->read_msgfds_num; i++) {
> - int fd = s->read_msgfds[i];
> + for (i = 0; i < s->read_msgfds_num[id]; i++) {
> + int fd = s->read_msgfds[id][i];
> if (fd < 0) {
> continue;
> }
> @@ -2849,47 +3052,47 @@ static ssize_t tcp_chr_recv(CharDriverState *chr,
> char *buf, size_t len)
> static GSource *tcp_chr_add_watch(CharDriverState *chr, GIOCondition cond)
> {
> TCPCharDriver *s = chr->opaque;
> - return qio_channel_create_watch(s->ioc, cond);
> + return qio_channel_create_watch(s->ioc[0], cond);
> }
>
> -static void tcp_chr_free_connection(CharDriverState *chr)
> +static void tcp_chr_free_connection(CharDriverState *chr, uint64_t id)
> {
> TCPCharDriver *s = chr->opaque;
> int i;
>
> - if (!s->connected) {
> + if (!s->connected[id]) {
> return;
> }
>
> - if (s->read_msgfds_num) {
> - for (i = 0; i < s->read_msgfds_num; i++) {
> - close(s->read_msgfds[i]);
> + if (s->read_msgfds_num[id]) {
> + for (i = 0; i < s->read_msgfds_num[id]; i++) {
> + close(s->read_msgfds[id][i]);
> }
> - g_free(s->read_msgfds);
> - s->read_msgfds = NULL;
> - s->read_msgfds_num = 0;
> + g_free(s->read_msgfds[id]);
> + s->read_msgfds[id] = NULL;
> + s->read_msgfds_num[id] = 0;
> }
>
> - tcp_set_msgfds(chr, NULL, 0);
> - remove_fd_in_watch(chr);
> - object_unref(OBJECT(s->sioc));
> - s->sioc = NULL;
> - object_unref(OBJECT(s->ioc));
> - s->ioc = NULL;
> + tcp_set_msgfds_n(chr, id, NULL, 0);
> + remove_fd_in_watch_n(chr, id);
> + object_unref(OBJECT(s->sioc[id]));
> + s->sioc[id] = NULL;
> + object_unref(OBJECT(s->ioc[id]));
> + s->ioc[id] = NULL;
> g_free(chr->filename);
> chr->filename = NULL;
> - s->connected = 0;
> + s->connected[id] = 0;
> }
>
> -static void tcp_chr_disconnect(CharDriverState *chr)
> +static void tcp_chr_disconnect_n(CharDriverState *chr, uint64_t id)
> {
> TCPCharDriver *s = chr->opaque;
>
> - if (!s->connected) {
> + if (!s->connected[id]) {
> return;
> }
>
> - tcp_chr_free_connection(chr);
> + tcp_chr_free_connection(chr, id);
>
> if (s->listen_ioc) {
> s->listen_tag = qio_channel_add_watch(
> @@ -2903,23 +3106,34 @@ static void tcp_chr_disconnect(CharDriverState
> *chr)
> }
> }
>
> +static void tcp_chr_disconnect(CharDriverState *chr)
> +{
> + tcp_chr_disconnect_n(chr, 0);
> +}
> +
> static gboolean tcp_chr_read(QIOChannel *chan, GIOCondition cond, void
> *opaque)
> {
> CharDriverState *chr = opaque;
> TCPCharDriver *s = chr->opaque;
> uint8_t buf[READ_BUF_LEN];
> int len, size;
> + uint64_t id;
>
> - if (!s->connected || s->max_size <= 0) {
> + for (id = 0; id < s->connections; id++) {
> + if (s->ioc[id] == chan)
> + break;
> + }
> +
> + if ((id == s->connections) || !s->connected[id] || s->max_size <= 0) {
> return TRUE;
> }
> len = sizeof(buf);
> if (len > s->max_size)
> len = s->max_size;
> - size = tcp_chr_recv(chr, (void *)buf, len);
> + size = tcp_chr_recv(chr, id, (void *)buf, len);
> if (size == 0 || size == -1) {
> /* connection closed */
> - tcp_chr_disconnect(chr);
> + tcp_chr_disconnect_n(chr, id);
> } else if (size > 0) {
> if (s->do_telnetopt)
> tcp_chr_process_IAC_bytes(chr, s, buf, &size);
> @@ -2935,33 +3149,52 @@ static int tcp_chr_sync_read(CharDriverState *chr,
> const uint8_t *buf, int len)
> TCPCharDriver *s = chr->opaque;
> int size;
>
> - if (!s->connected) {
> + if (!s->connected[0]) {
> + return 0;
> + }
> +
> + size = tcp_chr_recv(chr, 0, (void *) buf, len);
> + if (size == 0) {
> + /* connection closed */
> + tcp_chr_disconnect_n(chr, 0);
> + }
> +
> + return size;
> +}
> +
> +static int tcp_chr_sync_read_n(CharDriverState *chr, uint64_t id,
> + const uint8_t *buf, int len)
> +{
> + TCPCharDriver *s = chr->opaque;
> + int size;
> +
> + if (!s->connected[id]) {
> return 0;
> }
>
> - size = tcp_chr_recv(chr, (void *) buf, len);
> + size = tcp_chr_recv(chr, id, (void *) buf, len);
> if (size == 0) {
> /* connection closed */
> - tcp_chr_disconnect(chr);
> + tcp_chr_disconnect_n(chr, id);
> }
>
> return size;
> }
>
> -static void tcp_chr_connect(void *opaque)
> +static void tcp_chr_connect(void *opaque, uint64_t id)
> {
> CharDriverState *chr = opaque;
> TCPCharDriver *s = chr->opaque;
>
> g_free(chr->filename);
> chr->filename = sockaddr_to_str(
> - &s->sioc->localAddr, s->sioc->localAddrLen,
> - &s->sioc->remoteAddr, s->sioc->remoteAddrLen,
> + &s->sioc[id]->localAddr, s->sioc[id]->localAddrLen,
> + &s->sioc[id]->remoteAddr, s->sioc[id]->remoteAddrLen,
> s->is_listen, s->is_telnet);
>
> - s->connected = 1;
> - if (s->ioc) {
> - chr->fd_in_tag = io_add_watch_poll(s->ioc,
> + s->connected[id] = 1;
> + if (s->ioc[id]) {
> + chr->fd_in_tag[id] = io_add_watch_poll(s->ioc[id],
> tcp_chr_read_poll,
> tcp_chr_read, chr);
> }
> @@ -2971,16 +3204,18 @@ static void tcp_chr_connect(void *opaque)
> static void tcp_chr_update_read_handler(CharDriverState *chr)
> {
> TCPCharDriver *s = chr->opaque;
> + uint64_t id;
>
> - if (!s->connected) {
> - return;
> - }
> + for (id = 0; id < s->connections; id++) {
> + if (!s->connected[id])
> + continue;
>
> - remove_fd_in_watch(chr);
> - if (s->ioc) {
> - chr->fd_in_tag = io_add_watch_poll(s->ioc,
> - tcp_chr_read_poll,
> - tcp_chr_read, chr);
> + remove_fd_in_watch_n(chr, id);
> + if (s->ioc[id]) {
> + chr->fd_in_tag[id] = io_add_watch_poll(s->ioc[id],
> + tcp_chr_read_poll,
> + tcp_chr_read, chr);
> + }
> }
> }
>
> @@ -3002,14 +3237,14 @@ static gboolean tcp_chr_telnet_init_io(QIOChannel
> *ioc,
> if (ret == QIO_CHANNEL_ERR_BLOCK) {
> ret = 0;
> } else {
> - tcp_chr_disconnect(init->chr);
> + tcp_chr_disconnect_n(init->chr, 0);
> return FALSE;
> }
> }
> init->buflen -= ret;
>
> if (init->buflen == 0) {
> - tcp_chr_connect(init->chr);
> + tcp_chr_connect(init->chr, 0);
> return FALSE;
> }
>
> @@ -3018,7 +3253,7 @@ static gboolean tcp_chr_telnet_init_io(QIOChannel
> *ioc,
> return TRUE;
> }
>
> -static void tcp_chr_telnet_init(CharDriverState *chr)
> +static void tcp_chr_telnet_init(CharDriverState *chr, uint64_t id)
> {
> TCPCharDriver *s = chr->opaque;
> TCPCharDriverTelnetInit *init =
> @@ -3045,7 +3280,7 @@ static void tcp_chr_telnet_init(CharDriverState *chr)
> #undef IACSET
>
> qio_channel_add_watch(
> - s->ioc, G_IO_OUT,
> + s->ioc[id], G_IO_OUT,
> tcp_chr_telnet_init_io,
> init, NULL);
> }
> @@ -3059,18 +3294,18 @@ static void tcp_chr_tls_handshake(Object *source,
> TCPCharDriver *s = chr->opaque;
>
> if (err) {
> - tcp_chr_disconnect(chr);
> + tcp_chr_disconnect_n(chr, 0);
> } else {
> if (s->do_telnetopt) {
> - tcp_chr_telnet_init(chr);
> + tcp_chr_telnet_init(chr, 0);
> } else {
> - tcp_chr_connect(chr);
> + tcp_chr_connect(chr, 0);
> }
> }
> }
>
>
> -static void tcp_chr_tls_init(CharDriverState *chr)
> +static void tcp_chr_tls_init(CharDriverState *chr, uint64_t id)
> {
> TCPCharDriver *s = chr->opaque;
> QIOChannelTLS *tioc;
> @@ -3078,21 +3313,21 @@ static void tcp_chr_tls_init(CharDriverState *chr)
>
> if (s->is_listen) {
> tioc = qio_channel_tls_new_server(
> - s->ioc, s->tls_creds,
> + s->ioc[id], s->tls_creds,
> NULL, /* XXX Use an ACL */
> &err);
> } else {
> tioc = qio_channel_tls_new_client(
> - s->ioc, s->tls_creds,
> + s->ioc[id], s->tls_creds,
> s->addr->u.inet.data->host,
> &err);
> }
> if (tioc == NULL) {
> error_free(err);
> - tcp_chr_disconnect(chr);
> + tcp_chr_disconnect_n(chr, id);
> }
> - object_unref(OBJECT(s->ioc));
> - s->ioc = QIO_CHANNEL(tioc);
> + object_unref(OBJECT(s->ioc[id]));
> + s->ioc[id] = QIO_CHANNEL(tioc);
>
> qio_channel_tls_handshake(tioc,
> tcp_chr_tls_handshake,
> @@ -3100,36 +3335,52 @@ static void tcp_chr_tls_init(CharDriverState *chr)
> NULL);
> }
>
> +static int find_avail_ioc(TCPCharDriver *s, uint64_t *id)
> +{
> + uint64_t i;
> +
> + for(i = 0; i < MAX_CLIENTS; i++) {
> + if (s->ioc[i] == NULL) {
> + *id = i;
> + return 0;
> + }
> + }
> + return -1;
> +}
>
> static int tcp_chr_new_client(CharDriverState *chr, QIOChannelSocket
> *sioc)
> {
> TCPCharDriver *s = chr->opaque;
> - if (s->ioc != NULL) {
> - return -1;
> - }
> + uint64_t id;
>
> - s->ioc = QIO_CHANNEL(sioc);
> + if(find_avail_ioc(s, &id) < 0)
> + return -1;
> +
> + s->ioc[id] = QIO_CHANNEL(sioc);
> object_ref(OBJECT(sioc));
> - s->sioc = sioc;
> + s->sioc[id] = sioc;
> object_ref(OBJECT(sioc));
> + if(chr->conn_bitmap != NULL)
> + set_bit(id, chr->conn_bitmap);
>
> - qio_channel_set_blocking(s->ioc, false, NULL);
> + qio_channel_set_blocking(s->ioc[id], false, NULL);
>
> if (s->do_nodelay) {
> - qio_channel_set_delay(s->ioc, false);
> + qio_channel_set_delay(s->ioc[id], false);
> }
> +/*
> if (s->listen_tag) {
> g_source_remove(s->listen_tag);
> s->listen_tag = 0;
> }
> -
> +*/
> if (s->tls_creds) {
> - tcp_chr_tls_init(chr);
> + tcp_chr_tls_init(chr, id);
> } else {
> if (s->do_telnetopt) {
> - tcp_chr_telnet_init(chr);
> + tcp_chr_telnet_init(chr, id);
> } else {
> - tcp_chr_connect(chr);
> + tcp_chr_connect(chr, id);
> }
> }
>
> @@ -3178,7 +3429,7 @@ static int tcp_chr_wait_connected(CharDriverState
> *chr, Error **errp)
>
> /* It can't wait on s->connected, since it is set asynchronously
> * in TLS and telnet cases, only wait for an accepted socket */
> - while (!s->ioc) {
> + while (!s->ioc[0]) {
> if (s->is_listen) {
> fprintf(stderr, "QEMU waiting for connection on: %s\n",
> chr->filename);
> @@ -3211,9 +3462,11 @@ int qemu_chr_wait_connected(CharDriverState *chr,
> Error **errp)
> static void tcp_chr_close(CharDriverState *chr)
> {
> TCPCharDriver *s = chr->opaque;
> + uint64_t id;
>
> - tcp_chr_free_connection(chr);
> -
> + for (id = 0; id < s->connections; id++) {
> + tcp_chr_free_connection(chr, id);
> + }
> if (s->reconnect_timer) {
> g_source_remove(s->reconnect_timer);
> s->reconnect_timer = 0;
> @@ -3721,6 +3974,7 @@ static void qemu_chr_parse_socket(QemuOpts *opts,
> ChardevBackend *backend,
> bool is_telnet = qemu_opt_get_bool(opts, "telnet", false);
> bool do_nodelay = !qemu_opt_get_bool(opts, "delay", true);
> int64_t reconnect = qemu_opt_get_number(opts, "reconnect", 0);
> + uint64_t connections = qemu_opt_get_number(opts, "connections", 1);
> const char *path = qemu_opt_get(opts, "path");
> const char *host = qemu_opt_get(opts, "host");
> const char *port = qemu_opt_get(opts, "port");
> @@ -3758,6 +4012,8 @@ static void qemu_chr_parse_socket(QemuOpts *opts,
> ChardevBackend *backend,
> sock->has_reconnect = true;
> sock->reconnect = reconnect;
> sock->tls_creds = g_strdup(tls_creds);
> + sock->has_connections = true;
> + sock->connections = connections;
>
> addr = g_new0(SocketAddress, 1);
> if (path) {
> @@ -4241,6 +4497,9 @@ QemuOptsList qemu_chardev_opts = {
> },{
> .name = "logappend",
> .type = QEMU_OPT_BOOL,
> + },{
> + .name = "connections",
> + .type = QEMU_OPT_NUMBER,
> },
> { /* end of list */ }
> },
> @@ -4413,6 +4672,7 @@ static CharDriverState
> *qmp_chardev_open_socket(const char *id,
> bool is_telnet = sock->has_telnet ? sock->telnet : false;
> bool is_waitconnect = sock->has_wait ? sock->wait : false;
> int64_t reconnect = sock->has_reconnect ? sock->reconnect : 0;
> + uint64_t connections = sock->has_connections ? sock->connections : 1;
> ChardevCommon *common = qapi_ChardevSocket_base(sock);
> QIOChannelSocket *sioc = NULL;
>
> @@ -4426,6 +4686,7 @@ static CharDriverState
> *qmp_chardev_open_socket(const char *id,
> s->is_listen = is_listen;
> s->is_telnet = is_telnet;
> s->do_nodelay = do_nodelay;
> + s->connections = connections;
> if (sock->tls_creds) {
> Object *creds;
> creds = object_resolve_path_component(
> @@ -4461,6 +4722,15 @@ static CharDriverState
> *qmp_chardev_open_socket(const char *id,
>
> s->addr = QAPI_CLONE(SocketAddress, sock->addr);
>
> + if (sock->connections > 1) {
> + chr->conn_bitmap = bitmap_new(sock->connections);
> + chr->max_connections = sock->connections;
> + chr->chr_write_n = tcp_chr_write_n;
> + chr->chr_sync_read_n = tcp_chr_sync_read_n;
> + chr->get_msgfds_n = tcp_get_msgfds_n;
> + chr->set_msgfds_n = tcp_set_msgfds_n;
> + chr->chr_disconnect_n = tcp_chr_disconnect_n;
> + }
> chr->opaque = s;
> chr->chr_wait_connected = tcp_chr_wait_connected;
> chr->chr_write = tcp_chr_write;
> @@ -4478,10 +4748,12 @@ static CharDriverState
> *qmp_chardev_open_socket(const char *id,
> chr->filename = SocketAddress_to_str("disconnected:",
> addr, is_listen, is_telnet);
>
> + chr->conn_id = ANONYMOUS_CLIENT;
> if (is_listen) {
> if (is_telnet) {
> s->do_telnetopt = 1;
> }
> + chr->conn_id = 0;
> } else if (reconnect > 0) {
> s->reconnect_time = reconnect;
> }
> @@ -4502,11 +4774,9 @@ static CharDriverState
> *qmp_chardev_open_socket(const char *id,
> qemu_chr_wait_connected(chr, errp) < 0) {
> goto error;
> }
> - if (!s->ioc) {
> - s->listen_tag = qio_channel_add_watch(
> + s->listen_tag = qio_channel_add_watch(
> QIO_CHANNEL(s->listen_ioc), G_IO_IN,
> tcp_chr_accept, chr, NULL);
> - }
> } else if (qemu_chr_wait_connected(chr, errp) < 0) {
> goto error;
> }
> --
> 2.7.4
>
> --
Marc-André Lureau