[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r8699 - in GNUnet/src/applications/dv_dht: module tools
From: gnunet
Subject: [GNUnet-SVN] r8699 - in GNUnet/src/applications/dv_dht: module tools
Date: Tue, 14 Jul 2009 22:20:49 -0600
Author: nevans
Date: 2009-07-14 22:20:49 -0600 (Tue, 14 Jul 2009)
New Revision: 8699
Modified:
GNUnet/src/applications/dv_dht/module/routing.c
GNUnet/src/applications/dv_dht/module/table.c
GNUnet/src/applications/dv_dht/module/table.h
GNUnet/src/applications/dv_dht/tools/dv_dht_driver_new.c
GNUnet/src/applications/dv_dht/tools/dv_test.conf
GNUnet/src/applications/dv_dht/tools/gnunetd_dv.conf
Log:
minor overhaul, redid return routing (still not perfect), added bloom filter to
avoid circular paths, removed dht level gossip (useless/extra bandwidth
consumed), no more discovery messages sent
Modified: GNUnet/src/applications/dv_dht/module/routing.c
===================================================================
--- GNUnet/src/applications/dv_dht/module/routing.c 2009-07-15 04:18:11 UTC (rev 8698)
+++ GNUnet/src/applications/dv_dht/module/routing.c 2009-07-15 04:20:49 UTC (rev 8699)
@@ -45,6 +45,17 @@
*/
#define DV_DHT_PRIORITY 0
+/*
+ * Number of hash functions for bloom filter
+ */
+#define DV_DHT_BLOOM_K 16
+
+/*
+ * Size in bytes of bloom filter
+ */
+#define DV_DHT_BLOOM_SIZE 4
+
+
/**
* What is the estimated per-hop delay for DV_DHT operations
* (this is how much we will request from the GNUnet core);
@@ -106,13 +117,14 @@
*/
GNUNET_ResultProcessor receiver;
+ /*
+ * Have we sent this specific response to a local client yet?
+ * (So we only give a single response to an application)
+ */
+ unsigned int received;
+
void *receiver_closure;
- /**
- * At what time will this record automatically
- * expire?
- */
- GNUNET_CronTime expire;
} DV_DHT_Source_Route;
@@ -149,6 +161,11 @@
*/
GNUNET_HashCode key;
+ /*
+ * Bloomfilter to stop circular routes
+ */
+ char bloomfilter[4];
+
#if DEBUG_ROUTING
/*
* Unique query id for sql database interaction.
@@ -171,12 +188,6 @@
{
/**
- * When does this record expire? Should be the max
- * of the individual source records.
- */
- GNUNET_CronTime expire;
-
- /**
* Information about where to send the results back to.
*/
DV_DHT_Source_Route *sources;
@@ -200,7 +211,7 @@
} DV_DHTQueryRecord;
/**
- * Linked list of active records.
+ * Array of active records.
*/
static DV_DHTQueryRecord *records;
@@ -208,6 +219,7 @@
* Size of records
*/
static unsigned int rt_size;
+static unsigned int next_record;
#if DEBUG_INSANE
static unsigned int indentation;
@@ -329,16 +341,14 @@
unsigned int size, const char *data, void *cls)
{
DV_DHTQueryRecord *q;
- unsigned int i;
- unsigned int j;
- int found;
GNUNET_HashCode hc;
DV_DHT_MESSAGE *result;
+ struct GNUNET_BloomFilter *bloom;
unsigned int routed;
unsigned int tracked;
+ unsigned int i;
DV_DHT_Source_Route *pos;
DV_DHT_Source_Route *prev;
- GNUNET_CronTime now;
#if DEBUG_ROUTING
GNUNET_EncName enc;
unsigned long long queryuid;
@@ -398,6 +408,7 @@
result->network_size =
htonl (GNUNET_DV_DHT_estimate_network_diameter ());
result->key = *key;
+ memset (&result->bloomfilter, 0, DV_DHT_BLOOM_SIZE);
if ((debug_routes) && (dhtlog != NULL))
{
dhtlog->insert_query (&queryuid, dhtqueryuid, DHTLOG_RESULT,
@@ -413,64 +424,40 @@
memcpy (&result[1], data, size);
}
+ bloom =
+ GNUNET_bloomfilter_init (NULL, &result->bloomfilter[0], DV_DHT_BLOOM_SIZE,
+ DV_DHT_BLOOM_K);
+ GNUNET_bloomfilter_add (bloom, &coreAPI->my_identity->hashPubKey);
+ GNUNET_bloomfilter_get_raw_data (bloom, &result->bloomfilter[0],
+ DV_DHT_BLOOM_SIZE);
+
GNUNET_hash (data, size, &hc);
routed = 0;
tracked = 0;
GNUNET_mutex_lock (lock);
- now = GNUNET_get_time ();
+
for (i = 0; i < rt_size; i++)
{
q = &records[i];
tracked++;
- if ((ntohl (q->get.type) != type) ||
- (0 != memcmp (key, &q->get.key, sizeof (GNUNET_HashCode))))
+ if ((ntohl (q->get.type) != type)
+ || (0 != memcmp (key, &q->get.key, sizeof (GNUNET_HashCode))))
continue;
- found = GNUNET_NO;
- for (j = 0; j < q->result_count; j++)
- if (0 == memcmp (&hc, &q->results[j], sizeof (GNUNET_HashCode)))
- {
- found = GNUNET_YES;
- break;
- }
- if (found == GNUNET_YES)
+ else
{
#if DEBUG_ROUTING
+ GNUNET_hash_to_enc (&q->get.key, &enc);
GNUNET_GE_LOG (coreAPI->ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER
- | GNUNET_GE_BULK,
- "Seen the same result earlier, not routing it again.\n");
+ GNUNET_GE_WARNING | GNUNET_GE_ADMIN |
+ GNUNET_GE_USER | GNUNET_GE_BULK,
+ "Found matching request for reply `%s'\n", &enc);
#endif
- break;
}
-
routed++;
- GNUNET_array_grow (q->results, q->result_count, q->result_count + 1);
- q->results[q->result_count - 1] = hc;
pos = q->sources;
prev = NULL;
while (pos != NULL)
{
- if (pos->expire < now)
- {
-#if DEBUG_ROUTING
- GNUNET_hash_to_enc (&pos->source.hashPubKey, &enc);
- GNUNET_GE_LOG (coreAPI->ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN |
- GNUNET_GE_USER | GNUNET_GE_BULK,
- "Route to peer `%s' has expired (%llu < %llu)\n",
- &enc, pos->expire, now);
-#endif
- if (prev == NULL)
- q->sources = pos->next;
- else
- prev->next = pos->next;
- GNUNET_free (pos);
- if (prev == NULL)
- pos = q->sources;
- else
- pos = prev->next;
- continue;
- }
if (0 != memcmp (&pos->source,
coreAPI->my_identity,
sizeof (GNUNET_PeerIdentity)))
@@ -497,7 +484,7 @@
if (stats != NULL)
stats->change (stat_replies_routed, 1);
}
- if (pos->receiver != NULL)
+ if ((pos->receiver != NULL) && (pos->received != GNUNET_YES))
{
#if DEBUG_ROUTING
GNUNET_GE_LOG (coreAPI->ectx,
@@ -506,6 +493,7 @@
"Routing result to local client\n");
#endif
pos->receiver (key, type, size, data, pos->receiver_closure);
+ pos->received = GNUNET_YES;
if ((debug_routes) && (dhtlog != NULL))
{
queryuid = ntohl (result->queryuid);
@@ -514,7 +502,7 @@
coreAPI->my_identity, key);
}
- if ((debug_routes_extended) && (dhtlog != NULL))
+ if ((debug_routes_extended) && (dhtlog != NULL))
{
queryuid = ntohl (result->queryuid);
dhtlog->insert_route (NULL, queryuid,
@@ -528,9 +516,6 @@
}
pos = pos->next;
}
- if (q->result_count >= MAX_RESULTS)
- q->expire = 0;
- break;
}
GNUNET_mutex_unlock (lock);
#if DEBUG_ROUTING
@@ -540,6 +525,7 @@
"Routed result to %u out of %u pending requests\n",
routed, tracked);
#endif
+ GNUNET_bloomfilter_free (bloom);
if (cls == NULL)
GNUNET_free (result);
return GNUNET_OK;
@@ -553,71 +539,47 @@
GNUNET_ResultProcessor handler, void *cls,
const DV_DHT_MESSAGE * get)
{
+
+/*
+ * TODO: Eventually change to using a hash
+ * map for more efficient lookup of return routing information!
+ */
DV_DHTQueryRecord *q;
- unsigned int i;
- unsigned int rt_pos;
unsigned int diameter;
- GNUNET_CronTime expire;
- GNUNET_CronTime now;
unsigned int hops;
struct DV_DHT_Source_Route *pos;
hops = ntohl (get->hop_count);
diameter = GNUNET_DV_DHT_estimate_network_diameter ();
/*if (hops > 2 * diameter) */
- if (hops > 8 * diameter)
+ if (hops > 2 * diameter)
{
fprintf (stderr,
"hops (%d) > 2 * diameter (%d) so failing (diameter %d)\n",
hops, 2 * diameter, diameter);
return GNUNET_SYSERR;
}
- now = GNUNET_get_time ();
- expire = now + DV_DHT_DELAY * diameter * 20;
+
GNUNET_mutex_lock (lock);
- rt_pos = rt_size;
- for (i = 0; i < rt_size; i++)
+
+ q = &records[next_record];
+#if DEBUG_ROUTING
+ GNUNET_GE_LOG (coreAPI->ectx,
+ GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "Tracking request in slot %u\n",
+ next_record);
+#endif
+ if (q->sources != NULL)
{
- q = &records[i];
- if ((q->expire > now) &&
- ((0 != memcmp (&q->get.key,
- &get->key,
- sizeof (GNUNET_HashCode))) ||
- (q->get.type == get->type)))
- continue; /* used and not an identical request */
- if (q->expire < now)
+ while (q->sources != NULL)
{
- rt_pos = i;
- while (q->sources != NULL)
- {
- pos = q->sources;
- q->sources = pos->next;
- GNUNET_free (pos);
- }
- GNUNET_array_grow (q->results, q->result_count, 0);
- q->expire = 0;
+ pos = q->sources;
+ q->sources = pos->next;
+ GNUNET_free (pos);
}
- if ((0 == memcmp (&q->get.key,
- &get->key,
- sizeof (GNUNET_HashCode)) &&
- (q->get.type == get->type)))
- {
- GNUNET_array_grow (q->results, q->result_count, 0);
- rt_pos = i;
- break;
- }
+ GNUNET_array_grow (q->results, q->result_count, 0);
}
- if (rt_pos == rt_size)
- {
- /* do not route, no slot available */
- fprintf (stderr, "rt_pos (%d) == rt_size (%d) so failing\n", rt_pos,
- rt_size);
- GNUNET_mutex_unlock (lock);
- return GNUNET_SYSERR;
- }
- q = &records[rt_pos];
- if (q->expire < expire)
- q->expire = expire;
+
q->get = *get;
pos = GNUNET_malloc (sizeof (DV_DHT_Source_Route));
pos->next = q->sources;
@@ -626,14 +588,20 @@
pos->source = *sender;
else
pos->source = *coreAPI->my_identity;
- pos->expire = expire;
pos->receiver = handler;
pos->receiver_closure = cls;
-#if DEBUG_ROUTING
- GNUNET_GE_LOG (coreAPI->ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK, "Tracking request in slot %u\n", rt_pos);
-#endif
+
+ /* We loop through the records, allows peer
+ * to control how many concurrent responses
+ * are known about.
+ */
+ if (next_record == rt_size - 1)
+ {
+ next_record = 0;
+ }
+ else
+ next_record++;
+
GNUNET_mutex_unlock (lock);
if (stats != NULL)
stats->change (stat_requests_routed, 1);
@@ -652,6 +620,7 @@
DV_DHT_MESSAGE aget;
unsigned int target_value;
unsigned int hop_count;
+ struct GNUNET_BloomFilter *bloom;
int total;
int i;
int j;
@@ -720,7 +689,7 @@
&get->key);
}
- if ((debug_routes_extended) && (dhtlog != NULL))
+ if ((debug_routes_extended) && (dhtlog != NULL))
{
queryuid = ntohl (get->queryuid);
dhtlog->insert_route (NULL, ntohl (get->queryuid), DHTLOG_GET,
@@ -741,6 +710,14 @@
return GNUNET_OK;
}
aget = *get;
+
+ bloom =
+ GNUNET_bloomfilter_init (NULL, &aget.bloomfilter[0], DV_DHT_BLOOM_SIZE,
+ DV_DHT_BLOOM_K);
+ GNUNET_bloomfilter_add (bloom, &coreAPI->my_identity->hashPubKey);
+ GNUNET_bloomfilter_get_raw_data (bloom, &aget.bloomfilter[0],
+ DV_DHT_BLOOM_SIZE);
+
hop_count = ntohl (get->hop_count);
target_value = get_forward_count (hop_count, GET_TRIES);
aget.hop_count = htonl (1 + hop_count);
@@ -755,7 +732,7 @@
for (i = 0; i < target_value; i++)
{
if (GNUNET_OK !=
- GNUNET_DV_DHT_select_peer (&next[j], &get->key, &next[0], j))
+ GNUNET_DV_DHT_select_peer (&next[j], &get->key, &next[0], j, bloom))
{
#if DEBUG_ROUTING
GNUNET_GE_LOG (coreAPI->ectx,
@@ -784,6 +761,8 @@
dvapi->dv_send (&next[j], &aget.header, DV_DHT_PRIORITY, DV_DHT_DELAY);
j++;
}
+
+ GNUNET_bloomfilter_free (bloom);
return GNUNET_OK;
}
@@ -798,6 +777,7 @@
const DV_DHT_MESSAGE *put;
DV_DHT_MESSAGE *aput;
GNUNET_CronTime now;
+ struct GNUNET_BloomFilter *bloom;
unsigned int hop_count;
unsigned int target_value;
int store;
@@ -836,10 +816,18 @@
j = 0;
if (sender != NULL)
next[j++] = *sender; /* do not send back to sender! */
+
+ bloom =
+ GNUNET_bloomfilter_init (NULL, &aput->bloomfilter[0], DV_DHT_BLOOM_SIZE,
+ DV_DHT_BLOOM_K);
+ GNUNET_bloomfilter_add (bloom, &coreAPI->my_identity->hashPubKey);
+ GNUNET_bloomfilter_get_raw_data (bloom, &aput->bloomfilter[0],
+ DV_DHT_BLOOM_SIZE);
+
for (i = 0; i < target_value; i++)
{
if (GNUNET_OK !=
- GNUNET_DV_DHT_select_peer (&next[j], &put->key, &next[0], j))
+ GNUNET_DV_DHT_select_peer (&next[j], &put->key, &next[0], j, bloom))
{
#if DEBUG_ROUTING
GNUNET_GE_LOG (coreAPI->ectx,
@@ -848,13 +836,8 @@
"Failed to select peer for PUT fowarding in round %d/%d\n",
i + 1, PUT_TRIES);
#endif
- store = 1;
continue;
}
- if (1 == GNUNET_hash_xorcmp (&next[j].hashPubKey,
- &coreAPI->my_identity->hashPubKey,
- &put->key))
- store = 1; /* we're closer than the selected target */
#if DEBUG_ROUTING
GNUNET_hash_to_enc (&next[j].hashPubKey, &enc);
GNUNET_GE_LOG (coreAPI->ectx,
@@ -873,20 +856,22 @@
dvapi->dv_send (&next[j], &aput->header, DV_DHT_PRIORITY, DV_DHT_DELAY);
j++;
}
+
+ GNUNET_bloomfilter_free (bloom);
GNUNET_free (aput);
store = 0;
if (GNUNET_YES == GNUNET_DV_DHT_am_closest_peer (&put->key))
store = 1;
- if ((store == 0) && (target_value == 0) && (debug_routes_extended) && (dhtlog != NULL))
- {
- queryuid = ntohl (put->queryuid);
- dhtlog->insert_route (NULL, queryuid, DHTLOG_PUT,
- hop_count, GNUNET_NO,
- coreAPI->my_identity, &put->key, sender,
- NULL);
- }
+ if ((store == 0) && (target_value == 0) && (debug_routes_extended)
+ && (dhtlog != NULL))
+ {
+ queryuid = ntohl (put->queryuid);
+ dhtlog->insert_route (NULL, queryuid, DHTLOG_PUT,
+ hop_count, GNUNET_NO,
+ coreAPI->my_identity, &put->key, sender, NULL);
+ }
if (store != 0)
{
@@ -908,7 +893,7 @@
coreAPI->my_identity, &put->key);
}
- if ((debug_routes_extended) && (dhtlog != NULL))
+ if ((debug_routes_extended) && (dhtlog != NULL))
{
queryuid = ntohl (put->queryuid);
dhtlog->insert_route (NULL, queryuid, DHTLOG_PUT,
@@ -992,6 +977,7 @@
get.hop_count = htonl (0);
get.network_size = htonl (GNUNET_DV_DHT_estimate_network_diameter ());
get.key = *key;
+ memset (&get.bloomfilter, 0, DV_DHT_BLOOM_SIZE);
if ((debug_routes) && (dhtlog != NULL))
{
dhtlog->insert_query (&queryuid, 0, DHTLOG_GET, 0, GNUNET_NO,
@@ -1054,7 +1040,6 @@
if (records[i].sources == NULL)
{
GNUNET_array_grow (records[i].results, records[i].result_count, 0);
- records[i].expire = 0;
}
if (done == GNUNET_YES)
break;
@@ -1089,7 +1074,13 @@
put->key = *key;
put->type = htonl (type);
put->hop_count = htonl (0);
+ memset (&put->bloomfilter, 0, DV_DHT_BLOOM_SIZE);
put->network_size = htonl (GNUNET_DV_DHT_estimate_network_diameter ());
+#if DEBUG_ROUTING
+ GNUNET_GE_LOG (coreAPI->ectx,
+ GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "Insert called\n");
+#endif
if ((debug_routes) && (dhtlog != NULL))
{
dhtlog->insert_dhtkey (&keyuid, key);
@@ -1097,9 +1088,11 @@
ntohl (put->hop_count), GNUNET_NO,
coreAPI->my_identity, key);
#if DEBUG_ROUTING
- fprintf (stderr,
- "Inserted dhtkey, uid: %llu, inserted query, uid: %llu\n",
- keyuid, queryuid);
+ GNUNET_GE_LOG (coreAPI->ectx,
+ GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK,
+ "Inserted dhtkey, uid: %llu, inserted query, uid: %llu\n",
+ keyuid, queryuid);
#endif
}
#if DEBUG_ROUTING
Modified: GNUnet/src/applications/dv_dht/module/table.c
===================================================================
--- GNUnet/src/applications/dv_dht/module/table.c 2009-07-15 04:18:11 UTC (rev 8698)
+++ GNUnet/src/applications/dv_dht/module/table.c 2009-07-15 04:20:49 UTC (rev 8699)
@@ -67,7 +67,7 @@
* How often should the cron job for maintaining the DV_DHT
* run?
*/
-#define MAINTAIN_FREQUENCY 1500 * GNUNET_CRON_MILLISECONDS
+#define MAINTAIN_FREQUENCY 5000 * GNUNET_CRON_MILLISECONDS
/**
* What is the chance (1 in XXX) that we send DISCOVERY messages
@@ -417,7 +417,8 @@
GNUNET_DV_DHT_select_peer (GNUNET_PeerIdentity * set,
const GNUNET_HashCode * target,
const GNUNET_PeerIdentity * blocked,
- unsigned int blocked_size)
+ unsigned int blocked_size,
+ struct GNUNET_BloomFilter *bloom)
{
unsigned long long total_distance;
unsigned long long selected;
@@ -429,6 +430,7 @@
const PeerBucket *bucket;
const PeerInfo *pi;
+ //return find_closest_peer(set, target);
GNUNET_mutex_lock (lock);
if (stats != NULL)
stats->change (stat_dht_route_looks, 1);
@@ -440,6 +442,11 @@
{
pi = bucket->peers[ec];
match = GNUNET_NO;
+ match = GNUNET_bloomfilter_test (bloom, &pi->id.hashPubKey);
+ if (match == GNUNET_YES)
+ {
+ continue;
+ }
for (i = 0; i < blocked_size; i++)
{
if (0 ==
@@ -467,6 +474,15 @@
{
pi = bucket->peers[ec];
match = GNUNET_NO;
+ match = GNUNET_bloomfilter_test (bloom, &pi->id.hashPubKey);
+ if (match == GNUNET_YES)
+ {
+ GNUNET_GE_LOG (coreAPI->ectx,
+ GNUNET_GE_WARNING | GNUNET_GE_ADMIN |
+ GNUNET_GE_USER | GNUNET_GE_BULK,
+ "Avoiding circular route, yay!\n");
+ continue;
+ }
for (i = 0; i < blocked_size; i++)
{
if (0 ==
@@ -610,7 +626,7 @@
inverse_distance (target, &closest.hashPubKey),
inverse_distance (target,
&coreAPI->my_identity->hashPubKey));
- if (inverse_distance (target, &coreAPI->my_identity->hashPubKey) >=
+ if (inverse_distance (target, &coreAPI->my_identity->hashPubKey) >
inverse_distance (target, &closest.hashPubKey))
{
return GNUNET_YES;
@@ -618,6 +634,7 @@
return GNUNET_NO;
}
+#ifdef DISCOVERY
/**
* Send a discovery message to the other peer.
*
@@ -682,56 +699,10 @@
print_exit ("broadcast_dht_discovery");
#endif
}
-
-static void
-broadcast_dht_discovery_prob (const GNUNET_PeerIdentity * other, void *cls)
-{
-#if DEBUG_TABLE
- print_entry ("broadcast_dht_discovery_prob");
#endif
- if (GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, MAINTAIN_CHANCE) != 0)
- {
-#if DEBUG_TABLE
- print_exit ("broadcast_dht_discovery_prob");
-#endif
- return;
- }
- //fprintf(stderr, "sending discovery message\n");
- broadcast_dht_discovery (other, cls);
-#if DEBUG_TABLE
- print_exit ("broadcast_dht_discovery_prob");
-#endif
-}
-
/**
- * Cron job to maintain DV_DHT routing table.
- */
-static void
-maintain_dht_job (void *unused)
-{
-#if DEBUG_TABLE
- print_entry ("maintain_dht_job");
-#endif
- P2P_DV_DHT_Discovery disc;
- if (total_peers == 0)
- {
- disc.header.size = htons (sizeof (P2P_DV_DHT_Discovery));
- disc.header.type = htons (GNUNET_P2P_PROTO_DHT_DISCOVERY);
- disc.space_available = -1; /* FIXME */
- dvapi->dv_connections_iterate (&broadcast_dht_discovery_prob, &disc);
- }
- else
- {
- dvapi->dv_connections_iterate (&broadcast_dht_discovery_prob, NULL);
- }
-#if DEBUG_TABLE
- print_exit ("maintain_dht_job");
-#endif
-}
-
-/**
* We have received a pong from a peer and know it is still
* there.
*/
@@ -825,8 +796,6 @@
{
PeerInfo *pi;
PeerBucket *bucket;
- P2P_DV_DHT_ASK_HELLO ask;
- GNUNET_MessageHello *hello;
bucket = findBucketFor (peer);
if (bucket == NULL)
@@ -840,6 +809,11 @@
return; /* already have this peer in buckets */
}
/* do we know how to contact this peer? */
+ /* This may not work with the dv implementation... */
+#if 0
+ P2P_DV_DHT_ASK_HELLO ask;
+ GNUNET_MessageHello *hello;
+
hello =
identity->identity2Hello (peer, GNUNET_TRANSPORT_PROTOCOL_NUMBER_ANY,
GNUNET_NO);
@@ -850,21 +824,25 @@
ask.header.type = htons (sizeof (GNUNET_P2P_PROTO_DHT_ASK_HELLO));
ask.reserved = 0;
ask.peer = *peer;
- dvapi->dv_send (sender, &ask.header, 0, /* FIXME: priority */
+ dvapi->dv_send (peer, &ask.header, 0, /* FIXME: priority */
5 * GNUNET_CRON_SECONDS);
return;
}
GNUNET_free (hello);
+#endif
+
/* check if connected, if not, send discovery */
/* coreAPI->p2p_connection_status_check (peer, NULL, NULL); */
if (GNUNET_OK != dvapi->p2p_connection_status_check (peer, NULL, NULL))
{
+#if DISCOVERY
/* not yet connected; connect sending DISCOVERY */
- broadcast_dht_discovery (peer, NULL);
+ /*broadcast_dht_discovery (peer, NULL); */
+#endif
return;
}
- /* we are connected (in core), add to bucket */
+ /* we are connected (in dv), add to bucket */
pi = GNUNET_malloc (sizeof (PeerInfo));
memset (pi, 0, sizeof (PeerInfo));
pi->id = *peer;
@@ -877,7 +855,59 @@
stats->change (stat_dht_total_peers, 1);
}
+static void
+broadcast_dht_discovery_prob (const GNUNET_PeerIdentity * other, void *cls)
+{
+#if DEBUG_TABLE
+ print_entry ("broadcast_dht_discovery_prob");
+#endif
+
+ considerPeer (other, other);
+ /*
+ if (GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, MAINTAIN_CHANCE) > (MAINTAIN_CHANCE / 2))
+ {
+ #if DEBUG_TABLE
+ print_exit ("broadcast_dht_discovery_prob");
+ #endif
+ return;
+ } */
+ /*fprintf(stderr, "sending discovery message\n");
+ broadcast_dht_discovery (other, cls); */
+
+#if DEBUG_TABLE
+ print_exit ("broadcast_dht_discovery_prob");
+#endif
+}
+
+
/**
+ * Cron job to maintain DV_DHT routing table.
+ */
+static void
+maintain_dht_job (void *unused)
+{
+#if DEBUG_TABLE
+ print_entry ("maintain_dht_job");
+#endif
+ P2P_DV_DHT_Discovery disc;
+ if (total_peers == 0)
+ {
+ disc.header.size = htons (sizeof (P2P_DV_DHT_Discovery));
+ disc.header.type = htons (GNUNET_P2P_PROTO_DHT_DISCOVERY);
+ disc.space_available = -1; /* FIXME */
+ dvapi->dv_connections_iterate (&broadcast_dht_discovery_prob, &disc);
+ }
+ else
+ {
+ dvapi->dv_connections_iterate (&broadcast_dht_discovery_prob, NULL);
+ }
+#if DEBUG_TABLE
+ print_exit ("maintain_dht_job");
+#endif
+}
+
+#ifdef DISCOVERY
+/**
* Handle discovery message.
*/
static int
@@ -919,6 +949,7 @@
GNUNET_mutex_unlock (lock);
return GNUNET_OK;
}
+#endif
/**
* Handle ask hello message.
@@ -1097,8 +1128,10 @@
GNUNET_GE_ASSERT (coreAPI->ectx, identity != NULL);
pingpong = coreAPI->service_request ("pingpong");
GNUNET_GE_ASSERT (coreAPI->ectx, pingpong != NULL);
+#if DISCOVERY
capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_DHT_DISCOVERY,
&handleDiscovery);
+#endif
capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_DHT_ASK_HELLO,
&handleAskHello);
capi->peer_disconnect_notification_register (&peer_disconnect_handler,
@@ -1124,8 +1157,10 @@
coreAPI->peer_disconnect_notification_unregister (&peer_disconnect_handler,
NULL);
+#if DISCOVERY
coreAPI->p2p_ciphertext_handler_unregister (GNUNET_P2P_PROTO_DHT_DISCOVERY,
&handleDiscovery);
+#endif
coreAPI->p2p_ciphertext_handler_unregister (GNUNET_P2P_PROTO_DHT_ASK_HELLO,
&handleAskHello);
GNUNET_cron_del_job (coreAPI->cron, &maintain_dht_job, MAINTAIN_FREQUENCY,
Modified: GNUnet/src/applications/dv_dht/module/table.h
===================================================================
--- GNUnet/src/applications/dv_dht/module/table.h 2009-07-15 04:18:11 UTC (rev 8698)
+++ GNUnet/src/applications/dv_dht/module/table.h 2009-07-15 04:20:49 UTC (rev 8699)
@@ -46,7 +46,8 @@
int GNUNET_DV_DHT_select_peer (GNUNET_PeerIdentity * set,
const GNUNET_HashCode * target,
const GNUNET_PeerIdentity * blocked,
- unsigned int blocked_size);
+ unsigned int blocked_size,
+ struct GNUNET_BloomFilter *bloom);
/**
* Compute a (rough) estimate of the networks diameter.
Modified: GNUnet/src/applications/dv_dht/tools/dv_dht_driver_new.c
===================================================================
--- GNUnet/src/applications/dv_dht/tools/dv_dht_driver_new.c 2009-07-15 04:18:11 UTC (rev 8698)
+++ GNUnet/src/applications/dv_dht/tools/dv_dht_driver_new.c 2009-07-15 04:20:49 UTC (rev 8699)
@@ -60,6 +60,7 @@
static unsigned long long concurrent_requests;
static unsigned long long requests_per_second;
static unsigned long long requests_wait_time;
+static unsigned long long randomized_gets;
static char *dotOutFileName = NULL;
static char *trialmessage = NULL;
@@ -183,17 +184,17 @@
int random_peer;
int random_key;
int totalConnections;
- unsigned int failed_inserts;
+ unsigned long long failed_inserts;
unsigned long long trialuid;
key_count = 0;
+ failed_inserts = 0;
-
printf ("Starting %u peers\n", (unsigned int) num_peers);
if (trialmessage != NULL)
{
printf ("Trial message is %s, strlen is %d\n", trialmessage,
- (int)strlen (trialmessage));
+ (int) strlen (trialmessage));
}
if (sqlapi == NULL)
@@ -259,14 +260,17 @@
{
if (GNUNET_shutdown_test () == GNUNET_YES)
break;
- fprintf (stderr, "Peer %d (%s:%d, pid %s):\n", i, peer_array[i]->hostname, peer_array[i]->port, peer_array[i]->pid);
+ fprintf (stderr, "Peer %d (%s:%d, pid %s):\n", i,
+ peer_array[i]->hostname, peer_array[i]->port,
+ peer_array[i]->pid);
sock =
GNUNET_client_connection_create (NULL, peer_array[i]->config);
- if (GNUNET_SYSERR == GNUNET_STATS_get_statistics (NULL, sock, &getPeers, NULL))
- {
- fprintf(stderr, "Problem connecting to peer %d!\n", i);
- }
+ if (GNUNET_SYSERR ==
+ GNUNET_STATS_get_statistics (NULL, sock, &getPeers, NULL))
+ {
+ fprintf (stderr, "Problem connecting to peer %d!\n", i);
+ }
GNUNET_thread_sleep (50 * GNUNET_CRON_MILLISECONDS);
GNUNET_client_connection_destroy (sock);
}
@@ -281,21 +285,24 @@
for (i = 0; i < put_items; i++)
{
random_peer = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, num_peers);
- random_key = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, put_items);
- fprintf (stdout, "Inserting key %d at peer %d\n", random_key, random_peer);
- if (GNUNET_OK != GNUNET_DV_DHT_put (peer_array[random_peer]->config,
- ectx,
- &keys[random_key].key,
- GNUNET_ECRS_BLOCKTYPE_DHT_STRING2STRING,
- sizeof (keys[random_key].data),
- keys[random_key].data))
+ /*random_key = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, put_items); */
+ random_key = i;
+ fprintf (stdout, "Inserting key %d at peer %d\n", random_key,
+ random_peer);
+ if (GNUNET_OK !=
+ GNUNET_DV_DHT_put (peer_array[random_peer]->config, ectx,
+ &keys[random_key].key,
+ GNUNET_ECRS_BLOCKTYPE_DHT_STRING2STRING,
+ sizeof (keys[random_key].data),
+ keys[random_key].data))
{
fprintf (stdout, "Insert FAILED at peer %d\n", random_peer);
failed_inserts++;
}
GNUNET_thread_sleep (50 * GNUNET_CRON_MILLISECONDS);
}
- fprintf (stdout, "Inserted %llu items\n", put_items - failed_inserts);
+ fprintf (stdout, "Inserted %d items\n",
+ (int) put_items - (int) failed_inserts);
for (i = 0; i < get_requests / concurrent_requests; i++)
{
@@ -304,9 +311,15 @@
{
random_peers[j] =
GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, num_peers);
- random_key =
- GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, put_items);
- /*random_key = (i + 1) * (j + 1) - 1; */
+ if (randomized_gets == GNUNET_YES)
+ {
+ random_key =
+ GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, put_items);
+ }
+ else
+ {
+ random_key = (i + 1) * (j + 1) - 1;
+ }
dctx[j] =
GNUNET_DV_DHT_context_create (peer_array[random_peers[j]]->config,
ectx, &result_callback,
Modified: GNUnet/src/applications/dv_dht/tools/dv_test.conf
===================================================================
--- GNUnet/src/applications/dv_dht/tools/dv_test.conf 2009-07-15 04:18:11 UTC (rev 8698)
+++ GNUnet/src/applications/dv_dht/tools/dv_test.conf 2009-07-15 04:20:49 UTC (rev 8699)
@@ -1,13 +1,13 @@
[MULTIPLE_SERVER_TESTING]
CONTROL_HOST=130.253.106.13;
-HOSTNAMES=130.253.191.150 130.253.191.152 130.253.191.155
+HOSTNAMES=r1m1cl r1m2cl r1m3cl r1m5cl r2m1cl r2m2cl r2m3cl r2m4cl r2m5cl r3m1cl r3m2cl r3m3cl r3m4cl r3m5cl r4m1cl r4m2cl r4m3cl r4m4cl r4m5cl
SSH_USERNAME=natevans
STARTING_PORT=31387
PORT_INCREMENT=2
BASE_CONFIG=gnunetd_dv.conf
TOPOLOGY=1
SETTLE_TIME=10
-NUM_PEERS=30
+NUM_PEERS=50
PUT_ITEMS=400
GET_REQUESTS=10000
#PERCENTAGE=.45
Modified: GNUnet/src/applications/dv_dht/tools/gnunetd_dv.conf
===================================================================
--- GNUnet/src/applications/dv_dht/tools/gnunetd_dv.conf 2009-07-15 04:18:11 UTC (rev 8698)
+++ GNUnet/src/applications/dv_dht/tools/gnunetd_dv.conf 2009-07-15 04:20:49 UTC (rev 8699)
@@ -56,7 +56,7 @@
MAXNETDOWNBPSTOTAL = 500000
MAXNETUPBPSTOTAL = 500000
HARDUPLIMIT = 0
-MAXCPULOAD = 100
+MAXCPULOAD = 10
MAXIOLOAD = 50
HARDCPULIMIT = 0
BASICLIMITING = YES
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r8699 - in GNUnet/src/applications/dv_dht: module tools,
gnunet <=