[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet] 29/32: transport (quic): mq handling fixes
From: |
gnunet |
Subject: |
[gnunet] 29/32: transport (quic): mq handling fixes |
Date: |
Tue, 18 Jul 2023 17:16:18 +0200 |
This is an automated email from the git hooks/post-receive script.
marshall pushed a commit to branch master
in repository gnunet.
commit b37caeb2b2b453aa4cf8c704f2ce84eec4dbc170
Author: marshall <stmr@umich.edu>
AuthorDate: Wed Jul 12 14:21:57 2023 -0400
transport (quic): mq handling fixes
---
src/transport/gnunet-communicator-quic.c | 196 ++++++++++++++++++++++++++++---
1 file changed, 180 insertions(+), 16 deletions(-)
diff --git a/src/transport/gnunet-communicator-quic.c
b/src/transport/gnunet-communicator-quic.c
index cdbd05477..ab1b7b20b 100644
--- a/src/transport/gnunet-communicator-quic.c
+++ b/src/transport/gnunet-communicator-quic.c
@@ -70,6 +70,11 @@ struct PeerAddress
*/
socklen_t address_len;
+ /**
+ * The QUIC connection associated with this peer
+ */
+ struct quic_conn *conn;
+
/**
* Default message queue we are providing for the #ch.
*/
@@ -170,7 +175,7 @@ struct QUIC_header
* ASSUMES: connection is established to peer
*/
static void
-recv_from_streams (quiche_conn *conn, char*stream_buf, size_t buf_size)
+recv_from_streams (quiche_conn *conn, char *stream_buf, size_t buf_size)
{
uint64_t s = 0;
quiche_stream_iter *readable;
@@ -336,6 +341,55 @@ flush_egress (struct quic_conn *conn)
}
+/**
+ * Increment receiver timeout due to activity.
+ *
+ * @param receiver address for which the timeout should be rescheduled
+ */
+static void
+reschedule_peer_timeout (struct PeerAddress *peer)
+{
+ peer->timeout =
+ GNUNET_TIME_relative_to_absolute
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ GNUNET_CONTAINER_heap_update_cost (peer->hn,
+ peer->timeout.abs_value_us);
+}
+
+
+/**
+ * Destroys a receiving state due to timeout or shutdown.
+ *
+ * @param receiver entity to close down
+ */
+static void
+peer_destroy (struct PeerAddress *peer)
+{
+
+ peer->peer_destroy_called = GNUNET_YES;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Disconnecting peer for peer `%s'\n",
+ GNUNET_i2s (&peer->target));
+ if (NULL != peer->d_qh)
+ {
+ GNUNET_TRANSPORT_communicator_mq_del (peer->d_qh);
+ peer->d_qh = NULL;
+ }
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_remove (peers,
+ &peer->target,
+ peer));
+ GNUNET_assert (peer == GNUNET_CONTAINER_heap_remove_node (peer->hn));
+ GNUNET_STATISTICS_set (stats,
+ "# peers active",
+ GNUNET_CONTAINER_multipeermap_size (peers),
+ GNUNET_NO);
+ GNUNET_free (peer->address);
+ GNUNET_free (peer->foreign_addr);
+ GNUNET_free (peer);
+}
+
+
/**
* Signature of functions implementing the sending functionality of a
* message queue.
@@ -351,6 +405,14 @@ mq_send_d (struct GNUNET_MQ_Handle *mq,
{
struct PeerAddress *peer = impl_state;
uint16_t msize = ntohs (msg->size);
+ struct quic_conn *q_conn = peer->conn;
+
+ if (NULL == q_conn->conn)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "peer never established quic connection\n");
+ return;
+ }
GNUNET_assert (mq == peer->d_mq);
if (msize > peer->d_mtu)
@@ -365,6 +427,7 @@ mq_send_d (struct GNUNET_MQ_Handle *mq,
return;
}
reschedule_peer_timeout (peer);
+
// if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
// dgram,
// sizeof(dgram),
@@ -380,6 +443,65 @@ mq_send_d (struct GNUNET_MQ_Handle *mq,
}
+/**
+ * Signature of functions implementing the destruction of a message
+ * queue. Implementations must not free @a mq, but should take care
+ * of @a impl_state.
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state our `struct PeerAddress`
+ */
+static void
+mq_destroy_d (struct GNUNET_MQ_Handle *mq, void *impl_state)
+{
+ struct PeerAddress *peer = impl_state;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Default MQ destroyed\n");
+ if (mq == peer->d_mq)
+ {
+ peer->d_mq = NULL;
+ if (GNUNET_YES != peer->peer_destroy_called)
+ peer_destroy (peer);
+ }
+}
+
+
+/**
+ * Implementation function that cancels the currently sent message.
+ *
+ * @param mq message queue
+ * @param impl_state our `struct PeerAddress`
+ */
+static void
+mq_cancel (struct GNUNET_MQ_Handle *mq, void *impl_state)
+{
+ /* Cancellation is impossible with QUIC; bail */
+ GNUNET_assert (0);
+}
+
+
+/**
+ * Generic error handler, called with the appropriate
+ * error code and the same closure specified at the creation of
+ * the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls our `struct ReceiverAddress`
+ * @param error error code
+ */
+static void
+mq_error (void *cls, enum GNUNET_MQ_Error error)
+{
+ struct PeerAddress *peer = cls;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "MQ error in queue to %s: %d\n",
+ GNUNET_i2s (&peer->target),
+ (int) error);
+ peer_destroy (peer);
+}
+
+
/**
* Convert UDP bind specification to a `struct sockaddr *`
*
@@ -557,10 +679,10 @@ setup_peer_mq (struct PeerAddress *peer)
peer->d_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_d,
&mq_destroy_d,
&mq_cancel,
- receiver,
+ peer,
NULL,
&mq_error,
- receiver);
+ peer);
peer->d_qh =
GNUNET_TRANSPORT_communicator_mq_add (ch,
&peer->target,
@@ -574,6 +696,44 @@ setup_peer_mq (struct PeerAddress *peer)
}
+/**
+ * Taken from: UDP communicator
+ * Converts @a address to the address string format used by this
+ * communicator in HELLOs.
+ *
+ * @param address the address to convert, must be AF_INET or AF_INET6.
+ * @param address_len number of bytes in @a address
+ * @return string representation of @a address
+ */
+static char *
+sockaddr_to_udpaddr_string (const struct sockaddr *address,
+ socklen_t address_len)
+{
+ char *ret;
+
+ switch (address->sa_family)
+ {
+ case AF_INET:
+ GNUNET_asprintf (&ret,
+ "%s-%s",
+ COMMUNICATOR_ADDRESS_PREFIX,
+ GNUNET_a2s (address, address_len));
+ break;
+
+ case AF_INET6:
+ GNUNET_asprintf (&ret,
+ "%s-%s",
+ COMMUNICATOR_ADDRESS_PREFIX,
+ GNUNET_a2s (address, address_len));
+ break;
+
+ default:
+ GNUNET_assert (0);
+ }
+ return ret;
+}
+
+
/**
* Function called when the transport service has received a
* backchannel message for this communicator (!) via a different return
@@ -682,9 +842,9 @@ mq_init (void *cls, const struct GNUNET_PeerIdentity
*peer_id, const
socklen_t in_len;
uint8_t scid[LOCAL_CONN_ID_LEN];
- struct quic_conn *conn;
+ struct quic_conn *q_conn;
char *bindto;
- socklen_t in_len;
+ socklen_t local_in_len;
struct sockaddr *local_addr;
if (GNUNET_OK !=
@@ -696,9 +856,9 @@ mq_init (void *cls, const struct GNUNET_PeerIdentity
*peer_id, const
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
COMMUNICATOR_CONFIG_SECTION,
"BINDTO");
- return;
+ return GNUNET_SYSERR;
}
- local_addr = udp_address_to_sockaddr (bindto, &in_len);
+ local_addr = udp_address_to_sockaddr (bindto, &local_in_len);
if (0 != strncmp (address,
COMMUNICATOR_ADDRESS_PREFIX "-",
@@ -739,24 +899,27 @@ mq_init (void *cls, const struct GNUNET_PeerIdentity
*peer_id, const
*/
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_STRONG, scid,
LOCAL_CONN_ID_LEN);
- conn = GNUNET_new (struct quic_conn);
- GNUNET_memcpy (conn->cid, scid, LOCAL_CONN_ID_LEN);
+ q_conn = GNUNET_new (struct quic_conn);
+ GNUNET_memcpy (q_conn->cid, scid, LOCAL_CONN_ID_LEN);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Attempting to perform handshake with peer\n");
- conn->conn = quiche_connect (peer->foreign_addr, scid, LOCAL_CONN_ID_LEN,
- local_addr,
- in_len, peer->address, peer->address_len,
- config);
+ "Attempting to perform QUIC handshake with peer\n");
+ q_conn->conn = quiche_connect (peer->foreign_addr, scid, LOCAL_CONN_ID_LEN,
+ local_addr,
+ local_in_len, peer->address,
peer->address_len,
+ config);
+
+ peer->conn = q_conn;
/**
* Insert connection into hashmap
*/
struct GNUNET_HashCode key;
- GNUNET_CRYPTO_hash (conn->cid, LOCAL_CONN_ID_LEN, &key);
- GNUNET_CONTAINER_multihashmap_put (conn_map, &key, conn,
+ GNUNET_CRYPTO_hash (q_conn->cid, LOCAL_CONN_ID_LEN, &key);
+ GNUNET_CONTAINER_multihashmap_put (conn_map, &key, q_conn,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
setup_peer_mq (peer);
if (NULL == timeout_task)
timeout_task = GNUNET_SCHEDULER_add_now (&check_timeouts, NULL);
+ GNUNET_free (local_addr);
return GNUNET_OK;
}
@@ -1035,6 +1198,7 @@ sock_read (void *cls)
quiche_conn_free (conn->conn);
GNUNET_free (conn);
}
+ GNUNET_free (local_addr);
}
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
- [gnunet] 19/32: transport (quic): minor cleanup, remove commented code, (continued)
- [gnunet] 19/32: transport (quic): minor cleanup, remove commented code, gnunet, 2023/07/18
- [gnunet] 22/32: transport (quic): token validation check, gnunet, 2023/07/18
- [gnunet] 23/32: transport (quic): get random block for cid, gnunet, 2023/07/18
- [gnunet] 25/32: transport (quic): comm connect, gnunet, 2023/07/18
- [gnunet] 24/32: transport (quic): remove gen_streamid, gnunet, 2023/07/18
- [gnunet] 21/32: transport (quic): fix compiler warnings, gnunet, 2023/07/18
- [gnunet] 26/32: transport (quic): add functions for mq handling, gnunet, 2023/07/18
- [gnunet] 31/32: transport (quic): create peermap using address, gnunet, 2023/07/18
- [gnunet] 32/32: transport (quic): uncrustify, gnunet, 2023/07/18
- [gnunet] 30/32: transport(quic): cleanup conn, gnunet, 2023/07/18
- [gnunet] 29/32: transport (quic): mq handling fixes,
gnunet <=
- [gnunet] 28/32: create PeerAddress, gnunet, 2023/07/18