Re: [Qemu-devel] [PATCH v5] net: L2TPv3 transport


From: Anton Ivanov
Subject: Re: [Qemu-devel] [PATCH v5] net: L2TPv3 transport
Date: Tue, 25 Mar 2014 10:35:28 +0000
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:17.0) Gecko/20130922 Icedove/17.0.9

On 25/03/14 10:17, Stefan Hajnoczi wrote:
> On Mon, Mar 24, 2014 at 11:56:16AM +0000, address@hidden wrote:
>> 1. Correct buffering and correct poll FSM
>>
>> Current qemu queue logic assumes single packet inputs, not multiple packet
>> RX. If the receiver cannot take more packets it will enqueue the new
>> arrivals and incur a malloc, memcpy and free to do that.
>>
>> This is unnecessary if you use recvmmsg and treat the receive message array
>> as a ring buffer. If the peer returns a 0 you leave the message(s) in the
>> receive array and use the remainder of the message array to continue
>> reading. It is a natural queue, no need to double-buffer and memcpy on
>> top of that.
>>
>> The poll logic and the send logic have been updated to account for that.
>>
>> End result - a 10%+ performance gain compared to using qemu_send_packet_async
> Does this mean you regularly see qemu_send_packet_async() return 0
> because l2tp is reading more packets using recvmmsg() than the NIC can
> fit into its rx ring?  I'd like to understand this case better.

qemu_send_packet_async() will return non-zero even if its underlying queue enqueues 
the packet, which means a memcpy and a double-buffer. It will return zero only if l2tpv3 
fills it so fast that the queue in queue.c itself fills up.

So the fact that qemu_send_packet_async() has returned non-zero does not mean 
that we have not paid the price for it :)

The relevant code is in qemu_net_queue_append, which is invoked by
qemu_net_queue_send, which is what qemu_send_packet_async calls.

I do see that bit - the memcpy-based enqueue being invoked quite a few times.
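
For illustration, here is a minimal standalone sketch (not the actual queue.c
source; the names and the struct are made up) of what that append path costs
per deferred packet:

#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Stand-in for the packet object that the net queue allocates per deferral. */
typedef struct Packet {
    struct Packet *next;
    size_t size;
    uint8_t data[];              /* flexible array member holding the copy */
} Packet;

typedef struct {
    Packet *head, *tail;
} PacketQueue;

/* Every deferred packet pays a malloc + memcpy here, and a free when the
 * queue is eventually flushed - the double-buffering referred to above. */
static void packet_queue_append(PacketQueue *q, const uint8_t *buf, size_t size)
{
    Packet *p = malloc(sizeof(*p) + size);

    if (!p) {
        return;
    }
    memcpy(p->data, buf, size);
    p->size = size;
    p->next = NULL;
    if (q->tail) {
        q->tail->next = p;
    } else {
        q->head = p;
    }
    q->tail = p;
}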

>
>> +static void l2tpv3_writable(void *opaque)
>> +{
>> +    NetL2TPV3State *s = opaque;
>> +
>> +    l2tpv3_write_poll(s, false);
>> +
>> +    net_l2tpv3_send(s);
> This is the wrong function because net_l2tpv3_send() *reads* new packets
> from the socket.  We must tell the peer to resume transmitting packets
> so we can *write* them to the socket.
>
> This line should be:
> qemu_flush_queued_packets(&s->nc);

No.

That function doubles as the flush of the local buffer (the recvmmsg vector),
so there are never any packets in the queue from queue.c to flush. The exact
idea here is to eliminate that queue, as it is surplus to requirements if you
are doing multi-packet RX. If you are doing multi-packet RX (or packet mmap),
your packets are already in a multi-packet buffer. It is a perfect queue,
there is no need for another :)

The send function is written so that it picks up where it left off last
time, so any packets left over in the recvmmsg vector are flushed first,
before any new reads.

>
>> +static int l2tpv3_verify_header(NetL2TPV3State *s, uint8_t *buf)
>> +{
>> +
>> +    uint32_t *session;
>> +    uint64_t cookie;
>> +
>> +    if ((!s->udp) && (!s->ipv6)) {
>> +        buf += sizeof(struct iphdr) /* fix for ipv4 raw */;
>> +    }
>> +
>> +    /* we do not do a strict check for "data" packets as per
>> +    * the RFC spec because the pure IP spec does not have
>> +    * that anyway.
>> +    */
>> +
>> +    if (s->cookie) {
>> +        if (s->cookie_is_64) {
>> +            cookie = ldq_be_p(buf + s->cookie_offset);
>> +        } else {
>> +            cookie = ldl_be_p(buf + s->cookie_offset);
>> +        }
>> +        if (cookie != s->rx_cookie) {
>> +            if (!s->header_mismatch) {
>> +                error_report("unknown cookie id\n");
>> +            }
> Do we need to set s->header_mismatch = true to prevent flooding logs?

I thought I did...

>
>> +            return -1;
>> +        }
>> +    }
>> +    session = (uint32_t *) (buf + s->session_offset);
>> +    if (ldl_be_p(session) != s->rx_session) {
>> +        if (!s->header_mismatch) {
>> +            error_report("session mismatch");
>> +        }
> Same here.
>
>> +static void net_l2tpv3_send(void *opaque)
>> +{
>> +    NetL2TPV3State *s = opaque;
>> +
>> +    int count, target_count, offset, size = 0;
>> +    struct mmsghdr *msgvec;
>> +    struct iovec *vec;
>> +    bool bad_read;
>> +    int data_size;
>> +
>> +    /* go into ring mode only if there is a "pending" tail */
>> +
>> +    if (s->queue_depth) {
>> +
>> +        /* The ring buffer we use has variable intake
>> +         * count of how much we can read varies - adjust accordingly
>> +         */
>> +
>> +        target_count = MAX_L2TPV3_MSGCNT - s->queue_depth;
>> +
>> +        /* Ensure we do not overrun the ring when we have
>> +         * a lot of enqueued packets
>> +         */
>> +
>> +        if (s->queue_head + target_count > MAX_L2TPV3_MSGCNT) {
>> +            target_count = MAX_L2TPV3_MSGCNT - s->queue_head;
>> +        }
>> +    } else {
>> +
>> +        /* we do not have any pending packets - we can use
>> +        * the whole message vector linearly instead of using
>> +        * it as a ring
>> +        */
>> +
>> +        s->queue_head = 0;
>> +        s->queue_tail = 0;
>> +        target_count = MAX_L2TPV3_MSGCNT;
>> +    }
>> +
>> +    msgvec = s->msgvec + s->queue_head;
>> +    if (target_count > 0) {
>> +        count = recvmmsg(
>> +                s->fd,
>> +                msgvec,
>> +                target_count, MSG_DONTWAIT, NULL);
>> +        s->queue_head = (s->queue_head + count) % MAX_L2TPV3_MSGCNT;
>> +        s->queue_depth += count;
> These calculations break if count == -1 because there was an error (e.g.
> EINTR).

Thanks, will fix.
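
One possible shape for that fix (a sketch of the guard only, not the eventual
patch): wrap the call so an error return can never feed -1 into the ring
arithmetic, treating EAGAIN/EINTR as "retry later".

#define _GNU_SOURCE
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>

/* Returns the number of messages received, or 0 if nothing usable arrived,
 * so queue_head/queue_depth are only advanced by real packets. */
static int recvmmsg_nonneg(int fd, struct mmsghdr *vec, unsigned int n)
{
    int count = recvmmsg(fd, vec, n, MSG_DONTWAIT, NULL);

    if (count < 0) {
        if (errno != EAGAIN && errno != EINTR) {
            fprintf(stderr, "recvmmsg failed: %s\n", strerror(errno));
        }
        return 0;    /* nothing received - leave the ring indices alone */
    }
    return count;
}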

>
>> +    }
>> +    if (s->queue_depth > 0 && (!s->delivering)) {
>> +        do {
>> +            msgvec = s->msgvec + s->queue_tail;
>> +            if (msgvec->msg_len > 0) {
>> +                data_size = msgvec->msg_len - s->header_size;
>> +                vec = msgvec->msg_hdr.msg_iov;
>> +                if ((data_size > 0) &&
>> +                    (l2tpv3_verify_header(s, vec->iov_base) == 0)) {
>> +                    vec++;
>> +                    /* we do not use the "normal" send and send async
>> +                     * functions here because we have our own buffer -
>> +                     * the message vector. If we use the "usual" ones
>> +                     * we will end up double-buffering.
>> +                     */
>> +                    s->delivering = true;
>> +                    /* deliver directly to peer bypassing queueing and
>> +                     * buffering
>> +                     */
>> +                    size = qemu_deliver_packet(
>> +                            &s->nc,
>> +                            QEMU_NET_PACKET_FLAG_NONE,
>> +                            vec->iov_base,
>> +                            data_size,
>> +                            s->nc.peer
>> +                        );
>> +                    s->delivering = false;
>> +                    bad_read = false;
>> +                } else {
>> +                    bad_read = true;
>> +                    if (!s->header_mismatch) {
>> +                        /* report error only once */
>> +                        error_report("l2tpv3 header verification failed");
>> +                        s->header_mismatch = true;
>> +                    }
>> +                }
>> +            } else {
>> +                bad_read = true;
>> +            }
>> +            if ((bad_read) || (size > 0)) {
>> +                s->queue_tail = (s->queue_tail + 1) % MAX_L2TPV3_MSGCNT;
>> +                s->queue_depth--;
>> +            }
>> +        } while (
>> +                (s->queue_depth > 0) &&
>> +                qemu_can_send_packet(&s->nc) &&
>> +                ((size > 0) || bad_read)
>> +            );
>> +    }
>> +    if (s->queue_depth >= (MAX_L2TPV3_MSGCNT - 1)) {
>> +        /* We change the read poll flag only if our internal queue
>> +         * (the message vector) is full. We do not change it on
>> +         * size == 0 as other transports because we do our own buffering
>> +         */
>> +        l2tpv3_read_poll(s, false);
> I don't see where l2tpv3_read_poll() is set to true again.  How do we
> resume reading the socket?

In l2tpv3_poll, which is the .poll callback in the NetClientInfo.
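
For context, a sketch of how that callback is assumed to look (based on the
description above, not quoted from the patch in this thread; it would live
alongside the rest of the patch and reuse its helpers):

/* .poll callback registered in the NetClientInfo for this backend: when the
 * peer polls the backend again, read (and write) polling on the socket is
 * re-armed, which is how reading resumes after the message vector filled up. */
static void l2tpv3_poll(NetClientState *nc, bool enable)
{
    NetL2TPV3State *s = DO_UPCAST(NetL2TPV3State, nc, nc);

    l2tpv3_write_poll(s, enable);
    l2tpv3_read_poll(s, enable);
}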

A.

>



