gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r13067 - in gnunet: . src/fs


From: gnunet
Subject: [GNUnet-SVN] r13067 - in gnunet: . src/fs
Date: Thu, 23 Sep 2010 07:16:24 +0200

Author: grothoff
Date: 2010-09-23 07:16:24 +0200 (Thu, 23 Sep 2010)
New Revision: 13067

Modified:
   gnunet/TODO
   gnunet/src/fs/gnunet-service-fs.c
Log:
train hacking

Modified: gnunet/TODO
===================================================================
--- gnunet/TODO 2010-09-23 05:16:15 UTC (rev 13066)
+++ gnunet/TODO 2010-09-23 05:16:24 UTC (rev 13067)
@@ -1,7 +1,6 @@
 0.9.0pre2:
  FS:
   - measure latencies (core, datastore) => trust economy
-  - refuse content migration message (or solicit?)
   - FS performance benchmarking
   - integrate with DHT (need DHT API to fit block API better first; also, get rid of the continuation!)
 * DHT: [Nate]

Modified: gnunet/src/fs/gnunet-service-fs.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs.c   2010-09-23 05:16:15 UTC (rev 13066)
+++ gnunet/src/fs/gnunet-service-fs.c   2010-09-23 05:16:24 UTC (rev 13067)
@@ -28,8 +28,6 @@
  * - consider more precise latency estimation (per-peer & request) -- again load API?
  * - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
  * - introduce random latency in processing
- * - tell other peers to stop migration if our PUTs fail (or if
- *   we don't support migration per configuration?)
  * - more statistics
  */
 #include "platform.h"
@@ -190,6 +188,12 @@
   struct GNUNET_TIME_Absolute migration_blocked;
 
   /**
+   * Time until when we blocked this peer from migrating
+   * data to us.
+   */
+  struct GNUNET_TIME_Absolute last_migration_block;
+
+  /**
    * Handle for an active request for transmission to this
    * peer, or NULL.
    */
@@ -752,11 +756,16 @@
 static double current_priorities;
 
 /**
- * Datastore load tracking.
+ * Datastore 'GET' load tracking.
  */
-static struct GNUNET_LOAD_Value *datastore_load;
+static struct GNUNET_LOAD_Value *datastore_get_load;
 
+/**
+ * Datastore 'PUT' load tracking.
+ */
+static struct GNUNET_LOAD_Value *datastore_put_load;
 
+
 /**
  * We've just now completed a datastore request.  Update our
  * datastore load calculations.
@@ -769,7 +778,7 @@
   struct GNUNET_TIME_Relative delay;
 
   delay = GNUNET_TIME_absolute_get_duration (start);
-  GNUNET_LOAD_update (datastore_load,
+  GNUNET_LOAD_update (datastore_get_load,
                      delay.value);
 }
 
@@ -1126,12 +1135,20 @@
   TransmissionContinuation cont;
   void *cont_cls;
 
-  GNUNET_assert (pml->pm == pm);
-  GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
-  cont = pm->cont;
-  cont_cls = pm->cont_cls;
-  destroy_pending_message_list_entry (pml);
-  cont (cont_cls, tpid);  
+  if (pml != NULL)
+    {
+      GNUNET_assert (pml->pm == pm);
+      GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
+      cont = pm->cont;
+      cont_cls = pm->cont_cls;
+      destroy_pending_message_list_entry (pml);
+    }
+  else
+    {
+      GNUNET_free (pm);
+    }
+  if (cont != NULL)
+    cont (cont_cls, tpid);  
 }
 
 
@@ -1636,8 +1653,10 @@
   GNUNET_assert (0 == mig_size);
   GNUNET_DHT_disconnect (dht_handle);
   dht_handle = NULL;
-  GNUNET_LOAD_value_free (datastore_load);
-  datastore_load = NULL;
+  GNUNET_LOAD_value_free (datastore_get_load);
+  datastore_get_load = NULL;
+  GNUNET_LOAD_value_free (datastore_put_load);
+  datastore_put_load = NULL;
   GNUNET_BLOCK_context_destroy (block_ctx);
   block_ctx = NULL;
   GNUNET_CONFIGURATION_destroy (block_cfg);
@@ -1793,14 +1812,17 @@
 
   GNUNET_assert (pm->next == NULL);
   GNUNET_assert (pm->pml == NULL);    
-  pml = GNUNET_malloc (sizeof (struct PendingMessageList));
-  pml->req = pr;
-  pml->target = cp;
-  pml->pm = pm;
-  pm->pml = pml;  
-  GNUNET_CONTAINER_DLL_insert (pr->pending_head,
-                              pr->pending_tail,
-                              pml);
+  if (pr != NULL)
+    {
+      pml = GNUNET_malloc (sizeof (struct PendingMessageList));
+      pml->req = pr;
+      pml->target = cp;
+      pml->pm = pm;
+      pm->pml = pml;  
+      GNUNET_CONTAINER_DLL_insert (pr->pending_head,
+                                  pr->pending_tail,
+                                  pml);
+    }
   pos = cp->pending_messages_head;
   while ( (pos != NULL) &&
          (pm->priority < pos->priority) )
@@ -2560,6 +2582,11 @@
    * Did we finish processing the associated request?
    */ 
   int finished;
+
+  /**
+   * Did we find a matching request?
+   */
+  int request_found;
 };
 
 
@@ -2715,6 +2742,7 @@
   prq->priority += pr->remaining_priority;
   pr->remaining_priority = 0;
   pr->results_found++;
+  prq->request_found = GNUNET_YES;
   if (NULL != pr->client_request_list)
     {
       GNUNET_STATISTICS_update (stats,
@@ -2800,7 +2828,19 @@
                            int success,
                            const char *msg)
 {
-  /* FIXME */
+  struct GNUNET_TIME_Absolute *start = cls;
+  struct GNUNET_TIME_Relative delay;
+  
+  delay = GNUNET_TIME_absolute_get_duration (*start);
+  GNUNET_free (start);
+  GNUNET_LOAD_update (datastore_put_load,
+                     delay.value);
+  if (GNUNET_OK == success)
+    return;
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# datastore 'put' failures"),
+                           1,
+                           GNUNET_NO);
 }
 
 
@@ -2830,6 +2870,12 @@
   struct GNUNET_TIME_Absolute expiration;
   GNUNET_HashCode query;
   struct ProcessReplyClosure prq;
+  struct GNUNET_TIME_Absolute *start;
+  struct GNUNET_TIME_Relative block_time;  
+  double putl;
+  struct ConnectedPeer *cp; 
+  struct PendingMessage *pm;
+  struct MigrationStopMessage *msm;
 
   msize = ntohs (message->size);
   if (msize < sizeof (struct PutMessage))
@@ -2876,6 +2922,7 @@
   prq.expiration = expiration;
   prq.priority = 0;
   prq.finished = GNUNET_NO;
+  prq.request_found = GNUNET_NO;
   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
                                              &query,
                                              &process_reply,
@@ -2893,6 +2940,8 @@
                  GNUNET_h2s (&query),
                  prq.priority);
 #endif
+      start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
+      *start = GNUNET_TIME_absolute_get ();
       GNUNET_DATASTORE_put (dsh,
                            0, &query, dsize, &put[1],
                            type, prq.priority, 1 /* anonymity */, 
@@ -2900,8 +2949,37 @@
                            1 + prq.priority, MAX_DATASTORE_QUEUE,
                            GNUNET_CONSTANTS_SERVICE_TIMEOUT,
                            &put_migration_continuation, 
-                           NULL);
+                           start);
     }
+  putl = GNUNET_LOAD_get_load (datastore_put_load);
+  if ( (GNUNET_NO == prq.request_found) &&
+       ( (GNUNET_YES != active_migration) ||
+                (putl > 2.0) ) )
+    {
+      cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                             &other->hashPubKey);
+      if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 
5000)
+       return GNUNET_OK; /* already blocked */
+      /* We're too busy; send MigrationStop message! */
+      if (GNUNET_YES != active_migration) 
+       putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
+      block_time = GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_MILLISECONDS,
+                                                 5000 + 
GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                                                               
   (unsigned int) (60000 * putl * putl)));
+      
+      cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
+      pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
+                         sizeof (struct MigrationStopMessage));
+      pm->msize = sizeof (struct MigrationStopMessage);
+      pm->priority = UINT32_MAX;
+      msm = (struct MigrationStopMessage*) &pm[1];
+      msm->header.size = htons (sizeof (struct MigrationStopMessage));
+      msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
+      msm->duration = GNUNET_TIME_relative_hton (block_time);
+      add_to_pending_messages_for_peer (cp,
+                                       pm,
+                                       NULL);
+    }
   return GNUNET_OK;
 }
 
@@ -2925,7 +3003,18 @@
                           struct GNUNET_TIME_Relative latency,
                           uint32_t distance)
 {
-  // FIXME!
+  struct ConnectedPeer *cp; 
+  const struct MigrationStopMessage *msm;
+
+  msm = (const struct MigrationStopMessage*) message;
+  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                         &other->hashPubKey);
+  if (cp == NULL)
+    {
+      GNUNET_break (0);
+      return GNUNET_OK;
+    }
+  cp->migration_blocked = GNUNET_TIME_relative_to_absolute 
(GNUNET_TIME_relative_ntoh (msm->duration));
   return GNUNET_OK;
 }
 
@@ -3110,6 +3199,7 @@
   prq.type = type;
   prq.priority = priority;  
   prq.finished = GNUNET_NO;
+  prq.request_found = GNUNET_NO;
   process_reply (&prq, key, pr);
   if ( (old_rf == 0) &&
        (pr->results_found == 1) )
@@ -3798,7 +3888,8 @@
       GNUNET_SCHEDULER_shutdown (sched);
       return;
     }
-  datastore_load = GNUNET_LOAD_value_init ();
+  datastore_get_load = GNUNET_LOAD_value_init ();
+  datastore_put_load = GNUNET_LOAD_value_init ();
   block_cfg = GNUNET_CONFIGURATION_create ();
   GNUNET_CONFIGURATION_set_value_string (block_cfg,
                                         "block",
@@ -3821,8 +3912,10 @@
       block_ctx = NULL;
       GNUNET_CONFIGURATION_destroy (block_cfg);
       block_cfg = NULL;
-      GNUNET_LOAD_value_free (datastore_load);
-      datastore_load = NULL;
+      GNUNET_LOAD_value_free (datastore_get_load);
+      datastore_get_load = NULL;
+      GNUNET_LOAD_value_free (datastore_put_load);
+      datastore_put_load = NULL;
       return;   
     }
 }




reply via email to

[Prev in Thread] Current Thread [Next in Thread]