gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r163 - GNUnet/src/applications/dht/module


From: grothoff
Subject: [GNUnet-SVN] r163 - GNUnet/src/applications/dht/module
Date: Thu, 3 Feb 2005 01:23:45 -0800 (PST)

Author: grothoff
Date: 2005-02-03 01:23:44 -0800 (Thu, 03 Feb 2005)
New Revision: 163

Modified:
   GNUnet/src/applications/dht/module/dht.c
Log:
working on DHT

Modified: GNUnet/src/applications/dht/module/dht.c
===================================================================
--- GNUnet/src/applications/dht/module/dht.c    2005-02-03 08:25:31 UTC (rev 162)
+++ GNUnet/src/applications/dht/module/dht.c    2005-02-03 09:23:44 UTC (rev 163)
@@ -378,13 +378,8 @@
    */
   FindKNodesContext * kfnc;
 
-  /**
-   * How many more results are we looking for?
-   */
-  unsigned int maxResults;
+  DHT_OP_Complete callback;
 
-  DHT_GET_Complete callback;
-
   void * closure;
 
   /**
@@ -449,7 +444,7 @@
   /**
    * Callback to call upon completion.
    */
-  DHT_PUT_Complete callback;
+  DHT_OP_Complete callback;
 
   /**
    * Extra argument to callback.
@@ -517,7 +512,7 @@
   /**
    * Callback to call upon completion.
    */ 
-  DHT_REMOVE_Complete callback;
+  DHT_OP_Complete callback;
 
   /**
    * Extra argument to callback.
@@ -552,11 +547,8 @@
 
 
 typedef struct {
+
   /**
-   * Maximum number of results for this get operation.
-   */
-  unsigned int maxResults;
-  /**
    * Number of results currently received (size of the
    * results-array).
    */
@@ -1146,7 +1138,7 @@
   }
   FREE(tabs);
 
-  /* here: add other optional fields (HELOs) */
+  /* FIXME: here: add other optional fields (HELOs) */
 }
 
 /**
@@ -1502,12 +1494,9 @@
       max);
 #endif
   for (i=0;i<max;i++) {
-    value.data = NULL; 
-    value.dataLength = 0;
-    if (OK != RPC_paramValueByPosition(results,
-                                      i,
-                                      &value.dataLength,
-                                      (void**)&value.data)) {
+    value = RPC_paramDataContainerByPosition(results,
+                                            i);
+    if (value == NULL) {
       hash2enc(&responder->hashPubKey,
               &enc);
       LOG(LOG_WARNING,
@@ -1517,15 +1506,12 @@
       return;
     }
     MUTEX_LOCK(&record->lock);
-    if (record->maxResults > 0) {
-      record->maxResults--;
-      record->resultsFound++; 
-      if (record->callback != NULL) {
-       record->callback(&value,
-                        record->closure);
-      }
-    } 
+    if (record->callback != NULL) 
+      record->resultCallback(&record->keys[0],
+                            value,
+                            record->resultClosure);
     MUTEX_UNLOCK(&record->lock);
+    FREE(value);
   }
 }
 
@@ -1555,7 +1541,7 @@
 #endif  
   if (isNotCloserThanMe(&record->table,
                        peer,                
-                       &record->key))
+                       &record->keys[0]))
     return; /* refuse! */
   cronTime(&now);
   if (record->timeout > now)
@@ -1598,7 +1584,7 @@
 
 /**
  * Callback called for local results found in
- * dht_get_async_start.  Calls the DHT_GET_Complete
+ * dht_get_async_start.  Calls the DHT_OP_Complete
  * callback with the results found locally.
  * A DataProcessor.
  */
@@ -1627,8 +1613,6 @@
  * @param key the key to look up  
  * @param timeout how long to wait until this operation should
  *        automatically time-out
- * @param maxResults maximum number of results to obtain;
- *        also used to determine the level of parallelism; is that wise?
  * @param callback function to call on each result
  * @param closure extra argument to callback
  * @return handle to stop the async get
@@ -1641,7 +1625,7 @@
                    cron_t timeout,
                    DataProcessor resultCallback,
                    void * cls,
-                   DHT_GET_Complete callback,
+                   DHT_OP_Complete callback,
                    void * closure) {
   int i;
   LocalTableData * ltd;
@@ -1708,13 +1692,13 @@
 #endif
     /* We do participate in the table, it is fair to assume
        that we know the relevant peers in my neighbour set */
-    hosts = MALLOC(sizeof(PeerIdentity) * maxResults);
+    hosts = MALLOC(sizeof(PeerIdentity) * ALPHA);
     count = findLocalNodes(table,
                           &keys[0],
                           hosts,
-                          maxResults);
+                          ALPHA);
     /* try adding this peer to hosts */
-    k_best_insert(maxResults,
+    k_best_insert(ALPHA,
                  &count,
                  &keys[0],
                  (HashCode160*) hosts,
@@ -1749,8 +1733,8 @@
        break;
       }  
     
-    if (maxResults > ret->resultsFound) {
-      /* if less than maxResults replies were found, send 
+    if (ALPHA > ret->resultsFound) {
+      /* if less than ALPHA replies were found, send 
         dht_get_RPC to the other peers */
       for (i=0;i<count;i++) {
        if (! hostIdentityEquals(coreAPI->myIdentity,
@@ -1777,7 +1761,7 @@
    LOG(LOG_DEBUG,
       "I do not participate in the table '%s', finding %d other nodes that do.\n",
        &enc,
-       maxResults);
+       ALPHA);
 #endif
     /* We do not particpate in the table; hence we need to use 
        findKNodes to find an initial set of peers in that
@@ -1789,7 +1773,7 @@
       = findKNodes_start(table,
                         key,
                         timeout,
-                        maxResults,
+                        ALPHA,
                         (NodeFoundCallback) &send_dht_get_rpc,
                         ret);
   }  
@@ -1974,7 +1958,7 @@
                              timeout,
                              ALPHA - fnc->k, /* level of parallelism proportional to 
                                                 number of peers we're looking for */
-                             (DHT_GET_Complete)&findnodes_dht_master_get_callback,
+                             &findnodes_dht_master_get_callback,
                              fnc);
     }
   }
@@ -2186,7 +2170,7 @@
                            timeout,
                            fnc->k, /* level of parallelism proportional to 
                                       number of peers we're looking for */
-                           (DHT_GET_Complete)&find_k_nodes_dht_master_get_callback,
+                           &find_k_nodes_dht_master_get_callback,
                            fnc);
   }  
   return fnc;
@@ -2369,7 +2353,7 @@
                                                   cron_t timeout,
                                                   const DataContainer * value,
                                                   unsigned int replicationLevel,
-                                                  DHT_PUT_Complete callback,
+                                                  DHT_OP_Complete callback,
                                                   void * closure) {
   int i;
   LocalTableData * ltd;
@@ -2495,6 +2479,8 @@
                         (NodeFoundCallback) &send_dht_put_rpc,
                         ret);
   }  
+  /* FIXME: ensure we call OP_Complete callback
+     after timeout! */
   MUTEX_UNLOCK(lock);
   return ret;
 }
@@ -2667,7 +2653,7 @@
                       cron_t timeout,
                       const DataContainer * value,
                       unsigned int replicationLevel,
-                      DHT_REMOVE_Complete callback,
+                      DHT_OP_Complete callback,
                       void * closure) {
   int i;
   LocalTableData * ltd;
@@ -3097,8 +3083,6 @@
  */
 static void rpc_dht_findValue_callback(const DataContainer * value,
                                       RPC_DHT_FindValue_Context * fw) {
-  int stop;
-
   ENTER();
   MUTEX_LOCK(&fw->lock);
   GROW(fw->results,
@@ -3109,7 +3093,6 @@
   memcpy(fw->results[fw->count-1].data,
         value->data,
         value->dataLength);
-  stop = fw->count == fw->maxResults;
   MUTEX_UNLOCK(&fw->lock);
   if (stop) {
     /* don't wait for timeout, run now! */
@@ -3191,8 +3174,6 @@
   fw_context 
     = MALLOC(sizeof(RPC_DHT_FindValue_Context));
   MUTEX_CREATE_RECURSIVE(&fw_context->lock);
-  fw_context->maxResults
-    = ntohl(*maxResults);
   fw_context->count
     = 0;
   fw_context->done
@@ -3211,7 +3192,7 @@
                          ntohll(*timeout),
                          NULL, /* FIXME: resultCallback required! */
                          fw_context,
-                         (DHT_GET_Complete) &rpc_dht_findValue_callback,
+                         &rpc_dht_findValue_callback,
                          fw_context);
   addAbortJob((CronJob)&rpc_DHT_findValue_abort,
              fw_context);
@@ -3362,7 +3343,7 @@
                          ntohll(*timeout),
                          &value,
                          fw_context->replicationLevel,
-                         (DHT_PUT_Complete) &rpc_dht_store_callback,
+                         &rpc_dht_store_callback,
                          fw_context);
   addAbortJob((CronJob)&rpc_DHT_store_abort,
              fw_context);
@@ -3527,7 +3508,7 @@
                             ntohll(*timeout),
                             (value.dataLength==0) ? NULL : &value,
                             fw_context->replicationLevel,
-                            (DHT_REMOVE_Complete) &rpc_dht_remove_callback,
+                            &rpc_dht_remove_callback,
                             fw_context);
   addAbortJob((CronJob)&rpc_DHT_remove_abort,
              fw_context);





reply via email to

[Prev in Thread] Current Thread [Next in Thread]