gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r179 - in GNUnet: . src/applications/fs/module


From: grothoff
Subject: [GNUnet-SVN] r179 - in GNUnet: . src/applications/fs/module
Date: Fri, 4 Feb 2005 07:29:29 -0800 (PST)

Author: grothoff
Date: 2005-02-04 07:29:26 -0800 (Fri, 04 Feb 2005)
New Revision: 179

Modified:
   GNUnet/src/applications/fs/module/fs.c
   GNUnet/todo
Log:
FS-DHT integration done

Modified: GNUnet/src/applications/fs/module/fs.c
===================================================================
--- GNUnet/src/applications/fs/module/fs.c      2005-02-04 13:00:22 UTC (rev 
178)
+++ GNUnet/src/applications/fs/module/fs.c      2005-02-04 15:29:26 UTC (rev 
179)
@@ -25,9 +25,6 @@
  *
  * FS CORE. This is the code that is plugged into the GNUnet core to
  * enable File Sharing.
- *
- * TODO:
- * - DHT integration (will have to modify DHT API, too!)
  */
 
 #include "platform.h"
@@ -86,6 +83,15 @@
  */
 #define MAX_MIGRATION_EXP (1L * cronMONTHS)
 
+typedef struct {
+  struct DHT_GET_RECORD * rec;
+  unsigned int prio;
+} DHT_GET_CLS;
+
+typedef struct {
+  struct DHT_PUT_RECORD * rec;
+} DHT_PUT_CLS;
+
 /**
  * Global core API.
  */
@@ -113,8 +119,95 @@
 
 static Mutex lock;
 
+/**
+ * ID of the FS table in the DHT infrastructure.
+ */
+static DHT_TableId dht_table;
 
 /**
+ * Store an item in the datastore.
+ *
+ * @param key the key of the item
+ * @param value the value to store
+ * @param prio how much does our routing code value
+ *        this datum?
+ * @return OK if the value could be stored,
+ *         NO if the value verifies but is not stored,
+ *         SYSERR if the value is malformed
+ */
+static int gapPut(void * closure,
+                 const HashCode160 * key,
+                 const DataContainer * value,
+                 unsigned int prio) {
+  Datastore_Value * dv;
+  GapWrapper * gw;
+  unsigned int size;
+  int ret;
+  HashCode160 hc;
+  cron_t et;
+  cron_t now;
+
+  if (ntohl(value->size) < sizeof(GapWrapper)) {
+    BREAK();
+    return SYSERR;
+  }
+  gw = (GapWrapper*) value;
+  size = ntohl(gw->dc.size) 
+    - sizeof(GapWrapper) 
+    + sizeof(Datastore_Value);
+  if ( (OK != getQueryFor(size - sizeof(Datastore_Value),
+                         (char*)&gw[1],
+                         &hc)) ||
+       (! equalsHashCode160(&hc, key)) ) {
+    BREAK(); /* value failed verification! */
+    return SYSERR;
+  }
+
+  dv = MALLOC(size);
+  dv->size = htonl(size);
+  dv->type = gw->type;
+  dv->prio = htonl(prio);
+  dv->anonymityLevel = htonl(0);
+  et = ntohll(gw->timeout);
+  cronTime(&now);
+  /* bound ET to MAX_MIGRATION_EXP from now */ 
+  if (et > now) {
+    et -= now;
+    et = et % MAX_MIGRATION_EXP;
+    et += now;
+  }
+  dv->expirationTime = htonll(et);
+  memcpy(&dv[1],
+        &gw[1],
+        size - sizeof(Datastore_Value));
+  processResponse(key, dv); 
+  ret = datastore->putUpdate(key,
+                            dv);
+  FREE(dv);
+  return ret;
+}
+
+static int get_result_callback(const HashCode160 * key,
+                              const DataContainer * value,
+                              DHT_GET_CLS * cls) {
+  gapPut(NULL,
+        key,
+        value,
+        cls->prio);
+  return OK;
+}                                  
+
+static void get_complete_callback(DHT_GET_CLS * cls) {
+  dht->get_stop(cls->rec);
+  FREE(cls);
+}
+
+static void put_complete_callback(DHT_PUT_CLS * cls) {
+  dht->put_stop(cls->rec);
+  FREE(cls);
+}
+
+/**
  * Process a query from the client. Forwards to the network.
  *
  * @return SYSERR if the TCP connection should be closed, otherwise OK
@@ -122,6 +215,7 @@
 static int csHandleRequestQueryStart(ClientHandle sock,
                                     const CS_HEADER * req) {
   RequestSearch * rs;
+  unsigned int keyCount;
 
   if (ntohs(req->size) < sizeof(RequestSearch)) {
     BREAK();
@@ -129,14 +223,28 @@
   }
   rs = (RequestSearch*) req;
   trackQuery(&rs->query[0], sock);
+  keyCount = 1 + (ntohs(req->size) - sizeof(RequestSearch)) / 
sizeof(HashCode160);
   gap->get_start(ntohl(rs->type),
                 ntohl(rs->anonymityLevel),              
-                1 + (ntohs(req->size) - sizeof(RequestSearch)) / 
sizeof(HashCode160),
+                keyCount,
                 &rs->query[0],
                 ntohll(rs->expiration),
                 ntohl(rs->prio));
-  if (ntohl(rs->anonymityLevel) == 0) {
-    /* FIXME: query(rs); -- pass to dht! */
+  if ( (ntohl(rs->anonymityLevel) == 0) &&
+       (dht != NULL) ) {
+    DHT_GET_CLS * cls;
+
+    cls = MALLOC(sizeof(DHT_GET_CLS));
+    cls->prio = ntohl(rs->prio);
+    cls->rec = dht->get_start(&dht_table,
+                             ntohl(rs->type),
+                             keyCount,
+                             &rs->query[0],
+                             ntohll(rs->expiration),
+                             (DataProcessor) &get_result_callback,
+                             cls,
+                             (DHT_OP_Complete) &get_complete_callback,
+                             cls);
   }
   return OK;
 }
@@ -157,7 +265,7 @@
   }
   rs = (RequestSearch*) req;
   if (ntohl(rs->anonymityLevel) == 0) {
-    /* FIXME: cancel with dht */
+    /* FIXME 0.7.1: cancel with dht? */
   }
   gap->get_stop(ntohl(rs->type),
                1 + (ntohs(req->size) - sizeof(RequestSearch)) / 
sizeof(HashCode160),
@@ -177,6 +285,7 @@
   Datastore_Value * datum;
   int ret;
   HashCode160 query;
+  unsigned int type;
 
   if (ntohs(req->size) < sizeof(RequestIndex)) {
     BREAK();
@@ -195,14 +304,47 @@
     FREE(datum);
     return SYSERR;
   }
-  datum->type = htonl(getTypeOfBlock(ntohs(ri->header.size) - 
sizeof(RequestInsert),
-                                    &ri[1]));
+  type = getTypeOfBlock(ntohs(ri->header.size) - sizeof(RequestInsert),
+                       &ri[1]);
+  datum->type = htonl(type);
   MUTEX_LOCK(&lock);
   ret = datastore->put(&query,
                       datum);
   MUTEX_UNLOCK(&lock);
   if (ntohl(ri->anonymityLevel) == 0) {
-  /* do DHT put! */
+    GapWrapper * gw;
+    unsigned int size;
+    cron_t now;
+    cron_t et;
+    DHT_PUT_CLS * cls;
+    
+    size = sizeof(GapWrapper) +
+      ntohs(ri->header.size) - sizeof(RequestInsert) -
+      sizeof(Datastore_Value);
+    gw = MALLOC(size);
+    gw->dc.size = htonl(size);
+    gw->type = htonl(type);
+    et = ntohll(ri->expiration);
+    /* expiration time normalization and randomization */
+    cronTime(&now);
+    if (et > now) {
+      et -= now;
+      et = et % MAX_MIGRATION_EXP;
+      if (et > 0)
+       et = randomi(et);
+      et = et + now;
+    }
+    gw->timeout = htonll(et);
+    memcpy(&gw[1],
+          &ri[1],
+          size - sizeof(GapWrapper));
+    cls = MALLOC(sizeof(DHT_PUT_CLS));
+    cls->rec = dht->put_start(&dht_table,
+                             &query,
+                             15 * cronSECONDS, /* FIXME 0.7.1: better timeout 
for DHT PUT operation */
+                             &gw->dc,
+                             (DHT_OP_Complete) &put_complete_callback,
+                             cls);
   }
 
   FREE(datum);
@@ -509,69 +651,6 @@
 }
   
 /**
- * Store an item in the datastore.
- *
- * @param key the key of the item
- * @param value the value to store
- * @param prio how much does our routing code value
- *        this datum?
- * @return OK if the value could be stored,
- *         NO if the value verifies but is not stored,
- *         SYSERR if the value is malformed
- */
-static int gapPut(void * closure,
-                 const HashCode160 * key,
-                 const DataContainer * value,
-                 unsigned int prio) {
-  Datastore_Value * dv;
-  GapWrapper * gw;
-  unsigned int size;
-  int ret;
-  HashCode160 hc;
-  cron_t et;
-  cron_t now;
-
-  if (ntohl(value->size) < sizeof(GapWrapper)) {
-    BREAK();
-    return SYSERR;
-  }
-  gw = (GapWrapper*) value;
-  size = ntohl(gw->dc.size) 
-    - sizeof(GapWrapper) 
-    + sizeof(Datastore_Value);
-  if ( (OK != getQueryFor(size - sizeof(Datastore_Value),
-                         (char*)&gw[1],
-                         &hc)) ||
-       (! equalsHashCode160(&hc, key)) ) {
-    BREAK(); /* value failed verification! */
-    return SYSERR;
-  }
-
-  dv = MALLOC(size);
-  dv->size = htonl(size);
-  dv->type = gw->type;
-  dv->prio = htonl(prio);
-  dv->anonymityLevel = htonl(0);
-  et = ntohll(gw->timeout);
-  cronTime(&now);
-  /* bound ET to MAX_MIGRATION_EXP from now */ 
-  if (et > now) {
-    et -= now;
-    et = et % MAX_MIGRATION_EXP;
-    et += now;
-  }
-  dv->expirationTime = htonll(et);
-  memcpy(&dv[1],
-        &gw[1],
-        size - sizeof(Datastore_Value));
-  processResponse(key, dv); 
-  ret = datastore->putUpdate(key,
-                            dv);
-  FREE(dv);
-  return ret;
-}
-
-/**
  * Remove an item from the datastore.
  *
  * @param key the key of the item
@@ -599,7 +678,102 @@
   return SYSERR;
 }
 
+
 /**
+ * Callback that converts the Datastore_Value values
+ * from the datastore to Blockstore values for the
+ * DHT routing protocol.
+ */
+static int dhtGetConverter(const HashCode160 * key,
+                          const Datastore_Value * value,
+                          void * cls) {
+  GGC * ggc = (GGC*) cls;
+  GapWrapper * gw;
+  int ret;
+  unsigned int size;
+  cron_t et;
+  cron_t now;
+
+  ret = isDatumApplicable(ntohl(value->type),
+                         ntohl(value->size) - sizeof(Datastore_Value),
+                         (char*) &value[1],
+                         ggc->keyCount,
+                         ggc->keys);
+  if (ret == SYSERR)
+    return SYSERR; /* no query will ever match */
+  if (ret == NO)
+    return OK; /* Additional filtering based on type;
+                 i.e., namespace request and namespace
+                 in reply does not match namespace in query */
+  size = sizeof(GapWrapper) +
+    ntohl(value->size) -
+    sizeof(Datastore_Value);
+
+  if (ntohl(value->anonymityLevel) != 0) 
+    return OK;
+  
+  gw = MALLOC(size);
+  gw->dc.size = htonl(size);
+  gw->type = value->type;
+  et = ntohll(value->expirationTime);
+  /* expiration time normalization and randomization */
+  cronTime(&now);
+  if (et > now) {
+    et -= now;
+    et = et % MAX_MIGRATION_EXP;
+    if (et > 0)
+      et = randomi(et);
+    et = et + now;
+  }
+  gw->timeout = htonll(et);
+  memcpy(&gw[1],
+        &value[1],
+        size - sizeof(GapWrapper));
+
+  if (ggc->resultCallback != NULL)
+    ret = ggc->resultCallback(key,
+                             &gw->dc,
+                             ggc->resCallbackClosure);
+  else
+    ret = OK;
+  FREE(gw);
+  return ret;
+}
+
+/**
+ * Lookup an item in the datastore.
+ *
+ * @param key the value to lookup
+ * @param resultCallback function to call for each result that was found
+ * @param resCallbackClosure extra argument to resultCallback
+ * @return number of results, SYSERR on error
+ */
+static int dhtGet(void * closure,
+                 unsigned int type,
+                 unsigned int prio,
+                 unsigned int keyCount,
+                 const HashCode160 * keys,
+                 DataProcessor resultCallback,
+                 void * resCallbackClosure) {
+  int ret;
+  GGC myClosure;
+
+  myClosure.keyCount = keyCount;
+  myClosure.keys = keys;
+  myClosure.resultCallback = resultCallback;
+  myClosure.resCallbackClosure = resCallbackClosure;
+  ret = datastore->get(&keys[0],
+                      type,
+                      &dhtGetConverter,
+                      &myClosure);
+  if (ret != SYSERR)
+    ret = myClosure.count; /* return number of actual
+                             results (unfiltered) that
+                             were found */
+  return ret;
+}
+  
+/**
  * Initialize the FS module. This method name must match
  * the library name (libgnunet_XXX => initialize_XXX).
  *
@@ -607,7 +781,11 @@
  */
 int initialize_module_fs(CoreAPIForApplication * capi) {
   static Blockstore dsGap;
+  static Blockstore dsDht;
 
+  hash("GNUNET_FS", 
+       strlen("GNUNET_FS"), 
+       &dht_table);
   if (getConfigurationInt("AFS",
                          "DISKQUOTA") <= 0) {
     LOG(LOG_ERROR,
@@ -627,7 +805,8 @@
     capi->releaseService(datastore);
     return SYSERR;
   }
-  dht = capi->requestService("dht");
+  // dht = capi->requestService("dht");
+  dht = NULL;
 
   coreAPI = capi;  
   MUTEX_CREATE(&lock);
@@ -639,7 +818,14 @@
   initQueryManager(capi);
   gap->init(&dsGap);
   
-  /* if (dht != NULL) dht->join(&dsDht, &table);*/ 
+  if (dht != NULL) {
+    dsDht.closure = NULL;
+    dsDht.get = &dhtGet;
+    dsDht.put = &gapPut; /* exactly the same method for gap/dht*/
+    dsDht.del = &gapDel; /* exactly the same method for gap/dht*/
+    dsDht.iterate = &gapIterate;  /* exactly the same method for gap/dht*/
+    dht->join(&dsDht, &dht_table);
+  } 
 
   LOG(LOG_DEBUG,
       _("'%s' registering client handlers %d %d %d %d %d %d %d %d %d\n"),
@@ -676,6 +862,15 @@
 
 void done_module_fs() {
   doneMigration();
+  if (dht != NULL) {
+    LOG(LOG_INFO,
+       "Leaving DHT (this may take a while).");
+    dht->leave(&dht_table, 
+              15 * cronSECONDS); 
+    LOG(LOG_INFO,
+       "Leaving DHT complete.");
+
+  }
   GNUNET_ASSERT(SYSERR != 
coreAPI->unregisterClientHandler(AFS_CS_PROTO_QUERY_START,
                                                           
&csHandleRequestQueryStart));
   GNUNET_ASSERT(SYSERR != 
coreAPI->unregisterClientHandler(AFS_CS_PROTO_QUERY_STOP,
@@ -692,7 +887,6 @@
                                                           
&csHandleRequestTestIndexed));
   GNUNET_ASSERT(SYSERR != 
coreAPI->unregisterClientHandler(AFS_CS_PROTO_GET_AVG_PRIORITY,
                                                           
&csHandleRequestGetAvgPriority));
-  /* dht->leave(&table, timeout); */  
   doneQueryManager();
   coreAPI->releaseService(datastore);
   datastore = NULL;

Modified: GNUnet/todo
===================================================================
--- GNUnet/todo 2005-02-04 13:00:22 UTC (rev 178)
+++ GNUnet/todo 2005-02-04 15:29:26 UTC (rev 179)
@@ -34,9 +34,6 @@
 - Missing Features:
   * topology: do aggressive bootstrap on first start (Christian) [ easy ]
   * ecrs-unindex: code cleanup [ easy ]
-  * fix dht routing service
-     - make dht respect new dht API (long way to go)
-     - fs-dht integration [ difficult ]
   * configure.ac: flags for mysql, gmp, libgcrypt should ONLY be passed when
     linking the respective modules / libraries (gnunet_util, sqstore_mysql) [ 
tricky ]
 - Features removed but to be revived:





reply via email to

[Prev in Thread] Current Thread [Next in Thread]