[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet] 100/164: Added message flow control
From: |
gnunet |
Subject: |
[gnunet] 100/164: Added message flow control |
Date: |
Fri, 30 Jul 2021 15:32:46 +0200 |
This is an automated email from the git hooks/post-receive script.
grothoff pushed a commit to branch master
in repository gnunet.
commit 261276cc3b0c1d2a36309b6a76c4c731af244fd9
Author: Elias Summermatter <elias.summermatter@seccom.ch>
AuthorDate: Tue May 18 01:40:38 2021 +0200
Added message flow control
---
src/setu/gnunet-service-setu.c | 332 ++++++++++++++++++++++++++++++++++++++---
1 file changed, 309 insertions(+), 23 deletions(-)
diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c
index 4ff91d81e..8139ec7d7 100644
--- a/src/setu/gnunet-service-setu.c
+++ b/src/setu/gnunet-service-setu.c
@@ -473,6 +473,16 @@ struct Operation
* Mode of operation that was chosen by the algorithm
*/
uint8_t mode_of_operation;
+
+ /**
+ * Hashmap to keep track of the sent/received messages
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *message_control_flow;
+
+ /**
+ * Hashmap to keep track of the sent/received inquiries (IBF keys)
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *inquiries_sent;
};
@@ -737,6 +747,28 @@ struct perf_rtt_struct
struct perf_rtt_struct perf_rtt;
+enum MESSAGE_CONTROL_FLOW_STATE
+{
+ MESSAGE_EMPTY,
+ MESSAGE_SENT,
+ MESSAGE_EXPECTED,
+ MESSAGE_RECEIVED,
+};
+
+enum MESSAGE_TYPE
+{
+ OFFER_MESSAGE,
+ DEMAND_MESSAGE,
+ ELEMENT_MESSAGE,
+};
+
+struct message_control_flow_element
+{
+ enum MESSAGE_CONTROL_FLOW_STATE offer;
+ enum MESSAGE_CONTROL_FLOW_STATE demand;
+ enum MESSAGE_CONTROL_FLOW_STATE element;
+};
+
/*
* Calculate
*/
@@ -902,13 +934,6 @@ estimate_best_mode_of_operation(uint64_t avg_element_size,
(RTT_MIN_FULL + 0.5) * bandwith_latency_tradeoff + \
SIZEOF_REQUEST_FULL;
- LOG (GNUNET_ERROR_TYPE_ERROR,
- "YYYYY::::: est_set_diff_remote %u, local_set_size %u \n",
est_set_diff_remote, local_set_size);
-
- LOG (GNUNET_ERROR_TYPE_ERROR,
- "XXX::::: total_elements_to_send_local_send_first %u,
total_elements_to_send_remote_send_first %u \n",
- total_elements_to_send_local_send_first,
total_elements_to_send_remote_send_first);
-
/*
* Calculate bytes for differential Sync
*/
@@ -948,12 +973,6 @@ estimate_best_mode_of_operation(uint64_t avg_element_size,
uint64_t full_min = MIN(total_bytes_full_local_send_first,
total_bytes_full_local_send_first);
/* Decide between full and differential sync */
- LOG (GNUNET_ERROR_TYPE_ERROR,
- "OK::::: full_min: %u total_bytes_diff: %u
total_bytes_full_remote_send_first:%u total_bytes_full_local_send_first:%u\n",
- full_min,
- total_bytes_diff,
- total_bytes_full_remote_send_first,
- total_bytes_full_local_send_first);
if (full_min < total_bytes_diff) {
/* Decide between sending all element first or receiving all elements
*/
@@ -971,8 +990,8 @@ static int check_valid_phase(uint8_t allowed_phases[],
size_t size_phases, struc
for(uint32_t phase_ctr=0; phase_ctr < size_phases; phase_ctr++) {
uint8_t phase = allowed_phases[phase_ctr];
if (phase == op->phase) {
- LOG (GNUNET_ERROR_TYPE_ERROR,
- "Found correct phase\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Message received in valid phase\n");
return GNUNET_YES;
}
}
@@ -982,6 +1001,92 @@ static int check_valid_phase(uint8_t allowed_phases[],
size_t size_phases, struc
}
+static int
+update_message_control_flow(struct GNUNET_CONTAINER_MultiHashMap *hash_map,
+ enum MESSAGE_CONTROL_FLOW_STATE new_mcfs,
+ struct GNUNET_HashCode *hash_code,
+ enum MESSAGE_TYPE mt)
+{
+ struct message_control_flow_element *cfe = NULL;
+ enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
+
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "%u NEW_STATE %u\n", *hash_code->bits, new_mcfs);
+
+ cfe = GNUNET_CONTAINER_multihashmap_get(hash_map, hash_code);
+ if(NULL == cfe) {
+ cfe = (struct message_control_flow_element*)
GNUNET_malloc(sizeof(struct message_control_flow_element));
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "%u CREATE NEW!\n", *hash_code->bits);
+ }
+
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "ID: %u OFFER: %u DEMAND: %u ELEMEMT: %u %u\n", *hash_code->bits,
cfe->offer, cfe->demand, cfe->element);
+
+ if ( OFFER_MESSAGE == mt) {
+ mcfs = &cfe->offer;
+ } else if ( DEMAND_MESSAGE == mt ) {
+ mcfs = &cfe->demand;
+ } else if ( ELEMENT_MESSAGE == mt) {
+ mcfs = &cfe->element;
+ if(new_mcfs != MESSAGE_SENT && MESSAGE_RECEIVED != cfe->offer) {
+ LOG (GNUNET_ERROR_TYPE_ERROR, "Received an element without sent
offer!\n");
+ return GNUNET_NO;
+ }
+ /* Check that only requested elements are received! */
+ if(new_mcfs != MESSAGE_SENT && cfe->demand != MESSAGE_SENT) {
+ LOG (GNUNET_ERROR_TYPE_ERROR, "Received an element that was not
demanded\n");
+ return GNUNET_NO;
+ }
+ } else {
+ return GNUNET_SYSERR;
+ }
+
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "%u VALUE %u < %u \n", *hash_code->bits , new_mcfs, *mcfs);
+
+ if(new_mcfs <= *mcfs) {
+ return GNUNET_NO;
+ }
+
+ *mcfs = new_mcfs;
+
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "ID: %u OFFER: %u DEMAND: %u ELEMEMT: %u\n", *hash_code->bits,
cfe->offer, cfe->demand, cfe->element);
+
+ GNUNET_CONTAINER_multihashmap_put(hash_map,
hash_code,cfe,GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+ return GNUNET_YES;
+}
+
+static int
+is_message_in_message_control_flow(struct GNUNET_CONTAINER_MultiHashMap
*hash_map,
+ struct GNUNET_HashCode *hash_code,
+ enum MESSAGE_TYPE mt)
+{
+ struct message_control_flow_element *cfe = NULL;
+ enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
+
+ cfe = GNUNET_CONTAINER_multihashmap_get(hash_map, hash_code);
+ if(NULL == cfe) {
+ cfe = (struct message_control_flow_element*)
GNUNET_malloc(sizeof(struct message_control_flow_element));
+ }
+
+ if ( OFFER_MESSAGE == mt) {
+ mcfs = &cfe->offer;
+ } else if ( DEMAND_MESSAGE == mt ) {
+ mcfs = &cfe->demand;
+ } else if ( ELEMENT_MESSAGE == mt) {
+ mcfs = &cfe->element;
+ } else {
+ return GNUNET_SYSERR;
+ }
+ if(*mcfs != MESSAGE_EMPTY) {
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
+}
+
+
/**
* Iterator over hash map entries, called to
* destroy the linked list of colliding ibf key entries.
@@ -1746,6 +1851,9 @@ handle_union_p2p_strata_estimator (void *cls,
size_t len;
int is_compressed;
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "START OPERATION %u\n", op->peer_site);
+
// Setting peer site to receiving peer
op->peer_site = 1;
@@ -1923,6 +2031,52 @@ send_offers_iterator (void *cls,
if (ke->ibf_key.key_val != sec->ibf_key.key_val)
return GNUNET_YES;
+
+ /* Prevent the implementation from sending an offer multiple times in case of
role switch */
+ if (GNUNET_YES !=
+ is_message_in_message_control_flow(
+ op->message_control_flow,
+ &ke->element->element_hash,
+ OFFER_MESSAGE)
+ )
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Skipping already sent processed element offer!\n");
+ return GNUNET_YES;
+ }
+
+ /* Save sent offer message for message control */
+ if (GNUNET_YES !=
+ update_message_control_flow(
+ op->message_control_flow,
+ MESSAGE_SENT,
+ &ke->element->element_hash,
+ OFFER_MESSAGE)
+ )
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Double offer message sent found!\n");
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return GNUNET_NO;
+ };
+
+ /* Mark the demand for this element as expected to be received */
+ if (GNUNET_YES !=
+ update_message_control_flow(
+ op->message_control_flow,
+ MESSAGE_EXPECTED,
+ &ke->element->element_hash,
+ DEMAND_MESSAGE)
+ )
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Double demand received found!\n");
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return GNUNET_NO;
+ };
+
perf_rtt.offer.sent += 1;
perf_rtt.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode);
@@ -1947,7 +2101,6 @@ send_offers_iterator (void *cls,
* @param op union operation
* @param ibf_key IBF key of interest
*/
-static void
send_offers_for_key (struct Operation *op,
struct IBF_Key ibf_key)
{
@@ -2103,8 +2256,6 @@ decode_and_send (struct Operation *op)
if (1 == side)
{
struct IBF_Key unsalted_key;
-
-
unsalt_key (&key,
op->salt_receive,
&unsalted_key);
@@ -2118,6 +2269,22 @@ decode_and_send (struct Operation *op)
perf_rtt.inquery.sent += 1;
perf_rtt.inquery.sent_var_bytes += sizeof(struct IBF_Key);
+
+ /** Add sent inquiries to hashmap for flow control **/
+ struct GNUNET_HashContext *hashed_key_context =
GNUNET_CRYPTO_hash_context_start ();
+ struct GNUNET_HashCode *hashed_key = (struct GNUNET_HashCode*)
GNUNET_malloc(sizeof(struct GNUNET_HashCode));;
+ enum MESSAGE_CONTROL_FLOW_STATE mcfs = MESSAGE_SENT;
+ GNUNET_CRYPTO_hash_context_read (hashed_key_context,
+ &key,
+ sizeof(struct IBF_Key));
+ GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
+ hashed_key);
+ GNUNET_CONTAINER_multihashmap_put(op->inquiries_sent,
+ hashed_key,
+ &mcfs,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
+ );
+
/* It may be nice to merge multiple requests, but with CADET's corking
it is not worth
* the effort additional complexity. */
ev = GNUNET_MQ_msg_extra (msg,
@@ -2460,6 +2627,21 @@ handle_union_p2p_elements (void *cls,
return;
}
+ if ( GNUNET_OK !=
+ update_message_control_flow(
+ op->message_control_flow,
+ MESSAGE_RECEIVED,
+ &ee->element_hash,
+ ELEMENT_MESSAGE)
+ )
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "An element has been received more than once!\n");
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
+
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Got element (size %u, hash %s) from peer\n",
(unsigned int) element_size,
@@ -2699,10 +2881,25 @@ handle_union_p2p_inquiry (void *cls,
num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
/ sizeof(struct IBF_Key);
ibf_key = (const struct IBF_Key *) &msg[1];
+
+ /** Add received inquiries to hashmap for flow control **/
+ struct GNUNET_HashContext *hashed_key_context =
GNUNET_CRYPTO_hash_context_start ();
+ struct GNUNET_HashCode *hashed_key = (struct GNUNET_HashCode*)
GNUNET_malloc(sizeof(struct GNUNET_HashCode));;
+ enum MESSAGE_CONTROL_FLOW_STATE mcfs = MESSAGE_RECEIVED;
+ GNUNET_CRYPTO_hash_context_read (hashed_key_context,
+ &ibf_key,
+ sizeof(struct IBF_Key));
+ GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
+ hashed_key);
+ GNUNET_CONTAINER_multihashmap_put(op->inquiries_sent,
+ hashed_key,
+ &mcfs,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
+ );
+
while (0 != num_keys--)
{
struct IBF_Key unsalted_key;
-
unsalt_key (ibf_key,
ntohl (msg->salt),
&unsalted_key);
@@ -2929,8 +3126,40 @@ handle_union_p2p_demand (void *cls,
num_hashes > 0;
hash++, num_hashes--)
{
- ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
- hash);
+ ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
+ hash);
+
+ /* Save received demand message for message control */
+ if (GNUNET_YES !=
+ update_message_control_flow(
+ op->message_control_flow,
+ MESSAGE_RECEIVED,
+ &ee->element_hash,
+ DEMAND_MESSAGE)
+ )
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Double demand message received found!\n");
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ };
+
+ /* Mark the element as sent */
+ if (GNUNET_YES !=
+ update_message_control_flow(
+ op->message_control_flow,
+ MESSAGE_SENT,
+ &ee->element_hash,
+ ELEMENT_MESSAGE)
+ )
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Double element message sent found!\n");
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ };
if (NULL == ee)
{
/* Demand for non-existing element. */
@@ -3076,6 +3305,55 @@ handle_union_p2p_offer (void *cls,
ev = GNUNET_MQ_msg_header_extra (demands,
sizeof(struct GNUNET_HashCode),
GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND);
+ /* Save sent demand message for message control */
+ if (GNUNET_YES !=
+ update_message_control_flow(
+ op->message_control_flow,
+ MESSAGE_SENT,
+ hash,
+ DEMAND_MESSAGE)
+ )
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Double demand message sent found!\n");
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ };
+
+ /* Mark offer as received */
+ if (GNUNET_YES !=
+ update_message_control_flow(
+ op->message_control_flow,
+ MESSAGE_RECEIVED,
+ hash,
+ OFFER_MESSAGE)
+ )
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Double offer message received found!\n");
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ };
+
+ /* Mark the element as expected to be received */
+ if (GNUNET_YES !=
+ update_message_control_flow(
+ op->message_control_flow,
+ MESSAGE_EXPECTED,
+ hash,
+ ELEMENT_MESSAGE)
+ )
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Element already expected!\n");
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ };
+
+
GNUNET_memcpy (&demands[1],
hash,
sizeof(struct GNUNET_HashCode));
@@ -3917,8 +4195,12 @@ handle_client_evaluate (void *cls,
op->symmetric = msg->symmetric;
context = GNUNET_MQ_extract_nested_mh (msg);
- /* load config */
- load_config(op);
+ /* create hashmap for message control */
+ op->message_control_flow =
GNUNET_CONTAINER_multihashmap_create(32,GNUNET_NO);
+ op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create(32,GNUNET_NO);
+
+ /* load config */
+ load_config(op);
/* Advance generation values, so that
mutations won't interfere with the running operation. */
@@ -4109,6 +4391,10 @@ handle_client_accept (void *cls,
op->force_delta = msg->force_delta;
op->symmetric = msg->symmetric;
+ /* create hashmap for message control */
+ op->message_control_flow =
GNUNET_CONTAINER_multihashmap_create(32,GNUNET_NO);
+ op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create(32,GNUNET_NO);
+
/* load config */
load_config(op);
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
- [gnunet] 85/164: Perf test, (continued)
- [gnunet] 85/164: Perf test, gnunet, 2021/07/30
- [gnunet] 89/164: Perftest, gnunet, 2021/07/30
- [gnunet] 75/164: Final messurement, gnunet, 2021/07/30
- [gnunet] 74/164: Add new algo to determinate mode of operation, gnunet, 2021/07/30
- [gnunet] 95/164: Perftest, gnunet, 2021/07/30
- [gnunet] 98/164: Perftest, gnunet, 2021/07/30
- [gnunet] 101/164: Added element avg calculation, gnunet, 2021/07/30
- [gnunet] 77/164: Added new algo to determine operation mode, gnunet, 2021/07/30
- [gnunet] 81/164: Perf test, gnunet, 2021/07/30
- [gnunet] 87/164: Perf test, gnunet, 2021/07/30
- [gnunet] 100/164: Added message flow control,
gnunet <=
- [gnunet] 108/164: Added some more sec checks, gnunet, 2021/07/30
- [gnunet] 93/164: Perftest, gnunet, 2021/07/30
- [gnunet] 94/164: Perftest, gnunet, 2021/07/30
- [gnunet] 86/164: Perf test, gnunet, 2021/07/30
- [gnunet] 20/164: Run over night, gnunet, 2021/07/30
- [gnunet] 53/164: Reverte change to changes salt, gnunet, 2021/07/30
- [gnunet] 46/164: Test data 50 elements, gnunet, 2021/07/30
- [gnunet] 69/164: Pack IBF counter to use only as much storage as needed, gnunet, 2021/07/30
- [gnunet] 83/164: Perf test, gnunet, 2021/07/30
- [gnunet] 71/164: Fixed some ugly construct, gnunet, 2021/07/30