gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r13299 - in gnunet: . src/fs


From: gnunet
Subject: [GNUnet-SVN] r13299 - in gnunet: . src/fs
Date: Tue, 19 Oct 2010 20:50:03 +0200

Author: grothoff
Date: 2010-10-19 20:50:03 +0200 (Tue, 19 Oct 2010)
New Revision: 13299

Modified:
   gnunet/TODO
   gnunet/src/fs/fs.h
   gnunet/src/fs/fs_test_lib_data.conf
   gnunet/src/fs/gnunet-service-fs.c
   gnunet/src/fs/perf_gnunet_service_fs_p2p.c
Log:
bugfixes, more todo

Modified: gnunet/TODO
===================================================================
--- gnunet/TODO 2010-10-19 11:52:42 UTC (rev 13298)
+++ gnunet/TODO 2010-10-19 18:50:03 UTC (rev 13299)
@@ -33,6 +33,11 @@
   - also do UPnP-based (external) IP detection
     (Note: build library always, build UPnP service when dependencies like libxml2 are available)
 * FS: [CG]
+  - service:
+    + 2-peer perf test does NOT terminate for large (500 MB) files because
+      somehow blocks are not found (suspect: load-based no DB lookup + forward first, no clean up of routing table?)
+    + 2-peer perf test goes WAY over bandwidth limit (i.e. 300 kbps/set, 2 MB/s transfer rate); clearly core does
+      not properly enforce the limit
   - library:
     + reconstruct IBLOCKS from DBLOCKS if possible (during download; see FIXME in fs_download)
     + add support for pushing "already seen" search results to FS service for bloomfilter

Modified: gnunet/src/fs/fs.h
===================================================================
--- gnunet/src/fs/fs.h  2010-10-19 11:52:42 UTC (rev 13298)
+++ gnunet/src/fs/fs.h  2010-10-19 18:50:03 UTC (rev 13299)
@@ -34,10 +34,19 @@
 #include "gnunet_block_lib.h"
 #include "block_fs.h"
 
+
 /**
+ * Maximum number of outgoing messages we queue per peer.
+ */
+#define MAX_QUEUE_PER_PEER 16
+
+/**
  * Maximum size of the datastore queue for P2P operations.
+ * Needs to be large enough to queue MAX_QUEUE_PER_PEER
+ * operations for roughly the number of active (connected)
+ * peers.
  */
-#define MAX_DATASTORE_QUEUE 16
+#define MAX_DATASTORE_QUEUE (16 * MAX_QUEUE_PER_PEER)
 
 /**
  * Maximum number of blocks we keep in memory for migration.

Modified: gnunet/src/fs/fs_test_lib_data.conf
===================================================================
--- gnunet/src/fs/fs_test_lib_data.conf 2010-10-19 11:52:42 UTC (rev 13298)
+++ gnunet/src/fs/fs_test_lib_data.conf 2010-10-19 18:50:03 UTC (rev 13299)
@@ -22,6 +22,7 @@
 [datastore]
 #DEBUG = YES
 #PREFIX = valgrind --tool=memcheck --leak-check=yes
+QUOTA = 2000000000
 
 [statistics]
 PORT = 43467
@@ -40,8 +41,8 @@
 HOSTNAME = localhost
 #TOTAL_QUOTA_IN = 9321
 #TOTAL_QUOTA_OUT = 9321
-TOTAL_QUOTA_IN = 3932160
-TOTAL_QUOTA_OUT = 3932160
+TOTAL_QUOTA_IN = 393216
+TOTAL_QUOTA_OUT = 393216
 #DEBUG = YES
 #PREFIX = valgrind --tool=memcheck --leak-check=yes
 #BINARY = /home/grothoff/bin/gnunet-service-core
@@ -51,7 +52,7 @@
 HOSTNAME = localhost
 #OPTIONS = -L DEBUG
 ACTIVEMIGRATION = NO
-#DEBUG = YES
+DEBUG = YES
 #PREFIX = valgrind --tool=memcheck --leak-check=yes 
 #BINARY = /home/grothoff/gn9/bin/gnunet-service-fs
 #PREFIX = xterm -e gdb -x cmd --args 

Modified: gnunet/src/fs/gnunet-service-fs.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs.c   2010-10-19 11:52:42 UTC (rev 13298)
+++ gnunet/src/fs/gnunet-service-fs.c   2010-10-19 18:50:03 UTC (rev 13299)
@@ -51,11 +51,6 @@
 #define SUPPORT_DELAYS GNUNET_NO
 
 /**
- * Maximum number of outgoing messages we queue per peer.
- */
-#define MAX_QUEUE_PER_PEER 16
-
-/**
  * Size for the hash map for DHT requests from the FS
  * service.  Should be about the number of concurrent
  * DHT requests we plan to make.
@@ -191,6 +186,10 @@
    * getting a reply (only calculated over the requests for
    * which we actually got a reply).   Calculated
    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
+   *
+   * FIXME: actually, this is currently the delay between us originally
+   * receiving (not forwarding!) a request and us receiving a reply from
+   * this peer (regardless of when we transmitted this request to this peer!)
    */ 
   struct GNUNET_TIME_Relative avg_delay;
 
@@ -207,6 +206,15 @@
   struct GNUNET_TIME_Absolute last_migration_block;
 
   /**
+   * Transmission times for the last MAX_QUEUE_PER_PEER
+   * requests for this peer.  Used as a ring buffer, current
+   * offset is stored in 'last_request_times_off'.  If the
+   * oldest entry is more recent than the 'avg_delay', we should
+   * not send any more requests right now.
+   */
+  struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
+
+  /**
    * Handle for an active request for transmission to this
    * peer, or NULL.
    */
@@ -285,6 +293,11 @@
    */
   unsigned int last_client_replies_woff;
 
+  /**
+   * Current offset into 'last_request_times' ring buffer.
+   */
+  unsigned int last_request_times_off;
+
 };
 
 
@@ -402,6 +415,34 @@
 
 
 /**
+ * Information about a peer that we have forwarded this
+ * request to already.  
+ */
+struct UsedTargetEntry
+{
+  /**
+   * What was the last time we have transmitted this request to this
+   * peer?
+   */
+  struct GNUNET_TIME_Absolute last_request_time;
+
+  /**
+   * How often have we transmitted this request to this peer?
+   */
+  unsigned int num_requests;
+
+  /**
+   * PID of the target peer.
+   */
+  GNUNET_PEER_Id pid;
+
+};
+
+
+
+
+
+/**
  * Doubly-linked list of messages we are performing
  * due to a pending request.
  */
@@ -531,7 +572,7 @@
    * (Interned) Peer identifiers of peers that have already
    * received our query for this content.
    */
-  GNUNET_PEER_Id *used_pids;
+  struct UsedTargetEntry *used_targets;
   
   /**
    * Our entry in the queue (non-NULL while we wait for our
@@ -550,14 +591,14 @@
   uint32_t anonymity_level;
 
   /**
-   * How many entries in "used_pids" are actually valid?
+   * How many entries in "used_targets" are actually valid?
    */
-  unsigned int used_pids_off;
+  unsigned int used_targets_off;
 
   /**
-   * How long is the "used_pids" array?
+   * How long is the "used_targets" array?
    */
-  unsigned int used_pids_size;
+  unsigned int used_targets_size;
 
   /**
    * Number of results found for this request.
@@ -1384,6 +1425,7 @@
 destroy_pending_request (struct PendingRequest *pr)
 {
   struct GNUNET_PeerIdentity pid;
+  unsigned int i;
 
   if (pr->hnode != NULL)
     {
@@ -1464,13 +1506,14 @@
   while (NULL != pr->pending_head)    
     destroy_pending_message_list_entry (pr->pending_head);
   GNUNET_PEER_change_rc (pr->target_pid, -1);
-  if (pr->used_pids != NULL)
+  if (pr->used_targets != NULL)
     {
-      GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
-      GNUNET_free (pr->used_pids);
-      pr->used_pids_off = 0;
-      pr->used_pids_size = 0;
-      pr->used_pids = NULL;
+      for (i=0;i<pr->used_targets_off;i++)
+       GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
+      GNUNET_free (pr->used_targets);
+      pr->used_targets_off = 0;
+      pr->used_targets_size = 0;
+      pr->used_targets = NULL;
     }
   GNUNET_free (pr);
 }
@@ -2142,7 +2185,13 @@
                                     pm);
   cp->pending_requests++;
   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
-    destroy_pending_message (cp->pending_messages_tail, 0);  
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# P2P searches discarded (queue length bound)"),
+                               1,
+                               GNUNET_NO);
+      destroy_pending_message (cp->pending_messages_tail, 0);  
+    }
   GNUNET_PEER_resolve (cp->pid, &pid);
   if (NULL != cp->cth)
     {
@@ -2298,6 +2347,7 @@
                             GNUNET_PEER_Id tpid)
 {
   struct PendingRequest *pr = cls;
+  unsigned int i;
 
   GNUNET_STATISTICS_update (stats,
                            gettext_noop ("# queries scheduled for forwarding"),
@@ -2316,16 +2366,32 @@
                                                 pr); 
       return;    
     }
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Transmitted query `%s'\n",
+             GNUNET_h2s (&pr->query));
+#endif
   GNUNET_STATISTICS_update (stats,
                            gettext_noop ("# queries forwarded"),
                            1,
                            GNUNET_NO);
-  GNUNET_PEER_change_rc (tpid, 1);
-  if (pr->used_pids_off == pr->used_pids_size)
-    GNUNET_array_grow (pr->used_pids,
-                      pr->used_pids_size,
-                      pr->used_pids_size * 2 + 2);
-  pr->used_pids[pr->used_pids_off++] = tpid;
+  for (i=0;i<pr->used_targets_off;i++)
+    if (pr->used_targets[i].pid == tpid)
+      break; /* found match! */    
+  if (i == pr->used_targets_off)
+    {
+      /* need to create new entry */
+      if (pr->used_targets_off == pr->used_targets_size)
+       GNUNET_array_grow (pr->used_targets,
+                          pr->used_targets_size,
+                          pr->used_targets_size * 2 + 2);
+      GNUNET_PEER_change_rc (tpid, 1);
+      pr->used_targets[pr->used_targets_off].pid = tpid;
+      pr->used_targets[pr->used_targets_off].num_requests = 0;
+      i = pr->used_targets_off++;
+    }
+  pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
+  pr->used_targets[i].num_requests++;
   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
                                             get_processing_delay (),
@@ -2431,6 +2497,7 @@
   unsigned int k;
   int no_route;
   uint32_t bm;
+  unsigned int i;
 
   pr->irc = NULL;
   if (peer == NULL)
@@ -2443,7 +2510,7 @@
                                                 pr);
       return;
     }
-  // (3) transmit, update ttl/priority
+  /* (3) transmit, update ttl/priority */
   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
                                          &peer->hashPubKey);
   if (cp == NULL)
@@ -2489,6 +2556,16 @@
                            gettext_noop ("# queries scheduled for forwarding"),
                            1,
                            GNUNET_NO);
+  for (i=0;i<pr->used_targets_off;i++)
+    if (pr->used_targets[i].pid == cp->pid) 
+      {
+       GNUNET_STATISTICS_update (stats,
+                                 gettext_noop ("# queries retransmitted to same target"),
+                                 1,
+                                 GNUNET_NO);
+       break;
+      } 
+
   /* build message and insert message into priority queue */
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2542,6 +2619,7 @@
                                               pr->bf_size);
   pm->cont = &transmit_query_continuation;
   pm->cont_cls = pr;
+  cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
   add_to_pending_messages_for_peer (cp, pm, pr);
 }
 
@@ -2589,6 +2667,7 @@
   struct PeerSelectionContext *psc = cls;
   struct ConnectedPeer *cp = value;
   struct PendingRequest *pr = psc->pr;
+  struct GNUNET_TIME_Relative delay;
   double score;
   unsigned int i;
   unsigned int pc;
@@ -2604,28 +2683,46 @@
     }
 
   /* 2) check if we have already (recently) forwarded to this peer */
+  /* 2a) this particular request */
   pc = 0;
-  for (i=0;i<pr->used_pids_off;i++)
-    if (pr->used_pids[i] == cp->pid) 
+  for (i=0;i<pr->used_targets_off;i++)
+    if (pr->used_targets[i].pid == cp->pid) 
       {
-       pc++;
+       pc = pr->used_targets[i].num_requests;
+       GNUNET_assert (pc > 0);
        if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                          RETRY_PROBABILITY_INV))
+                                          RETRY_PROBABILITY_INV * pc))
          {
 #if DEBUG_FS
            GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                        "NOT re-trying query that was previously transmitted %u times\n",
-                       (unsigned int) pr->used_pids_off);
+                       (unsigned int) pc);
 #endif
            return GNUNET_YES; /* skip */
          }
+       break;
       }
 #if DEBUG_FS
   if (0 < pc)
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-               "Re-trying query that was previously transmitted %u times to this peer\n",
-               (unsigned int) pc);
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 "Re-trying query that was previously transmitted %u times to this peer\n",
+                 (unsigned int) pc);
+    }
 #endif
+  /* 2b) many other requests to this peer */
+  delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
+  if (delay.value <= cp->avg_delay.value)
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                 "NOT sending query since we send %u others to this peer in the last %llums\n",
+                 MAX_QUEUE_PER_PEER,
+                 cp->avg_delay.value);
+#endif
+      return GNUNET_YES; /* skip */      
+    }
+
   /* 3) calculate how much we'd like to forward to this peer,
      starting with a random value that is strong enough
      to at least give any peer a chance sometimes 
@@ -3023,6 +3120,7 @@
 struct GNUNET_TIME_Relative art_delay;
 #endif
   size_t msize;
+  unsigned int i;
 
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -3036,13 +3134,19 @@
                            GNUNET_NO);
   if (prq->sender != NULL)
     {
-      cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
-      prq->sender->avg_delay.value
-       = (prq->sender->avg_delay.value * 
-          (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
-      prq->sender->avg_priority
-       = (prq->sender->avg_priority * 
-          (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
+      for (i=0;i<pr->used_targets_off;i++)
+       if (pr->used_targets[i].pid == prq->sender->pid)
+         break;
+      if (i < pr->used_targets_off)
+       {
+         cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);
+         prq->sender->avg_delay.value
+           = (prq->sender->avg_delay.value * 
+              (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
+         prq->sender->avg_priority
+           = (prq->sender->avg_priority * 
+              (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
+       }
       if (pr->cp != NULL)
        {
          GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
@@ -3812,6 +3916,11 @@
       return GNUNET_SYSERR;
     }
   gm = (const struct GetMessage*) message;
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received request for `%s'\n",
+             GNUNET_h2s (&gm->query));
+#endif
   type = ntohl (gm->type);
   bm = ntohl (gm->hash_bitmap);
   bits = 0;
@@ -3949,7 +4058,6 @@
                                                  BLOOMFILTER_K);
       pr->bf_size = bfsize;
     }
-
   cdc.have = NULL;
   cdc.pr = pr;
   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
@@ -4021,19 +4129,35 @@
   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
                                           (pr->priority + 1)); 
   if (GNUNET_YES != pr->forward_only)
-    pr->qe = GNUNET_DATASTORE_get (dsh,
-                                  &gm->query,
-                                  type,                               
-                                  pr->priority + 1,
-                                  MAX_DATASTORE_QUEUE,
-                                  timeout,
-                                  &process_local_reply,
-                                  pr);
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Handing request for `%s' to datastore\n",
+                 GNUNET_h2s (&gm->query));
+#endif
+      pr->qe = GNUNET_DATASTORE_get (dsh,
+                                    &gm->query,
+                                    type,                             
+                                    pr->priority + 1,
+                                    MAX_DATASTORE_QUEUE,
+                                    timeout,
+                                    &process_local_reply,
+                                    pr);
+      if (NULL == pr->qe)
+       {
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# requests dropped by datastore (queue length limit)"),
+                                   1,
+                                   GNUNET_NO);
+       }
+    }
   else
-    GNUNET_STATISTICS_update (stats,
-                             gettext_noop ("# requests forwarded due to high load"),
-                             1,
-                             GNUNET_NO);
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests forwarded due to high load"),
+                               1,
+                               GNUNET_NO);
+    }
 
   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
   switch (pr->type)

Modified: gnunet/src/fs/perf_gnunet_service_fs_p2p.c
===================================================================
--- gnunet/src/fs/perf_gnunet_service_fs_p2p.c  2010-10-19 11:52:42 UTC (rev 13298)
+++ gnunet/src/fs/perf_gnunet_service_fs_p2p.c  2010-10-19 18:50:03 UTC (rev 13299)
@@ -27,17 +27,17 @@
 #include "fs_test_lib.h"
 #include "gnunet_testing_lib.h"
 
-#define VERBOSE GNUNET_NO
+#define VERBOSE GNUNET_YES
 
 /**
  * File-size we use for testing.
  */
-#define FILESIZE (1024 * 1024 * 10)
+#define FILESIZE (1024 * 1024 * 1)
 
 /**
  * How long until we give up on transmitting the message?
  */
-#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 300)
+#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 3)
 
 #define NUM_DAEMONS 2
 
@@ -89,6 +89,10 @@
     { "fs", "# requests done for free (low load)"},
     { "fs", "# P2P searches received"},
     { "fs", "# replies received for local clients"},
+    { "fs", "# P2P searches discarded (queue length bound)"},
+    { "fs", "# requests dropped due to high load"},
+    { "fs", "# requests dropped by datastore (queue length limit)"},
+    { "fs", "# queries retransmitted to same target"},
     { "fs", "cummulative artificial delay introduced (ms)"},
     { "core", "# bytes decrypted"},
     { "core", "# bytes encrypted"},
@@ -129,6 +133,7 @@
   return GNUNET_OK;
 }
 
+
 /**
  * Function that gathers stats from all daemons.
  */
@@ -136,6 +141,7 @@
 stat_run (void *cls,
          const struct GNUNET_SCHEDULER_TaskContext *tc);
 
+
 /**
  * Function called when GET operation on stats is done.
  */
@@ -149,6 +155,7 @@
   GNUNET_SCHEDULER_add_now (sched, &stat_run, sm);
 }
 
+
 /**
  * Function that gathers stats from all daemons.
  */
@@ -217,7 +224,7 @@
     }
   else
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  "Timeout during download, shutting down with error\n");
       ok = 1;
       GNUNET_SCHEDULER_add_now (sched, &do_stop, NULL);
@@ -234,7 +241,7 @@
       GNUNET_FS_TEST_daemons_stop (sched,
                                   NUM_DAEMONS,
                                   daemons);
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  "Timeout during upload attempt, shutting down with error\n");
       ok = 1;
       return;
@@ -261,7 +268,7 @@
       GNUNET_FS_TEST_daemons_stop (sched,
                                   NUM_DAEMONS,
                                   daemons);
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  "Error trying to connect: %s\n",
                  emsg);
       ok = 1;




reply via email to

[Prev in Thread] Current Thread [Next in Thread]