From: Michael S. Tsirkin
Subject: Re: [Qemu-devel] [RFC PATCH v2] tests/vhost-user-bridge: add vhost-user bridge application
Date: Tue, 27 Oct 2015 13:40:03 +0200

On Mon, Oct 26, 2015 at 07:19:23PM +0200, Victor Kaplansky wrote:
> The test existing in QEMU for vhost-user feature is good for
> testing the management protocol, but does not allow actual
> traffic. This patch proposes Vhost-User Bridge application, which
> can serve the QEMU community as a comprehensive test by running
> real internet traffic by means of vhost-user interface.
>
> Essentially the Vhost-User Bridge is a very basic vhost-user
> backend for QEMU. It runs as a standalone user-level process.
> For packet processing Vhost-User Bridge uses an additional QEMU
> instance with a backend configured by "-net socket" as a shared
> VLAN. This way another QEMU virtual machine can effectively
> serve as a shared bus by means of UDP communication.
>
> For a simpler setup, the other QEMU instance running the
> SLiRP backend can be the same QEMU instance that runs the
> vhost-user client.
>
> This Vhost-User Bridge implementation is very preliminary. It is
> missing many features. I have been studying vhost-user protocol
> internals, so I've written vhost-user-bridge bit by bit as I
> progressed through the protocol. Most probably its internal
> architecture will change significantly.
>
> To run Vhost-User Bridge application:
>
> 1. Build vhost-user-bridge with a regular procedure. This will
> create a vhost-user-bridge executable under tests directory:
>
> $ configure; make tests/vhost-user-bridge
>
> 2. Ensure the machine has hugepages enabled in kernel with
> command line like:
>
> default_hugepagesz=2M hugepagesz=2M hugepages=2048
>
> 3. Run Vhost-User Bridge with:
>
> $ tests/vhost-user-bridge
>
> The above will run vhost-user server listening for connections
> on UNIX domain socket /tmp/vubr.sock, and will try to connect
> by UDP to VLAN bridge to localhost:5555, while listening on
> localhost:4444
>
> Run qemu with a virtio-net backed by vhost-user:
>
> $ qemu \
> -enable-kvm -m 512 -smp 2 \
> -object memory-backend-file,id=mem,size=512M,mem-path=/dev/hugepages,share=on \
> -numa node,memdev=mem -mem-prealloc \
> -chardev socket,id=char0,path=/tmp/vubr.sock \
> -netdev type=vhost-user,id=mynet1,chardev=char0,vhostforce \
> -device virtio-net-pci,netdev=mynet1 \
> -net none \
> -net socket,vlan=0,udp=localhost:4444,localaddr=localhost:5555 \
> -net user,vlan=0 \
> disk.img
>
> vhost-user-bridge was tested very lightly: it's able to bring up a
> Linux on client VM with the virtio-net driver, and execute transmits
> and receives to the internet. I tested with "wget redhat.com",
> "dig redhat.com".
>
> PS. I've consulted DPDK's code for vhost-user during Vhost-User
> Bridge implementation.
>
> Signed-off-by: Victor Kaplansky <address@hidden>
> ---
> v2:
> Cosmetic changes:
> - Tabs expanded, trailing spaces removed.
> - Removed use of architecture specific definitions starting with _
> - Used header files available in qemu/includes.
> - Rearranged source into single file.
> - checkpatch.pl pacified.
> - Added copyright note.
> - Small spelling corrections.
> - Removed _ prefixes in function names.
> - Makefile incorporated into tests/Makefile.
> - Error handling code changed to use die().
> - Prefix "vubr" replaced by "vhost_user".
> - Structures, enums and function type names renamed to
> comply with CODING_STYLE doc.
> - Preprocessor tricks thrown away.
> - Lines are no longer than 80.
>
> Functional changes:
> - Added memory barriers.
> - Implemented SET_OWNER. (by doing nothing).
>
> TODO:
> - move all declarations before code inside block.
> - main should get parameters from the command line.
Not required for merge.
> - cleanup debug printings.
> - implement all request handlers.
> - test for broken requests and virtqueue.
Not required for merge.
> - implement features defined by Virtio 1.0 spec.
Not required for merge.
> - support mergeable buffers and indirect descriptors.
Not required for merge.
> - implement RESET_DEVICE request.
No need for this one I think.
> - implement clean shutdown.
> - implement non-blocking writes to UDP backend.
Not required for merge.
> - handle correctly blocking if there are no available
> descriptors on RX virtqueue.
> - implement polling strategy.
Not required for merge.
> ---
> tests/vhost-user-bridge.c | 1138 +++++++++++++++++++++++++++++++++++++++++++++
> tests/Makefile | 1 +
> 2 files changed, 1139 insertions(+)
> create mode 100644 tests/vhost-user-bridge.c
>
> diff --git a/tests/vhost-user-bridge.c b/tests/vhost-user-bridge.c
> new file mode 100644
> index 0000000..89f1e07
> --- /dev/null
> +++ b/tests/vhost-user-bridge.c
> @@ -0,0 +1,1138 @@
> +/*
> + * Vhost-User Bridge
> + *
> + * Copyright (c) 2015 Red Hat, Inc.
> + *
> + * Authors:
> + * Victor Kaplansky <address@hidden>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later. See the COPYING file in the top-level directory.
> + */
> +
> +#include <stddef.h>
> +#include <assert.h>
> +#include <stdio.h>
> +#include <stdlib.h>
> +#include <stdint.h>
> +#include <inttypes.h>
> +#include <string.h>
> +#include <unistd.h>
> +#include <errno.h>
> +#include <sys/types.h>
> +#include <sys/socket.h>
> +#include <sys/un.h>
> +#include <sys/unistd.h>
> +#include <sys/mman.h>
> +#include <sys/eventfd.h>
> +#include <arpa/inet.h>
> +
> +#include <linux/vhost.h>
> +
> +#include "qemu/atomic.h"
> +#include "standard-headers/linux/virtio_net.h"
> +#include "standard-headers/linux/virtio_ring.h"
> +
> +#define DEBUG_VHOST_USER_BRIDGE
> +
> +typedef void (*CallbackFunc)(int sock, void *ctx);
> +
> +typedef struct Event {
> + void *ctx;
> + CallbackFunc callback;
> +} Event;
> +
> +typedef struct Dispatcher {
> + int max_sock;
> + fd_set fdset;
> + Event events[FD_SETSIZE];
> +} Dispatcher;
> +
> +static void
> +vhost_user_die(const char *s)
> +{
> + perror(s);
> + exit(1);
> +}
> +
> +static int
> +dispatcher_init(Dispatcher *dispr)
> +{
> + FD_ZERO(&dispr->fdset);
> + dispr->max_sock = -1;
> + return 0;
> +}
> +
> +static int
> +dispatcher_add(Dispatcher *dispr, int sock, void *ctx, CallbackFunc cb)
> +{
> + if (sock >= FD_SETSIZE) {
> + fprintf(stderr,
> + "Error: Failed to add new event. sock %d should be less than %d\n",
> + sock, FD_SETSIZE);
> + return -1;
> + }
> +
> + dispr->events[sock].ctx = ctx;
> + dispr->events[sock].callback = cb;
> +
> + FD_SET(sock, &dispr->fdset);
> + if (sock > dispr->max_sock) {
> + dispr->max_sock = sock;
> + }
> + printf("Added sock %d for watching. max_sock: %d\n",
> + sock, dispr->max_sock);
> + return 0;
> +}
> +
> +#if 0
> +/* dispatcher_remove() is not currently in use but may be useful
> + * in the future. */
> +static int
> +dispatcher_remove(Dispatcher *dispr, int sock)
> +{
> + if (sock >= FD_SETSIZE) {
> + fprintf(stderr,
> + "Error: Failed to remove event. sock %d should be less than %d\n",
> + sock, FD_SETSIZE);
> + return -1;
> + }
> +
> + FD_CLR(sock, &dispr->fdset);
> + return 0;
> +}
> +#endif
> +
> +/* timeout in us */
> +static int
> +dispatcher_wait(Dispatcher *dispr, uint32_t timeout)
> +{
> + struct timeval tv;
> + tv.tv_sec = timeout / 1000000;
> + tv.tv_usec = timeout % 1000000;
> +
> + fd_set fdset = dispr->fdset;
> +
> + /* wait until some of sockets become readable. */
> + int rc = select(dispr->max_sock + 1, &fdset, 0, 0, &tv);
> +
> + if (rc == -1) {
> + vhost_user_die("select");
> + }
> +
> + /* Timeout */
> + if (rc == 0) {
> + return 0;
> + }
> +
> + /* Now call callback for every ready socket. */
> +
> + int sock;
> + for (sock = 0; sock < dispr->max_sock + 1; sock++)
> + if (FD_ISSET(sock, &fdset)) {
> + Event *e = &dispr->events[sock];
> + e->callback(sock, e->ctx);
> + }
> +
> + return 0;
> +}
> +
> +typedef struct VirtQueue {
> + int call_fd;
> + int kick_fd;
> + uint32_t size;
> + uint16_t last_avail_index;
> + uint16_t last_used_index;
> + struct vring_desc *desc;
> + struct vring_avail *avail;
> + struct vring_used *used;
> +} VirtQueue;
> +
> +/* Based on qemu/hw/virtio/vhost-user.c */
> +
> +#define VHOST_MEMORY_MAX_NREGIONS 8
> +#define VHOST_USER_F_PROTOCOL_FEATURES 30
> +
> +enum VhostUserProtocolFeature {
> + VHOST_USER_PROTOCOL_F_MQ = 0,
> + VHOST_USER_PROTOCOL_F_LOG_SHMFD = 1,
> + VHOST_USER_PROTOCOL_F_RARP = 2,
> +
> + VHOST_USER_PROTOCOL_F_MAX
> +};
> +
> +#define VHOST_USER_PROTOCOL_FEATURE_MASK ((1 << VHOST_USER_PROTOCOL_F_MAX) - 1)
> +
> +typedef enum VhostUserRequest {
> + VHOST_USER_NONE = 0,
> + VHOST_USER_GET_FEATURES = 1,
> + VHOST_USER_SET_FEATURES = 2,
> + VHOST_USER_SET_OWNER = 3,
> + VHOST_USER_RESET_DEVICE = 4,
> + VHOST_USER_SET_MEM_TABLE = 5,
> + VHOST_USER_SET_LOG_BASE = 6,
> + VHOST_USER_SET_LOG_FD = 7,
> + VHOST_USER_SET_VRING_NUM = 8,
> + VHOST_USER_SET_VRING_ADDR = 9,
> + VHOST_USER_SET_VRING_BASE = 10,
> + VHOST_USER_GET_VRING_BASE = 11,
> + VHOST_USER_SET_VRING_KICK = 12,
> + VHOST_USER_SET_VRING_CALL = 13,
> + VHOST_USER_SET_VRING_ERR = 14,
> + VHOST_USER_GET_PROTOCOL_FEATURES = 15,
> + VHOST_USER_SET_PROTOCOL_FEATURES = 16,
> + VHOST_USER_GET_QUEUE_NUM = 17,
> + VHOST_USER_SET_VRING_ENABLE = 18,
> + VHOST_USER_SEND_RARP = 19,
> + VHOST_USER_MAX
> +} VhostUserRequest;
Maybe we need a common copy under tests/
> +
> +typedef struct VhostUserMemoryRegion {
> + uint64_t guest_phys_addr;
> + uint64_t memory_size;
> + uint64_t userspace_addr;
> + uint64_t mmap_offset;
> +} VhostUserMemoryRegion;
> +
> +typedef struct VhostUserMemory {
> + uint32_t nregions;
> + uint32_t padding;
> + VhostUserMemoryRegion regions[VHOST_MEMORY_MAX_NREGIONS];
> +} VhostUserMemory;
> +
> +typedef struct VhostUserMsg {
> + VhostUserRequest request;
> +
> +#define VHOST_USER_VERSION_MASK (0x3)
> +#define VHOST_USER_REPLY_MASK (0x1<<2)
> + uint32_t flags;
> + uint32_t size; /* the following payload size */
> + union {
> +#define VHOST_USER_VRING_IDX_MASK (0xff)
> +#define VHOST_USER_VRING_NOFD_MASK (0x1<<8)
> + uint64_t u64;
> + struct vhost_vring_state state;
> + struct vhost_vring_addr addr;
> + VhostUserMemory memory;
> + } payload;
> + int fds[VHOST_MEMORY_MAX_NREGIONS];
> + int fd_num;
> +} QEMU_PACKED VhostUserMsg;
> +
> +#define VHOST_USER_HDR_SIZE offsetof(VhostUserMsg, payload.u64)
> +
> +/* The version of the protocol we support */
> +#define VHOST_USER_VERSION (0x1)
> +
> +#define MAX_NR_VIRTQUEUE (8)
> +
> +typedef struct VhostDevRegion {
> + /* Guest physical address. */
> + uint64_t gpa;
> + /* Memory region size. */
> + uint64_t size;
> + /* QEMU virtual address (userspace). */
> + uint64_t qva;
> + /* Starting offset in our mmaped space. */
> + uint64_t mmap_offset;
> + /* Start address of mmaped space. */
> + uint64_t mmap_addr;
> +} VhostDevRegion;
> +
> +typedef struct VhostDev {
> + int sock;
> + Dispatcher dispatcher;
> + uint32_t nregions;
> + VhostDevRegion regions[VHOST_MEMORY_MAX_NREGIONS];
> + VirtQueue virtqueue[MAX_NR_VIRTQUEUE];
> + int backend_udp_sock;
> + struct sockaddr_in backend_udp_dest;
> +} VhostDev;
> +
> +static const char *vhost_user_request_str[] = {
> + [VHOST_USER_NONE] = "VHOST_USER_NONE",
> + [VHOST_USER_GET_FEATURES] = "VHOST_USER_GET_FEATURES",
> + [VHOST_USER_SET_FEATURES] = "VHOST_USER_SET_FEATURES",
> + [VHOST_USER_SET_OWNER] = "VHOST_USER_SET_OWNER",
> + [VHOST_USER_RESET_DEVICE] = "VHOST_USER_RESET_DEVICE",
> + [VHOST_USER_SET_MEM_TABLE] = "VHOST_USER_SET_MEM_TABLE",
> + [VHOST_USER_SET_LOG_BASE] = "VHOST_USER_SET_LOG_BASE",
> + [VHOST_USER_SET_LOG_FD] = "VHOST_USER_SET_LOG_FD",
> + [VHOST_USER_SET_VRING_NUM] = "VHOST_USER_SET_VRING_NUM",
> + [VHOST_USER_SET_VRING_ADDR] = "VHOST_USER_SET_VRING_ADDR",
> + [VHOST_USER_SET_VRING_BASE] = "VHOST_USER_SET_VRING_BASE",
> + [VHOST_USER_GET_VRING_BASE] = "VHOST_USER_GET_VRING_BASE",
> + [VHOST_USER_SET_VRING_KICK] = "VHOST_USER_SET_VRING_KICK",
> + [VHOST_USER_SET_VRING_CALL] = "VHOST_USER_SET_VRING_CALL",
> + [VHOST_USER_SET_VRING_ERR] = "VHOST_USER_SET_VRING_ERR",
> + [VHOST_USER_GET_PROTOCOL_FEATURES] = "VHOST_USER_GET_PROTOCOL_FEATURES",
> + [VHOST_USER_SET_PROTOCOL_FEATURES] = "VHOST_USER_SET_PROTOCOL_FEATURES",
> + [VHOST_USER_GET_QUEUE_NUM] = "VHOST_USER_GET_QUEUE_NUM",
> + [VHOST_USER_SET_VRING_ENABLE] = "VHOST_USER_SET_VRING_ENABLE",
> + [VHOST_USER_SEND_RARP] = "VHOST_USER_SEND_RARP",
> + [VHOST_USER_MAX] = "VHOST_USER_MAX",
> +};
> +
> +static void
> +print_buffer(uint8_t *buf, size_t len)
> +{
> + int i;
> + printf("Raw buffer:\n");
> + for (i = 0; i < len; i++) {
> + if (i % 16 == 0) {
> + printf("\n");
> + }
> + if (i % 4 == 0) {
> + printf(" ");
> + }
> + printf("%02x ", buf[i]);
> + }
> + printf("\n............................................................\n");
> +}
> +
> +/* Translate guest physical address to our virtual address. */
> +static uint64_t
> +gpa_to_va(VhostDev *dev, uint64_t guest_addr)
> +{
> + int i;
> +
> + /* Find matching memory region. */
> + for (i = 0; i < dev->nregions; i++) {
> + VhostDevRegion *r = &dev->regions[i];
> +
> + if ((guest_addr >= r->gpa) && (guest_addr < (r->gpa + r->size))) {
> + return guest_addr - r->gpa + r->mmap_addr + r->mmap_offset;
> + }
> + }
> +
> + assert(!"address not found in regions");
> + return 0;
> +}
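
For readers following the translation above, here is a minimal standalone sketch of the same lookup. It is not part of the patch: DemoRegion and demo_gpa_to_va() are hypothetical names, the field names only mirror VhostDevRegion, and returning 0 for an unmapped address (instead of asserting) is an assumption made for illustration. The range test is written as a subtraction so that gpa + size cannot wrap around.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, reduced region descriptor; field names mirror the patch. */
typedef struct DemoRegion {
    uint64_t gpa;         /* guest physical start address */
    uint64_t size;        /* region length in bytes */
    uint64_t mmap_addr;   /* where the backing file was mapped locally */
    uint64_t mmap_offset; /* offset of the region inside that mapping */
} DemoRegion;

/* Returns the local address, or 0 when no region contains guest_addr. */
static uint64_t demo_gpa_to_va(const DemoRegion *regions, size_t n,
                               uint64_t guest_addr)
{
    for (size_t i = 0; i < n; i++) {
        const DemoRegion *r = &regions[i];

        /* Subtraction form: avoids overflow in r->gpa + r->size. */
        if (guest_addr >= r->gpa && guest_addr - r->gpa < r->size) {
            return guest_addr - r->gpa + r->mmap_addr + r->mmap_offset;
        }
    }
    return 0;
}
```

A caller would treat a 0 result as "address not mapped" and drop the request rather than abort the whole process.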
> +
> +/* Translate qemu virtual address to our virtual address. */
> +static uint64_t
> +qva_to_va(VhostDev *dev, uint64_t qemu_addr)
> +{
> + int i;
> +
> + /* Find matching memory region. */
> + for (i = 0; i < dev->nregions; i++) {
> + VhostDevRegion *r = &dev->regions[i];
> +
> + if ((qemu_addr >= r->qva) && (qemu_addr < (r->qva + r->size))) {
> + return qemu_addr - r->qva + r->mmap_addr + r->mmap_offset;
> + }
> + }
> +
> + assert(!"address not found in regions");
> + return 0;
> +}
> +
> +static void
> +vhost_user_message_read(int conn_fd, VhostUserMsg *vmsg)
> +{
> + char control[CMSG_SPACE(VHOST_MEMORY_MAX_NREGIONS * sizeof(int))] = { };
> + struct iovec iov = {
> + .iov_base = (char *)vmsg,
> + .iov_len = VHOST_USER_HDR_SIZE,
> + };
> + struct msghdr msg = {
> + .msg_iov = &iov,
> + .msg_iovlen = 1,
> + .msg_control = control,
> + .msg_controllen = sizeof(control),
> + };
> + size_t fd_size;
> + struct cmsghdr *cmsg;
> + int rc;
> +
> + rc = recvmsg(conn_fd, &msg, 0);
> +
> + if (rc <= 0) {
> + vhost_user_die("recvmsg");
> + }
> +
> + vmsg->fd_num = 0;
> + for (cmsg = CMSG_FIRSTHDR(&msg);
> + cmsg != NULL;
> + cmsg = CMSG_NXTHDR(&msg, cmsg))
> + {
> + if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
> + fd_size = cmsg->cmsg_len - CMSG_LEN(0);
> + vmsg->fd_num = fd_size / sizeof(int);
> + memcpy(vmsg->fds, CMSG_DATA(cmsg), fd_size);
> + break;
> + }
> + }
> +
> + if (vmsg->size > sizeof(vmsg->payload)) {
> + fprintf(stderr,
> + "Error: too big message request: %d, size: vmsg->size: %u, "
> + "while sizeof(vmsg->payload) = %lu\n",
> + vmsg->request, vmsg->size, sizeof(vmsg->payload));
> + exit(1);
> + }
> +
> + if (vmsg->size) {
> + rc = read(conn_fd, &vmsg->payload, vmsg->size);
> + if (rc <= 0) {
> + vhost_user_die("recvmsg");
> + }
> +
> + assert(rc == vmsg->size);
> + }
> +}
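
The payload read above asserts that a single read() returns the whole payload. On a stream socket a short read is possible, so one option is a small loop. This is only a sketch under that assumption: demo_read_full() is a hypothetical helper that does not exist in the patch, and it reports EOF and errors with -1 rather than calling the patch's die function.

```c
#include <errno.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical helper: read exactly len bytes from fd.
 * Returns 0 on success, -1 on EOF or on a read error. */
static int demo_read_full(int fd, void *buf, size_t len)
{
    char *p = buf;

    while (len > 0) {
        ssize_t rc = read(fd, p, len);

        if (rc < 0 && errno == EINTR) {
            continue;           /* interrupted by a signal: retry */
        }
        if (rc <= 0) {
            return -1;          /* EOF before len bytes, or hard error */
        }
        p += rc;
        len -= (size_t)rc;
    }
    return 0;
}
```

With such a helper the payload step would become a single call followed by an error check, and the assert on the byte count would no longer be needed.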
> +
> +static void
> +vhost_user_message_write(int conn_fd, VhostUserMsg *vmsg)
> +{
> + int rc;
> + do {
> + rc = write(conn_fd, vmsg, VHOST_USER_HDR_SIZE + vmsg->size);
> + } while (rc < 0 && errno == EINTR);
> +
> + if (rc < 0) {
> + vhost_user_die("write");
> + }
> +}
> +
> +static void
> +vhost_user_backend_udp_sendbuf(VhostDev *dev,
> + uint8_t *buf,
> + size_t len)
> +{
> + int slen = sizeof(struct sockaddr_in);
> +
> + if (sendto(dev->backend_udp_sock, buf, len, 0,
> + (struct sockaddr *) &dev->backend_udp_dest, slen) == -1) {
> + vhost_user_die("sendto()");
> + }
> +}
> +
> +static int
> +vhost_user_backend_udp_recvbuf(VhostDev *dev,
> + uint8_t *buf,
> + size_t buflen)
> +{
> + int slen = sizeof(struct sockaddr_in);
> + int rc;
> +
> + rc = recvfrom(dev->backend_udp_sock, buf, buflen, 0,
> + (struct sockaddr *) &dev->backend_udp_dest,
> + (socklen_t *)&slen);
> + if (rc == -1) {
> + vhost_user_die("recvfrom()");
> + }
> +
> + return rc;
> +}
> +
> +static void
> +vubr_consume_raw_packet(VhostDev *dev, uint8_t *buf, uint32_t len)
> +{
> + int hdrlen = sizeof(struct virtio_net_hdr_v1);
> +
> +#ifdef DEBUG_VHOST_USER_BRIDGE
> + print_buffer(buf, len);
> +#endif
> + vhost_user_backend_udp_sendbuf(dev, buf + hdrlen, len - hdrlen);
> +}
> +
> +/* Kick the guest if necessary. */
> +static void
> +virtqueue_kick(VirtQueue *vq)
> +{
> + if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) {
> + printf("Kicking the guest...\n");
> + eventfd_write(vq->call_fd, 1);
> + }
> +}
> +
> +static void
> +vubr_post_buffer(VhostDev *dev,
> + VirtQueue *vq,
> + uint8_t *buf,
> + int32_t len)
> +{
> + struct vring_desc *desc = vq->desc;
> + struct vring_avail *avail = vq->avail;
> + struct vring_used *used = vq->used;
> +
> + unsigned int size = vq->size;
> +
> + assert(vq->last_avail_index != avail->idx);
Why? How do you know there's anything there?
> + /* Prevent accessing descriptors, buffers, avail->ring and used before
> + * avail->idx */
smp_rmb then? Can be fixed later ...
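
A minimal sketch of what the two comments above point at, assuming the caller can drop or defer the packet when the ring is empty. It uses C11 atomics as a stand-in for the barriers in qemu/atomic.h, and struct demo_avail and demo_pop_avail() are hypothetical names, not types or functions from the patch.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, cut-down stand-in for the avail ring. */
struct demo_avail {
    _Atomic uint16_t idx;   /* written by the guest driver */
    uint16_t ring[8];       /* descriptor heads */
};

/* Returns true and stores the descriptor head if a buffer is available;
 * returns false (instead of asserting) when the ring is empty. */
static bool demo_pop_avail(struct demo_avail *avail, uint16_t *last_avail,
                           unsigned size, uint16_t *head)
{
    uint16_t idx = atomic_load_explicit(&avail->idx, memory_order_relaxed);

    if (idx == *last_avail) {
        return false;       /* nothing posted by the guest yet */
    }

    /* Read barrier: ring[] must not be read before idx was read. */
    atomic_thread_fence(memory_order_acquire);

    *head = avail->ring[*last_avail % size];
    (*last_avail)++;
    return true;
}
```

Only loads need ordering on this path, which is why a read barrier is enough here; the later store to used->idx is a separate ordering question.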
> + smp_mb();
> +
> + uint16_t a_index = vq->last_avail_index % size;
> + uint16_t u_index = vq->last_used_index % size;
> + uint16_t d_index = avail->ring[a_index];
> +
> + int i = d_index;
> +
> +
> +#ifdef DEBUG_VHOST_USER_BRIDGE
> + printf("Post packet to guest on vq:\n");
> + printf(" size = %d\n", vq->size);
> + printf(" last_avail_index = %d\n", vq->last_avail_index);
> + printf(" last_used_index = %d\n", vq->last_used_index);
> + printf(" a_index = %d\n", a_index);
> + printf(" u_index = %d\n", u_index);
> + printf(" d_index = %d\n", d_index);
> + printf(" desc[%d].addr = 0x%016"PRIx64"\n", i, desc[i].addr);
> + printf(" desc[%d].len = %d\n", i, desc[i].len);
> + printf(" desc[%d].flags = %d\n", i, desc[i].flags);
> + printf(" avail->idx = %d\n", avail->idx);
> + printf(" used->idx = %d\n", used->idx);
> +#endif
> +
> + if (!(desc[i].flags & VRING_DESC_F_WRITE)) {
> + /* FIXME: we should find writable descriptor. */
> + fprintf(stderr, "Error: descriptor is not writable. Exiting.\n");
> + exit(1);
> + }
> +
> + void *chunk_start = (void *)gpa_to_va(dev, desc[i].addr);
> + uint32_t chunk_len = desc[i].len;
> +
> + if (len <= chunk_len) {
> + memcpy(chunk_start, buf, len);
> + } else {
> + fprintf(stderr,
> + "Received too long packet from the backend. Dropping...\n");
> + return;
> + }
> +
> + /* Add descriptor to the used ring. */
> + used->ring[u_index].id = d_index;
> + used->ring[u_index].len = len;
> +
> + vq->last_avail_index++;
> + vq->last_used_index++;
> +
> + /* Prevent accessing avail, descriptors, buffers and used->ring after
> + * the store to used->idx */
> + smp_mb();
> + used->idx = vq->last_used_index;
> +
> + /* Kick the guest if necessary. */
> + virtqueue_kick(vq);
> +}
> +
> +static int
> +vubr_process_desc(VhostDev *dev, VirtQueue *vq)
> +{
> + struct vring_desc *desc = vq->desc;
> + struct vring_avail *avail = vq->avail;
> + struct vring_used *used = vq->used;
> +
> + unsigned int size = vq->size;
> +
> + uint16_t a_index = vq->last_avail_index % size;
> + uint16_t u_index = vq->last_used_index % size;
> + uint16_t d_index = avail->ring[a_index];
> +
> + uint32_t i, len = 0;
> + size_t buf_size = 4096;
> + uint8_t buf[4096];
> +
> +#ifdef DEBUG_VHOST_USER_BRIDGE
> + printf("Chunks: ");
> +#endif
> +
> + i = d_index;
> + do {
> + void *chunk_start = (void *)gpa_to_va(dev, desc[i].addr);
> + uint32_t chunk_len = desc[i].len;
> +
> + if (len + chunk_len < buf_size) {
> + memcpy(buf + len, chunk_start, chunk_len);
> +#ifdef DEBUG_VHOST_USER_BRIDGE
> + printf("%d ", chunk_len);
> +#endif
Wrap these in a macro so you don't need ifdefs in code.
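
Something along these lines, as a sketch only. DEMO_DEBUG, DPRINT and demo_copy_chunks() are hypothetical names chosen for illustration; the patch would key the macro off its own debug define.

```c
#include <stddef.h>
#include <stdio.h>

/* Hypothetical switch: build with -DDEMO_DEBUG=1 to enable the output. */
#ifndef DEMO_DEBUG
#define DEMO_DEBUG 0
#endif

/* The arguments are always compiled (so format strings stay checked),
 * but the call is optimized away when DEMO_DEBUG is 0. */
#define DPRINT(...) \
    do { \
        if (DEMO_DEBUG) { \
            printf(__VA_ARGS__); \
        } \
    } while (0)

/* Example caller: no #ifdef needed at the call sites. */
static unsigned demo_copy_chunks(const unsigned *chunk_len, size_t n)
{
    unsigned total = 0;

    DPRINT("Chunks: ");
    for (size_t i = 0; i < n; i++) {
        DPRINT("%u ", chunk_len[i]);
        total += chunk_len[i];
    }
    DPRINT("\n");
    return total;
}
```

Using if (DEMO_DEBUG) rather than an empty macro body keeps the debug statements from bit-rotting, since the compiler still sees them in non-debug builds.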
> + } else {
> + fprintf(stderr, "Error: too long packet. Dropping...\n");
> + break;
> + }
> +
> + len += chunk_len;
> +
> + if (!(desc[i].flags & VRING_DESC_F_NEXT)) {
> + break;
> + }
> +
> + i = desc[i].next;
> + } while (1);
> +
> + if (!len) {
> + return -1;
> + }
> +
> + /* Add descriptor to the used ring. */
> + used->ring[u_index].id = d_index;
> + used->ring[u_index].len = len;
> +
> +#ifdef DEBUG_VHOST_USER_BRIDGE
> + printf("\n");
> +#endif
> +
> + vubr_consume_raw_packet(dev, buf, len);
> +
> + return 0;
> +}
> +
> +static void
> +vubr_process_avail(VhostDev *dev, VirtQueue *vq)
> +{
> + struct vring_avail *avail = vq->avail;
> + struct vring_used *used = vq->used;
> +
> + while (vq->last_avail_index != avail->idx) {
> + /* Prevent accessing avail->ring, descriptors and buffers before
> + * avail->idx */
> + smp_mb();
> +
> + vubr_process_desc(dev, vq);
> + vq->last_avail_index++;
> + vq->last_used_index++;
> + }
> +
> + /* Prevent accessing avail->ring, descriptors, buffers and used->ring,
> + * after used->idx */
> + smp_mb();
> +
> + used->idx = vq->last_used_index;
> +}
> +
> +static void
> +vubr_backend_recv_cb(int sock, void *ctx)
> +{
> + VhostDev *dev = (VhostDev *) ctx;
> + VirtQueue *rx_vq = &dev->virtqueue[0];
> + uint8_t buf[4096];
> + struct virtio_net_hdr_v1 *hdr = (struct virtio_net_hdr_v1 *)buf;
> + int hdrlen = sizeof(struct virtio_net_hdr_v1);
> + int buflen = sizeof(buf);
> + int len;
> +
> +#ifdef DEBUG_VHOST_USER_BRIDGE
> + printf("\n\n *** IN UDP RECEIVE CALLBACK ***\n\n");
> +#endif
> +
> + *hdr = (struct virtio_net_hdr_v1) { };
> + hdr->num_buffers = 1;
> +
> + len = vhost_user_backend_udp_recvbuf(dev, buf + hdrlen, buflen - hdrlen);
> + vubr_post_buffer(dev, rx_vq, buf, len + hdrlen);
> +}
> +
> +static void
> +vubr_kick_cb(int sock, void *ctx)
> +{
> + VhostDev *dev = (VhostDev *) ctx;
> + eventfd_t kick_data;
> + ssize_t rc;
> +
> + rc = eventfd_read(sock, &kick_data);
> +
> + if (rc == -1) {
> + vhost_user_die("eventfd_read()");
> + } else {
> + printf("Got kick_data: %016"PRIx64"\n", kick_data);
> + vubr_process_avail(dev, &dev->virtqueue[1]);
> + }
> +}
> +
> +static int
> +vhost_user_none_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + printf("Function %s() not implemented yet.\n", __func__);
> + return 0;
> +}
> +
> +static int
> +vhost_user_get_features_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
Please prefix everything with vubr_ consistently.
Same applies to types etc.
> +{
> + vmsg->payload.u64 =
> + ((1ULL << VIRTIO_NET_F_MRG_RXBUF) |
> + (1ULL << VIRTIO_NET_F_CTRL_VQ) |
> + (1ULL << VIRTIO_NET_F_CTRL_RX) |
> + (1ULL << VHOST_F_LOG_ALL));
> + vmsg->size = sizeof(vmsg->payload.u64);
> +
> + printf("Sending back to guest u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
> +
> + /* reply */
> + return 1;
> +}
> +
> +static int
> +vhost_user_set_features_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
> + return 0;
> +}
> +
> +static int
> +vhost_user_set_owner_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + return 0;
> +}
> +
> +static int
> +vhost_user_reset_device_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + printf("Function %s() not implemented yet.\n", __func__);
> + return 0;
> +}
> +
> +static int
> +vhost_user_set_mem_table_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + printf("Nregions: %d\n", vmsg->payload.memory.nregions);
> +
> + VhostUserMemory *memory = &vmsg->payload.memory;
> + dev->nregions = memory->nregions;
> + int i;
> + for (i = 0; i < dev->nregions; i++) {
> + VhostUserMemoryRegion *msg_region = &memory->regions[i];
> + VhostDevRegion *dev_region = &dev->regions[i];
> +
> + printf("Region %d\n", i);
> + printf(" guest_phys_addr: 0x%016"PRIx64"\n",
> + msg_region->guest_phys_addr);
> + printf(" memory_size: 0x%016"PRIx64"\n",
> + msg_region->memory_size);
> + printf(" userspace_addr 0x%016"PRIx64"\n",
> + msg_region->userspace_addr);
> + printf(" mmap_offset 0x%016"PRIx64"\n",
> + msg_region->mmap_offset);
> +
> + dev_region->gpa = msg_region->guest_phys_addr;
> + dev_region->size = msg_region->memory_size;
> + dev_region->qva = msg_region->userspace_addr;
> + dev_region->mmap_offset = msg_region->mmap_offset;
> +
> + void *mmap_addr;
> +
> + /* We don't use offset argument of mmap() since the
> + * mapped address has to be page aligned, and we use huge
> + * pages. */
> + mmap_addr = mmap(0, dev_region->size + dev_region->mmap_offset,
> + PROT_READ | PROT_WRITE, MAP_SHARED,
> + vmsg->fds[i], 0);
> +
> + if (mmap_addr == MAP_FAILED) {
> + vhost_user_die("mmap");
> + }
> +
> + dev_region->mmap_addr = (uint64_t) mmap_addr;
> + printf(" mmap_addr: 0x%016"PRIx64"\n", dev_region->mmap_addr);
> + }
> +
> + return 0;
> +}
> +
> +static int
> +vhost_user_set_log_base_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + printf("Function %s() not implemented yet.\n", __func__);
> + return 0;
> +}
> +
> +static int
> +vhost_user_set_log_fd_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + printf("Function %s() not implemented yet.\n", __func__);
> + return 0;
> +}
> +
> +static int
> +vhost_user_set_vring_num_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + unsigned int index = vmsg->payload.state.index;
> + unsigned int num = vmsg->payload.state.num;
> +
> + printf("State.index: %d\n", index);
> + printf("State.num: %d\n", num);
> + dev->virtqueue[index].size = num;
> + return 0;
> +}
> +
> +static int
> +vhost_user_set_vring_addr_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + struct vhost_vring_addr *vra = &vmsg->payload.addr;
> + printf("vhost_vring_addr:\n");
> + printf(" index: %d\n", vra->index);
> + printf(" flags: %d\n", vra->flags);
> + printf(" desc_user_addr: 0x%016llx\n", vra->desc_user_addr);
> + printf(" used_user_addr: 0x%016llx\n", vra->used_user_addr);
> + printf(" avail_user_addr: 0x%016llx\n", vra->avail_user_addr);
> + printf(" log_guest_addr: 0x%016llx\n", vra->log_guest_addr);
> +
> + unsigned int index = vra->index;
> + VirtQueue *vq = &dev->virtqueue[index];
> +
> + vq->desc = (struct vring_desc *)qva_to_va(dev, vra->desc_user_addr);
> + vq->used = (struct vring_used *)qva_to_va(dev, vra->used_user_addr);
> + vq->avail = (struct vring_avail *)qva_to_va(dev, vra->avail_user_addr);
> +
> + printf("Setting virtq addresses:\n");
> + printf(" vring_desc at %p\n", vq->desc);
> + printf(" vring_used at %p\n", vq->used);
> + printf(" vring_avail at %p\n", vq->avail);
> +
> + vq->last_used_index = vq->used->idx;
> + return 0;
> +}
> +
> +static int
> +vhost_user_set_vring_base_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + unsigned int index = vmsg->payload.state.index;
> + unsigned int num = vmsg->payload.state.num;
> +
> + printf("State.index: %d\n", index);
> + printf("State.num: %d\n", num);
> + dev->virtqueue[index].last_avail_index = num;
> +
> + return 0;
> +}
> +
> +static int
> +vhost_user_get_vring_base_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + printf("Function %s() not implemented yet.\n", __func__);
> + return 0;
> +}
> +
> +static int
> +vhost_user_set_vring_kick_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + uint64_t u64_arg = vmsg->payload.u64;
> + int index = u64_arg & VHOST_USER_VRING_IDX_MASK;
> +
> + printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
> +
> + assert((u64_arg & VHOST_USER_VRING_NOFD_MASK) == 0);
> + assert(vmsg->fd_num == 1);
> +
> + dev->virtqueue[index].kick_fd = vmsg->fds[0];
> + printf("Got kick_fd: %d for vq: %d\n", vmsg->fds[0], index);
> +
> + if (index % 2 == 1) {
> + /* TX queue. */
> + dispatcher_add(&dev->dispatcher, dev->virtqueue[index].kick_fd,
> + dev, vubr_kick_cb);
> +
> + printf("Waiting for kicks on fd: %d for vq: %d\n",
> + dev->virtqueue[index].kick_fd, index);
> + }
> + return 0;
> +}
> +
> +static int
> +vhost_user_set_vring_call_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + uint64_t u64_arg = vmsg->payload.u64;
> + int index = u64_arg & VHOST_USER_VRING_IDX_MASK;
> +
> + printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
> + assert((u64_arg & VHOST_USER_VRING_NOFD_MASK) == 0);
> + assert(vmsg->fd_num == 1);
> +
> + dev->virtqueue[index].call_fd = vmsg->fds[0];
> + printf("Got call_fd: %d for vq: %d\n", vmsg->fds[0], index);
> +
> + return 0;
> +}
> +
> +static int
> +vhost_user_set_vring_err_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
> + return 0;
> +}
> +
> +static int
> +vhost_user_get_protocol_features_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + /* FIXME: unimplemented */
> + printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
> + return 0;
> +}
> +
> +static int
> +vhost_user_set_protocol_features_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + /* FIXME: unimplemented */
> + printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
> + return 0;
> +}
> +
> +static int
> +vhost_user_get_queue_num_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + printf("Function %s() not implemented yet.\n", __func__);
> + return 0;
> +}
> +
> +static int
> +vhost_user_set_vring_enable_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + printf("Function %s() not implemented yet.\n", __func__);
> + return 0;
> +}
> +
> +static int
> +vhost_user_send_rarp_exec(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + printf("Function %s() not implemented yet.\n", __func__);
> + return 0;
> +}
> +
> +static int
> +vhost_user_execute_request(VhostDev *dev,
> + VhostUserMsg *vmsg)
> +{
> + /* Print out generic part of the request. */
> + printf(
> + "================== Vhost user message from QEMU ==================\n");
> + printf("Request: %s (%d)\n", vhost_user_request_str[vmsg->request],
> + vmsg->request);
> + printf("Flags: 0x%x\n", vmsg->flags);
> + printf("Size: %d\n", vmsg->size);
> +
> + if (vmsg->fd_num) {
> + int i;
> + printf("Fds:");
> + for (i = 0; i < vmsg->fd_num; i++) {
> + printf(" %d", vmsg->fds[i]);
> + }
> + printf("\n");
> + }
> +
> + switch (vmsg->request) {
> + case VHOST_USER_NONE:
> + return vhost_user_none_exec(dev, vmsg);
> + case VHOST_USER_GET_FEATURES:
> + return vhost_user_get_features_exec(dev, vmsg);
> + case VHOST_USER_SET_FEATURES:
> + return vhost_user_set_features_exec(dev, vmsg);
> + case VHOST_USER_SET_OWNER:
> + return vhost_user_set_owner_exec(dev, vmsg);
> + case VHOST_USER_RESET_DEVICE:
> + return vhost_user_reset_device_exec(dev, vmsg);
> + case VHOST_USER_SET_MEM_TABLE:
> + return vhost_user_set_mem_table_exec(dev, vmsg);
> + case VHOST_USER_SET_LOG_BASE:
> + return vhost_user_set_log_base_exec(dev, vmsg);
> + case VHOST_USER_SET_LOG_FD:
> + return vhost_user_set_log_fd_exec(dev, vmsg);
> + case VHOST_USER_SET_VRING_NUM:
> + return vhost_user_set_vring_num_exec(dev, vmsg);
> + case VHOST_USER_SET_VRING_ADDR:
> + return vhost_user_set_vring_addr_exec(dev, vmsg);
> + case VHOST_USER_SET_VRING_BASE:
> + return vhost_user_set_vring_base_exec(dev, vmsg);
> + case VHOST_USER_GET_VRING_BASE:
> + return vhost_user_get_vring_base_exec(dev, vmsg);
> + case VHOST_USER_SET_VRING_KICK:
> + return vhost_user_set_vring_kick_exec(dev, vmsg);
> + case VHOST_USER_SET_VRING_CALL:
> + return vhost_user_set_vring_call_exec(dev, vmsg);
> + case VHOST_USER_SET_VRING_ERR:
> + return vhost_user_set_vring_err_exec(dev, vmsg);
> + case VHOST_USER_GET_PROTOCOL_FEATURES:
> + return vhost_user_get_protocol_features_exec(dev, vmsg);
> + case VHOST_USER_SET_PROTOCOL_FEATURES:
> + return vhost_user_set_protocol_features_exec(dev, vmsg);
> + case VHOST_USER_GET_QUEUE_NUM:
> + return vhost_user_get_queue_num_exec(dev, vmsg);
> + case VHOST_USER_SET_VRING_ENABLE:
> + return vhost_user_set_vring_enable_exec(dev, vmsg);
> + case VHOST_USER_SEND_RARP:
> + return vhost_user_send_rarp_exec(dev, vmsg);
> +
> +
> + case VHOST_USER_MAX:
> + assert(vmsg->request != VHOST_USER_MAX);
> + }
> + return 0;
> +}
> +
> +static void
> +vhost_user_receive_cb(int sock, void *ctx)
> +{
> + VhostDev *dev = (VhostDev *) ctx;
> + VhostUserMsg vmsg;
> +
> + vhost_user_message_read(sock, &vmsg);
> +
> + int reply_requested = vhost_user_execute_request(dev, &vmsg);
> +
> + if (reply_requested) {
> + /* Set the version in the flags when sending the reply */
> + vmsg.flags &= ~VHOST_USER_VERSION_MASK;
> + vmsg.flags |= VHOST_USER_VERSION;
> + vmsg.flags |= VHOST_USER_REPLY_MASK;
> + vhost_user_message_write(sock, &vmsg);
> + }
> +}
> +
> +static void
> +vhost_user_accept_cb(int sock, void *ctx)
> +{
> + VhostDev *dev = (VhostDev *)ctx;
> + int conn_fd;
> + struct sockaddr_un un;
> + socklen_t len = sizeof(un);
> +
> + conn_fd = accept(sock, (struct sockaddr *) &un, &len);
> + if (conn_fd == -1) {
> + vhost_user_die("accept()");
> + }
> + printf("Got connection from remote peer on sock %d\n", conn_fd);
> + dispatcher_add(&dev->dispatcher, conn_fd, ctx, vhost_user_receive_cb);
> +}
> +
> +static VhostDev *
> +vhost_user_new(const char *path)
> +{
> + VhostDev *dev =
> + (VhostDev *) calloc(1, sizeof(VhostDev));
> +
> + dev->nregions = 0;
> +
> + int i;
> + for (i = 0; i < MAX_NR_VIRTQUEUE; i++) {
> + dev->virtqueue[i] = (VirtQueue) {
> + .call_fd = -1, .kick_fd = -1,
> + .size = 0,
> + .last_avail_index = 0, .last_used_index = 0,
> + .desc = 0, .avail = 0, .used = 0,
> + };
> + }
> +
> + /* Get a UNIX socket. */
> + dev->sock = socket(AF_UNIX, SOCK_STREAM, 0);
> + if (dev->sock == -1) {
> + vhost_user_die("socket");
> + }
> +
> + struct sockaddr_un un;
> + un.sun_family = AF_UNIX;
> + strcpy(un.sun_path, path);
> +
> + size_t len = sizeof(un.sun_family) + strlen(path);
> +
> + unlink(path);
> +
> + if (bind(dev->sock, (struct sockaddr *) &un, len) == -1) {
> + vhost_user_die("bind");
> + }
> +
> + if (listen(dev->sock, 1) == -1) {
> + vhost_user_die("listen");
> + }
> +
> + dispatcher_init(&dev->dispatcher);
> + dispatcher_add(&dev->dispatcher, dev->sock, (void *)dev,
> + vhost_user_accept_cb);
> +
> + printf("Waiting for connections on UNIX socket %s ...\n", path);
> + return dev;
> +}
> +
> +static void
> +vhost_user_backend_udp_setup(VhostDev *dev,
> + const char *local_host,
> + uint16_t local_port,
> + const char *dest_host,
> + uint16_t dest_port)
> +{
> +
> + struct sockaddr_in si_local;
> + int sock;
> +
> + sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
> + if (sock == -1) {
> + vhost_user_die("socket");
> + }
> +
> + memset((char *) &si_local, 0, sizeof(struct sockaddr_in));
> + si_local.sin_family = AF_INET;
> + si_local.sin_port = htons(local_port);
> + if (inet_aton(local_host, &si_local.sin_addr) == 0) {
> + fprintf(stderr, "inet_aton() failed.\n");
> + exit(1);
> + }
> +
> + if (bind(sock, (struct sockaddr *)&si_local, sizeof(si_local)) == -1) {
> + vhost_user_die("bind");
> + }
> +
> + /* setup destination for sends */
> + struct sockaddr_in *si_remote = &dev->backend_udp_dest;
> + memset((char *) si_remote, 0, sizeof(struct sockaddr_in));
> + si_remote->sin_family = AF_INET;
> + si_remote->sin_port = htons(dest_port);
> + if (inet_aton(dest_host, &si_remote->sin_addr) == 0) {
> + fprintf(stderr, "inet_aton() failed.\n");
> + exit(1);
> + }
> +
> + dev->backend_udp_sock = sock;
> + dispatcher_add(&dev->dispatcher, sock, dev, vubr_backend_recv_cb);
> + printf("Waiting for data from udp backend on %s:%d...\n",
> + local_host, local_port);
> +}
> +
> +static void
> +vhost_user_run(VhostDev *dev)
> +{
> + while (1) {
> + /* timeout 200ms */
> + dispatcher_wait(&dev->dispatcher, 200000);
> + /* Here one can try polling strategy. */
> + }
> +}
> +
> +int
> +main(int argc, char *argv[])
> +{
> + VhostDev *dev;
> +
> + dev = vhost_user_new("/tmp/vubr.sock");
> + if (!dev) {
> + return 1;
> + }
> +
> + vhost_user_backend_udp_setup(dev,
> + "127.0.0.1", 4444,
> + "127.0.0.1", 5555);
> + vhost_user_run(dev);
> + return 0;
> +}
> diff --git a/tests/Makefile b/tests/Makefile
> index 9341498..0811c68 100644
> --- a/tests/Makefile
> +++ b/tests/Makefile
> @@ -522,6 +522,7 @@ tests/qemu-iotests/socket_scm_helper$(EXESUF): tests/qemu-iotests/socket_scm_hel
> tests/test-qemu-opts$(EXESUF): tests/test-qemu-opts.o $(test-util-obj-y)
> tests/test-write-threshold$(EXESUF): tests/test-write-threshold.o $(test-block-obj-y)
> tests/test-netfilter$(EXESUF): tests/test-netfilter.o $(qtest-obj-y)
> +tests/vhost-user-bridge$(EXESUF): tests/vhost-user-bridge.o
This needs to be built only when there's actual vhost-user support,
the same way as the existing vhost-user-test, I guess.
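
For illustration, one way to limit it is to wrap the new rule in a
conditional. This is a minimal, untested sketch: CONFIG_LINUX is only
an assumed guard here, and the condition that vhost-user-test actually
uses in tests/Makefile should be checked and reused instead.

```make
# Sketch only: CONFIG_LINUX is an assumption, not verified against the tree.
# Build the bridge only on hosts where vhost-user can be supported.
ifeq ($(CONFIG_LINUX),y)
tests/vhost-user-bridge$(EXESUF): tests/vhost-user-bridge.o
endif
```

With a guard like this, the target is simply not defined on other hosts,
so "make tests/vhost-user-bridge" fails early there rather than failing
on missing headers.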
>
> ifeq ($(CONFIG_POSIX),y)
> LIBS += -lutil
> --
> --Victor