[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r30384 - gnunet/src/set
From: gnunet
Subject: [GNUnet-SVN] r30384 - gnunet/src/set
Date: Mon, 21 Oct 2013 18:45:00 +0200
Author: cfuchs
Date: 2013-10-21 18:45:00 +0200 (Mon, 21 Oct 2013)
New Revision: 30384
Modified:
gnunet/src/set/gnunet-service-set_intersection.c
Log:
more work on intersection
Modified: gnunet/src/set/gnunet-service-set_intersection.c
===================================================================
--- gnunet/src/set/gnunet-service-set_intersection.c 2013-10-21 16:13:00 UTC (rev 30383)
+++ gnunet/src/set/gnunet-service-set_intersection.c 2013-10-21 16:45:00 UTC (rev 30384)
@@ -75,7 +75,7 @@
/**
* We sent the request message, and expect a BF
*/
- PHASE_EXPECT_BF,
+ PHASE_BF_EXCHANGE,
/**
* The protocol is over.
* Results may still have to be sent to the client.
@@ -107,34 +107,16 @@
struct GNUNET_MQ_Handle *mq;
/**
- * Number of ibf buckets received
+ * The bf we currently receive
*/
- unsigned int ibf_buckets_received;
+ struct BloomFilter *remote_bf;
/**
- * Copy of the set's strata estimator at the time of
- * creation of this operation
+ * BF of the set's element.
*/
- struct StrataEstimator *se;
+ struct BloomFilter *local_bf;
/**
- * The ibf we currently receive
- */
- struct InvertibleBloomFilter *remote_ibf;
-
- /**
- * IBF of the set's element.
- */
- struct InvertibleBloomFilter *local_ibf;
-
- /**
- * Maps IBF-Keys (specific to the current salt) to elements.
- * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
- * Colliding IBF-Keys are linked.
- */
- struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
-
- /**
* Current state of the operation.
*/
enum IntersectionOperationPhase phase;
@@ -297,27 +279,7 @@
eo->tunnel = NULL;
GNUNET_MESH_tunnel_destroy (t);
}
- if (NULL != eo->remote_ibf)
- {
- ibf_destroy (eo->remote_ibf);
- eo->remote_ibf = NULL;
- }
- if (NULL != eo->local_ibf)
- {
- ibf_destroy (eo->local_ibf);
- eo->local_ibf = NULL;
- }
- if (NULL != eo->se)
- {
- strata_estimator_destroy (eo->se);
- eo->se = NULL;
- }
- if (NULL != eo->key_to_element)
- {
- GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
destroy_key_to_element_iter, NULL);
- GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
- eo->key_to_element = NULL;
- }
+ // TODO: destroy set elements?
if (NULL != eo->spec)
{
if (NULL != eo->spec->context_msg)
@@ -488,28 +450,7 @@
}
-/**
- * Insert a key into an ibf.
- *
- * @param cls the ibf
- * @param key unused
- * @param value the key entry to get the key from
- */
-static int
-prepare_ibf_iterator (void *cls,
- uint32_t key,
- void *value)
-{
- struct InvertibleBloomFilter *ibf = cls;
- struct KeyEntry *ke = value;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting %x into ibf\n",
ke->ibf_key.key_val);
-
- ibf_insert (ibf, ke->ibf_key);
- return GNUNET_YES;
-}
-
-
/**
* Iterator for initializing the
* key-to-element mapping of a intersection operation
@@ -542,388 +483,41 @@
return GNUNET_YES;
}
-
/**
- * Create an ibf with the operation's elements
- * of the specified size
- *
- * @param eo the intersection operation
- * @param size size of the ibf to create
- */
-static void
-prepare_ibf (struct OperationState *eo, uint16_t size)
-{
- if (NULL == eo->key_to_element)
- {
- unsigned int len;
- len = GNUNET_CONTAINER_multihashmap_size (eo->set->elements);
- eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
- GNUNET_CONTAINER_multihashmap_iterate (eo->set->elements,
- init_key_to_element_iterator, eo);
- }
- if (NULL != eo->local_ibf)
- ibf_destroy (eo->local_ibf);
- eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
- GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
- prepare_ibf_iterator,
eo->local_ibf);
-}
-
-
-/**
- * Send an ibf of appropriate size.
- *
- * @param eo the intersection operation
- * @param ibf_order order of the ibf to send, size=2^order
- */
-static void
-send_ibf (struct OperationState *eo, uint16_t ibf_order)
-{
- unsigned int buckets_sent = 0;
- struct InvertibleBloomFilter *ibf;
-
- prepare_ibf (eo, 1<<ibf_order);
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n",
1<<ibf_order);
-
- ibf = eo->local_ibf;
-
- while (buckets_sent < (1 << ibf_order))
- {
- unsigned int buckets_in_message;
- struct GNUNET_MQ_Envelope *ev;
- struct IBFMessage *msg;
-
- buckets_in_message = (1 << ibf_order) - buckets_sent;
- /* limit to maximum */
- if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
- buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
-
- ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
- GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
- msg->reserved = 0;
- msg->order = ibf_order;
- msg->offset = htons (buckets_sent);
- ibf_write_slice (ibf, buckets_sent,
- buckets_in_message, &msg[1]);
- buckets_sent += buckets_in_message;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
- buckets_in_message, buckets_sent, 1<<ibf_order);
- GNUNET_MQ_send (eo->mq, ev);
- }
-
- eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
-}
-
-
-/**
- * Send a strata estimator to the remote peer.
- *
- * @param eo the intersection operation with the remote peer
- */
-static void
-send_strata_estimator (struct OperationState *eo)
-{
- struct GNUNET_MQ_Envelope *ev;
- struct GNUNET_MessageHeader *strata_msg;
-
- ev = GNUNET_MQ_msg_header_extra (strata_msg,
- SE_STRATA_COUNT * IBF_BUCKET_SIZE *
SE_IBF_SIZE,
- GNUNET_MESSAGE_TYPE_SET_P2P_SE);
- strata_estimator_write (eo->set->state->se, &strata_msg[1]);
- GNUNET_MQ_send (eo->mq, ev);
- eo->phase = PHASE_EXPECT_BF;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
-}
-
-
-/**
- * Compute the necessary order of an ibf
- * from the size of the symmetric set difference.
- *
- * @param diff the difference
- * @return the required size of the ibf
- */
-static unsigned int
-get_order_from_difference (unsigned int diff)
-{
- unsigned int ibf_order;
-
- ibf_order = 2;
- while ((1<<ibf_order) < (IBF_ALPHA * diff) || (1<<ibf_order) <
SE_IBF_HASH_NUM)
- ibf_order++;
- if (ibf_order > MAX_IBF_ORDER)
- ibf_order = MAX_IBF_ORDER;
- return ibf_order;
-}
-
-
-/**
- * Handle a strata estimator from a remote peer
- *
- * @param cls the intersection operation
- * @param mh the message
- */
-static void
-handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
-{
- struct OperationState *eo = cls;
- struct StrataEstimator *remote_se;
- int diff;
-
- if (eo->phase != PHASE_EXPECT_SE)
- {
- fail_intersection_operation (eo);
- GNUNET_break (0);
- return;
- }
- remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
- SE_IBF_HASH_NUM);
- strata_estimator_read (&mh[1], remote_se);
- GNUNET_assert (NULL != eo->se);
- diff = strata_estimator_difference (remote_se, eo->se);
- strata_estimator_destroy (remote_se);
- strata_estimator_destroy (eo->se);
- eo->se = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n",
- diff, 1<<get_order_from_difference (diff));
- send_ibf (eo, get_order_from_difference (diff));
-}
-
-
-
-/**
- * Iterator to send elements to a remote peer
- *
- * @param cls closure with the element key and the intersection operation
- * @param key ignored
- * @param value the key entry
- */
-static int
-send_element_iterator (void *cls,
- uint32_t key,
- void *value)
-{
- struct SendElementClosure *sec = cls;
- struct IBF_Key ibf_key = sec->ibf_key;
- struct OperationState *eo = sec->eo;
- struct KeyEntry *ke = value;
-
- if (ke->ibf_key.key_val != ibf_key.key_val)
- return GNUNET_YES;
- while (NULL != ke)
- {
- const struct GNUNET_SET_Element *const element = &ke->element->element;
- struct GNUNET_MQ_Envelope *ev;
- struct GNUNET_MessageHeader *mh;
-
- GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
- ev = GNUNET_MQ_msg_header_extra (mh, element->size,
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
- if (NULL == ev)
- {
- /* element too large */
- GNUNET_break (0);
- continue;
- }
- memcpy (&mh[1], element->data, element->size);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n",
- GNUNET_h2s (&ke->element->element_hash));
- GNUNET_MQ_send (eo->mq, ev);
- ke = ke->next_colliding;
- }
- return GNUNET_NO;
-}
-
-/**
- * Send all elements that have the specified IBF key
- * to the remote peer of the intersection operation
- *
- * @param eo intersection operation
- * @param ibf_key IBF key of interest
- */
-static void
-send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key)
-{
- struct SendElementClosure send_cls;
-
- send_cls.ibf_key = ibf_key;
- send_cls.eo = eo;
- GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t)
ibf_key.key_val,
- &send_element_iterator,
&send_cls);
-}
-
-
-/**
- * Decode which elements are missing on each side, and
- * send the appropriate elemens and requests
- *
- * @param eo intersection operation
- */
-static void
-decode_and_send (struct OperationState *eo)
-{
- struct IBF_Key key;
- struct IBF_Key last_key;
- int side;
- unsigned int num_decoded;
- struct InvertibleBloomFilter *diff_ibf;
-
- GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
-
- prepare_ibf (eo, eo->remote_ibf->size);
- diff_ibf = ibf_dup (eo->local_ibf);
- ibf_subtract (diff_ibf, eo->remote_ibf);
-
- ibf_destroy (eo->remote_ibf);
- eo->remote_ibf = NULL;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n",
diff_ibf->size);
-
- num_decoded = 0;
- last_key.key_val = 0;
-
- while (1)
- {
- int res;
- int cycle_detected = GNUNET_NO;
-
- last_key = key;
-
- res = ibf_decode (diff_ibf, &side, &key);
- if (res == GNUNET_OK)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n",
- key.key_val);
- num_decoded += 1;
- if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val
== key.key_val))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded
%u/%u)\n",
- num_decoded, diff_ibf->size);
- cycle_detected = GNUNET_YES;
- }
- }
- if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected))
- {
- int next_order;
- next_order = 0;
- while (1<<next_order < diff_ibf->size)
- next_order++;
- next_order++;
- if (next_order <= MAX_IBF_ORDER)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "decoding failed, sending larger ibf (size %u)\n",
- 1<<next_order);
- send_ibf (eo, next_order);
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "set intersection failed: reached ibf limit\n");
- }
- break;
- }
- if (GNUNET_NO == res)
- {
- struct GNUNET_MQ_Envelope *ev;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending
DONE\n");
- ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
- GNUNET_MQ_send (eo->mq, ev);
- break;
- }
- if (1 == side)
- {
- send_elements_for_key (eo, key);
- }
- else if (-1 == side)
- {
- struct GNUNET_MQ_Envelope *ev;
- struct GNUNET_MessageHeader *msg;
-
- /* FIXME: before sending the request, check if we may just have the element */
- /* FIXME: merge multiple requests */
- /* FIXME: remember somewhere that we already requested the element,
- * so that we don't request it again with the next ibf if decoding fails */
- ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
-
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
-
- *(struct IBF_Key *) &msg[1] = key;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n");
- GNUNET_MQ_send (eo->mq, ev);
- }
- else
- {
- GNUNET_assert (0);
- }
- }
- ibf_destroy (diff_ibf);
-}
-
-
-/**
* Handle an IBF message from a remote peer.
*
* @param cls the intersection operation
* @param mh the header of the message
*/
static void
-handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
+handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
{
struct OperationState *eo = cls;
- struct IBFMessage *msg = (struct IBFMessage *) mh;
+ struct BFMessage *msg = (struct BFMessage *) mh;
unsigned int buckets_in_message;
- if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
- (eo->phase == PHASE_EXPECT_BF) )
+ if (eo->phase == PHASE_EXPECT_INITIAL )
{
- eo->phase = PHASE_EXPECT_BF_CONT;
- GNUNET_assert (NULL == eo->remote_ibf);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n",
1<<msg->order);
- eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
- eo->ibf_buckets_received = 0;
- if (0 != ntohs (msg->offset))
- {
- GNUNET_break (0);
- fail_intersection_operation (eo);
- return;
- }
+ eo->phase = PHASE_BF_EXCHANGE;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new bf of size %u\n",
1<<msg->order);
+
+ // if (the remote peer has less elements than us)
+ // run our elements through his bloomfilter
+ // else if (we have the same elements)
+ // done;
+ //
+ // evict elements we can exclude through the bloomfilter
+ //
+ // create a new bloomfilter over our remaining elements
+ //
+ // send our new count and the bloomfilter back
}
- else if (eo->phase == PHASE_EXPECT_BF_CONT)
+ else if (eo->phase == PHASE_BF_EXCHANGE)
{
- if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
- (1<<msg->order != eo->remote_ibf->size) )
- {
- GNUNET_break (0);
- fail_intersection_operation (eo);
- return;
- }
- }
- buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) /
IBF_BUCKET_SIZE;
-
- if (0 == buckets_in_message)
- {
- GNUNET_break_op (0);
- fail_intersection_operation (eo);
- return;
}
- if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message *
IBF_BUCKET_SIZE)
- {
- GNUNET_break (0);
- fail_intersection_operation (eo);
- return;
- }
-
- ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message,
eo->remote_ibf);
- eo->ibf_buckets_received += buckets_in_message;
-
- if (eo->ibf_buckets_received == eo->remote_ibf->size)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
- eo->phase = PHASE_EXPECT_ELEMENTS;
- decode_and_send (eo);
- }
}
@@ -987,82 +581,6 @@
/**
- * Handle an element message from a remote peer.
- *
- * @param cls the intersection operation
- * @param mh the message
- */
-static void
-handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
-{
- struct OperationState *eo = cls;
- struct ElementEntry *ee;
- uint16_t element_size;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
-
- if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
- (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
- {
- fail_intersection_operation (eo);
- GNUNET_break (0);
- return;
- }
- element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
- ee = GNUNET_malloc (sizeof *ee + element_size);
- memcpy (&ee[1], &mh[1], element_size);
- ee->element.size = element_size;
- ee->element.data = &ee[1];
- ee->remote = GNUNET_YES;
- GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash);
-
- /* FIXME: see if the element has already been inserted! */
-
- op_register_element (eo, ee);
- send_client_element (eo, &ee->element);
-}
-
-
-/**
- * Handle an element request from a remote peer.
- *
- * @param cls the intersection operation
- * @param mh the message
- */
-static void
-handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
-{
- struct OperationState *eo = cls;
- struct IBF_Key *ibf_key;
- unsigned int num_keys;
-
- /* look up elements and send them */
- if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
- {
- GNUNET_break (0);
- fail_intersection_operation (eo);
- return;
- }
-
- num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
-
- if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
- {
- GNUNET_break (0);
- fail_intersection_operation (eo);
- return;
- }
-
- ibf_key = (struct IBF_Key *) &mh[1];
- while (0 != num_keys--)
- {
- send_elements_for_key (eo, *ibf_key);
- ibf_key++;
- }
-}
-
-
-/**
* Handle a done message from a remote peer
*
* @param cls the intersection operation
@@ -1116,7 +634,6 @@
eo = GNUNET_new (struct OperationState);
tc->vt = _GSS_intersection_vt ();
tc->op = eo;
- eo->se = strata_estimator_dup (spec->set->state->se);
eo->generation_created = spec->set->current_generation++;
eo->set = spec->set;
eo->spec = spec;
@@ -1134,7 +651,7 @@
eo->set->state->ops_tail,
eo);
- send_operation_request (eo);
+ send_initial_bloomfilter (eo);
}
@@ -1164,13 +681,12 @@
eo->spec = spec;
eo->tunnel = tunnel;
eo->mq = GNUNET_MESH_mq_create (tunnel);
- eo->se = strata_estimator_dup (eo->set->state->se);
/* transfer ownership of mq and socket from incoming to eo */
GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head,
eo->set->state->ops_tail,
eo);
/* kick off the operation */
- send_strata_estimator (eo);
+ send_bloomfilter (eo);
}
@@ -1187,8 +703,9 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "intersection set created\n");
set_state = GNUNET_new (struct SetState);
- set_state->se = strata_estimator_create (SE_STRATA_COUNT,
- SE_IBF_SIZE, SE_IBF_HASH_NUM);
+
+ //TODO: actually create that thing
+
return set_state;
}
@@ -1202,7 +719,7 @@
static void
intersection_add (struct SetState *set_state, struct ElementEntry *ee)
{
- strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
+ //TODO
}
@@ -1220,7 +737,7 @@
intersection_operation_destroy (set_state->ops_head);
if (NULL != set_state->se)
{
- strata_estimator_destroy (set_state->se);
+ //TODO: actually destroy that thing
set_state->se = NULL;
}
GNUNET_free (set_state);
@@ -1229,7 +746,6 @@
/**
* Remove the element given in the element message from the set.
- * Only marks the element as removed, so that older set operations can still exchange it.
*
* @param set_state state of the set to remove from
* @param element set element to remove
@@ -1237,7 +753,7 @@
static void
intersection_remove (struct SetState *set_state, struct ElementEntry *element)
{
- /* FIXME: remove from strata estimator */
+ //TODO
}
@@ -1257,18 +773,9 @@
ntohs (mh->type), ntohs (mh->size));
switch (ntohs (mh->type))
{
- case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
- handle_p2p_ibf (eo, mh);
+ case GNUNET_MESSAGE_TYPE_SET_P2P_BF:
+ handle_p2p_bf (eo, mh);
break;
- case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
- handle_p2p_strata_estimator (eo, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
- handle_p2p_elements (eo, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
- handle_p2p_element_requests (eo, mh);
- break;
case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
handle_p2p_done (eo, mh);
break;
[Prev in Thread] Current Thread [Next in Thread]
- [GNUnet-SVN] r30384 - gnunet/src/set, gnunet <=