Re: [Qemu-devel] [PATCH v5] net: L2TPv3 transport
From: Stefan Hajnoczi
Subject: Re: [Qemu-devel] [PATCH v5] net: L2TPv3 transport
Date: Tue, 25 Mar 2014 11:17:54 +0100
User-agent: Mutt/1.5.21 (2010-09-15)
On Mon, Mar 24, 2014 at 11:56:16AM +0000, address@hidden wrote:
> 1. Correct buffering and correct poll FSM
>
> The current qemu queue logic assumes single-packet input, not
> multiple-packet RX. If the receiver cannot take more packets it will
> enqueue the new arrivals, incurring a malloc, memcpy and free to do so.
>
> This is unnecessary if you use recvmmsg and treat the receive message
> array as a ring buffer. If the peer returns 0 you leave the message(s)
> in the receive array and use the remainder of the already-allocated
> message array to continue reading. It is a natural queue, no need to
> double-buffer and memcpy on top of that.
>
> The poll logic and the send logic have been updated to account for that.
>
> End result - 10%+ performance improvement compared to using qemu_send_packet_async
Does this mean you regularly see qemu_send_packet_async() return 0
because l2tp is reading more packets using recvmmsg() than the NIC can
fit into its rx ring? I'd like to understand this case better.
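
If I understand the scheme correctly, the message vector doubles as the
receive queue, roughly like this (a standalone sketch with made-up names,
not the patch code):

    #define _GNU_SOURCE
    #include <sys/socket.h>

    #define RING_MSGCNT 64                /* hypothetical ring size */

    /* Read as many datagrams as the free slots allow.  Unconsumed
     * messages stay where they are, so nothing is copied when the
     * peer applies backpressure.
     */
    static void ring_receive(int fd, struct mmsghdr *msgvec,
                             int *queue_head, int *queue_depth)
    {
        int target = RING_MSGCNT - *queue_depth;

        /* a single recvmmsg() call must not wrap past the array end */
        if (*queue_head + target > RING_MSGCNT) {
            target = RING_MSGCNT - *queue_head;
        }

        if (target > 0) {
            int count = recvmmsg(fd, msgvec + *queue_head, target,
                                 MSG_DONTWAIT, NULL);
            if (count > 0) {
                *queue_head = (*queue_head + count) % RING_MSGCNT;
                *queue_depth += count;
            }
        }
    }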
> +static void l2tpv3_writable(void *opaque)
> +{
> +    NetL2TPV3State *s = opaque;
> +
> +    l2tpv3_write_poll(s, false);
> +
> +    net_l2tpv3_send(s);
This is the wrong function because net_l2tpv3_send() *reads* new packets
from the socket. We must tell the peer to resume transmitting packets
so we can *write* them to the socket.
This line should be:

    qemu_flush_queued_packets(&s->nc);
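
That is, the whole callback becomes:

    static void l2tpv3_writable(void *opaque)
    {
        NetL2TPV3State *s = opaque;

        l2tpv3_write_poll(s, false);

        /* the socket is writable again - ask the peer to resume
         * sending packets so we can write them out
         */
        qemu_flush_queued_packets(&s->nc);
    }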
> +static int l2tpv3_verify_header(NetL2TPV3State *s, uint8_t *buf)
> +{
> +
> +    uint32_t *session;
> +    uint64_t cookie;
> +
> +    if ((!s->udp) && (!s->ipv6)) {
> +        buf += sizeof(struct iphdr) /* fix for ipv4 raw */;
> +    }
> +
> +    /* we do not do a strict check for "data" packets as per
> +     * the RFC spec because the pure IP spec does not have
> +     * that anyway.
> +     */
> +
> +    if (s->cookie) {
> +        if (s->cookie_is_64) {
> +            cookie = ldq_be_p(buf + s->cookie_offset);
> +        } else {
> +            cookie = ldl_be_p(buf + s->cookie_offset);
> +        }
> +        if (cookie != s->rx_cookie) {
> +            if (!s->header_mismatch) {
> +                error_report("unknown cookie id\n");
> +            }
Do we need to set s->header_mismatch = true to prevent flooding logs?
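
Something like (a sketch, mirroring the pattern the patch already uses in
net_l2tpv3_send()):

    if (cookie != s->rx_cookie) {
        if (!s->header_mismatch) {
            error_report("unknown cookie id");
            s->header_mismatch = true;    /* report only once */
        }
        return -1;
    }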
> +            return -1;
> +        }
> +    }
> +    session = (uint32_t *) (buf + s->session_offset);
> +    if (ldl_be_p(session) != s->rx_session) {
> +        if (!s->header_mismatch) {
> +            error_report("session mismatch");
> +        }
Same here.
> +static void net_l2tpv3_send(void *opaque)
> +{
> +    NetL2TPV3State *s = opaque;
> +
> +    int count, target_count, offset, size = 0;
> +    struct mmsghdr *msgvec;
> +    struct iovec *vec;
> +    bool bad_read;
> +    int data_size;
> +
> +    /* go into ring mode only if there is a "pending" tail */
> +
> +    if (s->queue_depth) {
> +
> +        /* The ring buffer we use has a variable intake -
> +         * the count of how much we can read varies; adjust accordingly
> +         */
> +
> +        target_count = MAX_L2TPV3_MSGCNT - s->queue_depth;
> +
> +        /* Ensure we do not overrun the ring when we have
> +         * a lot of enqueued packets
> +         */
> +
> +        if (s->queue_head + target_count > MAX_L2TPV3_MSGCNT) {
> +            target_count = MAX_L2TPV3_MSGCNT - s->queue_head;
> +        }
> +    } else {
> +
> +        /* we do not have any pending packets - we can use
> +         * the whole message vector linearly instead of using
> +         * it as a ring
> +         */
> +
> +        s->queue_head = 0;
> +        s->queue_tail = 0;
> +        target_count = MAX_L2TPV3_MSGCNT;
> +    }
> +
> +    msgvec = s->msgvec + s->queue_head;
> +    if (target_count > 0) {
> +        count = recvmmsg(
> +            s->fd,
> +            msgvec,
> +            target_count, MSG_DONTWAIT, NULL);
> +        s->queue_head = (s->queue_head + count) % MAX_L2TPV3_MSGCNT;
> +        s->queue_depth += count;
These calculations break if count == -1 because there was an error (e.g.
EINTR).
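A guard along these lines (just a sketch) would keep the indices sane:

    count = recvmmsg(s->fd, msgvec, target_count, MSG_DONTWAIT, NULL);
    if (count < 0) {
        /* EAGAIN, EINTR, etc. - nothing was received, so do not
         * advance the ring indices
         */
        count = 0;
    }
    s->queue_head = (s->queue_head + count) % MAX_L2TPV3_MSGCNT;
    s->queue_depth += count;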
> +    }
> +    if (s->queue_depth > 0 && (!s->delivering)) {
> +        do {
> +            msgvec = s->msgvec + s->queue_tail;
> +            if (msgvec->msg_len > 0) {
> +                data_size = msgvec->msg_len - s->header_size;
> +                vec = msgvec->msg_hdr.msg_iov;
> +                if ((data_size > 0) &&
> +                    (l2tpv3_verify_header(s, vec->iov_base) == 0)) {
> +                    vec++;
> +                    /* we do not use the "normal" send and send async
> +                     * functions here because we have our own buffer -
> +                     * the message vector. If we use the "usual" ones
> +                     * we will end up double-buffering.
> +                     */
> +                    s->delivering = true;
> +                    /* deliver directly to peer bypassing queueing and
> +                     * buffering
> +                     */
> +                    size = qemu_deliver_packet(
> +                        &s->nc,
> +                        QEMU_NET_PACKET_FLAG_NONE,
> +                        vec->iov_base,
> +                        data_size,
> +                        s->nc.peer
> +                    );
> +                    s->delivering = false;
> +                    bad_read = false;
> +                } else {
> +                    bad_read = true;
> +                    if (!s->header_mismatch) {
> +                        /* report error only once */
> +                        error_report("l2tpv3 header verification failed");
> +                        s->header_mismatch = true;
> +                    }
> +                }
> +            } else {
> +                bad_read = true;
> +            }
> +            if ((bad_read) || (size > 0)) {
> +                s->queue_tail = (s->queue_tail + 1) % MAX_L2TPV3_MSGCNT;
> +                s->queue_depth--;
> +            }
> +        } while (
> +            (s->queue_depth > 0) &&
> +            qemu_can_send_packet(&s->nc) &&
> +            ((size > 0) || bad_read)
> +        );
> +    }
> +    if (s->queue_depth >= (MAX_L2TPV3_MSGCNT - 1)) {
> +        /* We change the read poll flag only if our internal queue
> +         * (the message vector) is full. We do not change it on
> +         * size == 0 as other transports do, because we do our own
> +         * buffering
> +         */
> +        l2tpv3_read_poll(s, false);
I don't see where l2tpv3_read_poll() is set to true again. How do we
resume reading the socket?
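
I would expect the flag to be re-enabled once the message vector drains,
e.g. something like this (hypothetical placement, at the point where
packets leave the queue):

    if (s->queue_depth < MAX_L2TPV3_MSGCNT - 1) {
        /* there is room in the message vector again -
         * resume reading from the socket
         */
        l2tpv3_read_poll(s, true);
    }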