gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r9053 - GNUnet/src/applications/fs/gap


From: gnunet
Subject: [GNUnet-SVN] r9053 - GNUnet/src/applications/fs/gap
Date: Thu, 1 Oct 2009 12:10:27 -0600

Author: nevans
Date: 2009-10-01 12:10:27 -0600 (Thu, 01 Oct 2009)
New Revision: 9053

Added:
   GNUnet/src/applications/fs/gap/dv_fs.c
   GNUnet/src/applications/fs/gap/dv_querymanager.c
   GNUnet/src/applications/fs/gap/dv_querymanager.h
   GNUnet/src/applications/fs/gap/fs_dv_dht.c
   GNUnet/src/applications/fs/gap/fs_dv_dht.h
   GNUnet/src/applications/fs/gap/test_multi_results_dv.c
Modified:
   GNUnet/src/applications/fs/gap/check.conf
Log:
dv gap testing, not yet working well (though it didn't before either ;-)

Modified: GNUnet/src/applications/fs/gap/check.conf
===================================================================
--- GNUnet/src/applications/fs/gap/check.conf   2009-10-01 16:26:02 UTC (rev 
9052)
+++ GNUnet/src/applications/fs/gap/check.conf   2009-10-01 18:10:27 UTC (rev 
9053)
@@ -1,7 +1,7 @@
 # General settings
 [GNUNET]
-GNUNET_HOME = "/tmp/gnunet-gap-test-driver"
-LOGLEVEL = "ERROR"
+GNUNET_HOME = "/tmp/gap-test-driver"
+KEEPLOG = 3
 LOGFILE = ""
 PROCESS-PRIORITY = "NORMAL"
 
@@ -9,8 +9,11 @@
 [NETWORK]
 HOST = "localhost:2087"
 
-
 [TESTING]
 WEAKRANDOM = YES
 
+[LOGGING]
+DEVELOPER = YES
+ADMIN-LEVEL = DEBUG
+USER-LEVEL = DEBUG
 

Added: GNUnet/src/applications/fs/gap/dv_fs.c
===================================================================
--- GNUnet/src/applications/fs/gap/dv_fs.c                              (rev 0)
+++ GNUnet/src/applications/fs/gap/dv_fs.c      2009-10-01 18:10:27 UTC (rev 
9053)
@@ -0,0 +1,1003 @@
+/*
+     This file is part of GNUnet.
+     (C) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008 Christian Grothoff 
(and other contributing authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 2, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
+*/
+
+/**
+ * @file applications/fs/gap/dv_fs.c
+ * @brief functions for handling CS and P2P file-sharing requests
+ * @author Christian Grothoff, Nathan Evans
+ *
+ * This file contains all of the entry points to the file-sharing
+ * module.
+ *
+ * TODO:
+ * - integrate with migration submodule
+ * - make sure we do an immediate PUSH for DHT stuff
+ *   given to us with anonymity_level zero.
+ */
+
+#include "platform.h"
+#include "gnunet_util.h"
+#include "gnunet_directories.h"
+#include "gnunet_protocols.h"
+#include "gnunet_datastore_service.h"
+#include "gnunet_dht_service.h"
+#include "gnunet_identity_service.h"
+#include "gnunet_stats_service.h"
+#include "gnunet_traffic_service.h"
+#include "ecrs_core.h"
+#include "anonymity.h"
+#include "fs.h"
+#include "fs_dv_dht.h"
+#include "gap.h"
+#include "migration.h"
+#include "dv_querymanager.h"
+#include "ondemand.h"
+#include "plan.h"
+#include "pid_table.h"
+#include "shared.h"
+
+
+/**
+ * Set to GNUNET_YES to compile in the debug logging
+ * guarded by "#if DEBUG_FS" in the handlers below.
+ */
+#define DEBUG_FS GNUNET_NO
+
+/**
+ * Lock shared between all C files in this
+ * directory.
+ */
+struct GNUNET_Mutex *GNUNET_FS_lock;
+
+/**
+ * Error/logging context of this module; taken from the
+ * core API when the module is initialized.
+ */
+static struct GNUNET_GE_Context *ectx;
+
+/**
+ * Core API handle passed to the module on initialization.
+ */
+static GNUNET_CoreAPIForPlugins *coreAPI;
+
+/**
+ * Handle to the "identity" service (used to adjust the
+ * trust of peers).
+ */
+static GNUNET_Identity_ServiceAPI *identity;
+
+/**
+ * Handle to the "stats" service; may be NULL, all users
+ * check before calling into it.
+ */
+static GNUNET_Stats_ServiceAPI *stats;
+
+/**
+ * Handle to the "datastore" service.
+ */
+static GNUNET_Datastore_ServiceAPI *datastore;
+
+/**
+ * GNUNET_YES if content received from other peers may be
+ * stored in the local datastore (configuration option
+ * ACTIVEMIGRATION in section DV_FS).
+ */
+static int active_migration;
+
+/**
+ * Statistics handle: "# gap requests total received".
+ */
+static int stat_gap_query_received;
+
+/**
+ * Statistics handle: "# gap requests dropped due to load".
+ */
+static int stat_gap_query_drop_busy;
+
+/**
+ * Statistics handle: "# gap content total received".
+ */
+static int stat_gap_content_received;
+
+/**
+ * Statistics handle: "# gap total trust awarded".
+ */
+static int stat_gap_trust_awarded;
+
+/**
+ * Hard CPU limit (configuration option HARDCPULIMIT in
+ * section LOAD); 0 means no limit.
+ */
+static unsigned long long hardCPULimit;
+
+/**
+ * Hard network upload limit (configuration option
+ * HARDUPLIMIT in section LOAD); 0 means no limit.
+ */
+static unsigned long long hardUpLimit;
+
+
+
+/* ********************* CS handlers ********************** */
+
+/**
+ * Handle a client's request to insert a block.
+ *
+ * The block follows the fixed-size request header.  It is
+ * validated, copied into a freshly allocated datastore value
+ * together with the meta data from the request and handed to
+ * the datastore.  The datastore's result is sent to the client.
+ *
+ * @param sock client that sent the request
+ * @param req the request message
+ * @return GNUNET_SYSERR if the TCP connection should be closed,
+ *         otherwise the result of sending the reply to the client
+ */
+static int
+handle_cs_insert_request (struct GNUNET_ClientHandle *sock,
+                          const GNUNET_MessageHeader * req)
+{
+  const CS_fs_request_insert_MESSAGE *request;
+  const GNUNET_EC_DBlock *block;
+  GNUNET_DatastoreValue *entry;
+  struct GNUNET_GE_Context *client_ectx;
+  GNUNET_HashCode key;
+  size_t block_size;
+  int status;
+#if DEBUG_FS
+  GNUNET_EncName enc;
+#endif
+
+  /* the message must at least hold the fixed-size header */
+  if (ntohs (req->size) < sizeof (CS_fs_request_insert_MESSAGE))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  request = (const CS_fs_request_insert_MESSAGE *) req;
+  block = (const GNUNET_EC_DBlock *) &request[1];
+  block_size = ntohs (req->size) - sizeof (CS_fs_request_insert_MESSAGE);
+  /* validate the block and derive the key under which it is stored */
+  if (GNUNET_OK !=
+      GNUNET_EC_file_block_check_and_get_query (block_size, block,
+                                                GNUNET_YES, &key))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  entry = GNUNET_malloc (sizeof (GNUNET_DatastoreValue) + block_size);
+  entry->size = htonl (sizeof (GNUNET_DatastoreValue) + block_size);
+  entry->type = htonl (GNUNET_EC_file_block_get_type (block_size, block));
+  /* these fields are copied verbatim from the request, without
+     byte-order conversion */
+  entry->priority = request->priority;
+  entry->anonymity_level = request->anonymity_level;
+  entry->expiration_time = request->expiration;
+#if DEBUG_FS
+  IF_GELOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+            GNUNET_hash_to_enc (&key, &enc));
+  GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "FS received REQUEST INSERT (query: `%s', type: %u, priority %u)\n",
+                 &enc, ntohl (entry->type), ntohl (request->priority));
+#endif
+  memcpy (&entry[1], block, block_size);
+  status = datastore->putUpdate (&key, entry);
+  if (status == GNUNET_NO)
+    {
+      /* report the problem through a log context bound to the client */
+      client_ectx = coreAPI->cs_log_context_create (sock);
+      GNUNET_GE_LOG (client_ectx,
+                     GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
+                     _("Datastore full.\n"));
+      GNUNET_GE_free_context (client_ectx);
+    }
+  GNUNET_free (entry);
+  return coreAPI->cs_send_value (sock, status);
+}
+
+/**
+ * Process a request to prepare the indexing of a file
+ * (with a symlink).
+ *
+ * The file name follows the fixed-size request header and is
+ * not required to be 0-terminated inside the message; a
+ * terminated copy is made before it is passed on to
+ * GNUNET_FS_ONDEMAND_index_prepare_with_symlink.
+ *
+ * @param sock client that sent the request
+ * @param req the request message
+ * @return GNUNET_SYSERR if the message is malformed, otherwise
+ *         the result of sending the confirmation to the client
+ */
+static int
+handle_cs_init_index_request (struct GNUNET_ClientHandle *sock,
+                              const GNUNET_MessageHeader * req)
+{
+  const CS_fs_request_init_index_MESSAGE *ri;
+  struct GNUNET_GE_Context *cectx;
+  int fnLen;
+  int ret;
+  char *fn;
+
+  if (ntohs (req->size) < sizeof (CS_fs_request_init_index_MESSAGE))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  /* only compute the length once we know it cannot be negative */
+  fnLen = ntohs (req->size) - sizeof (CS_fs_request_init_index_MESSAGE);
+#if WINDOWS
+  if (fnLen > _MAX_PATH)
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+#endif
+  ri = (const CS_fs_request_init_index_MESSAGE *) req;
+  fn = GNUNET_malloc (fnLen + 1);
+  /* Copy exactly the fnLen bytes that are part of the message;
+     strncpy with fnLen + 1 could read one byte beyond the end of
+     the message if the name contains no 0-terminator. */
+  memcpy (fn, &ri[1], fnLen);
+  fn[fnLen] = '\0';
+  cectx = coreAPI->cs_log_context_create (sock);
+  ret =
+    GNUNET_FS_ONDEMAND_index_prepare_with_symlink (cectx, &ri->fileId, fn);
+  GNUNET_GE_free_context (cectx);
+  GNUNET_free (fn);
+#if DEBUG_FS
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "Sending confirmation (%s) of index initialization request to client\n",
+                 ret == GNUNET_OK ? "success" : "failure");
+#endif
+  return coreAPI->cs_send_value (sock, ret);
+}
+
+/**
+ * Handle a client's request to index a block of a file.
+ *
+ * The block follows the fixed-size request header; it is
+ * passed, together with the meta data from the request, to
+ * GNUNET_FS_ONDEMAND_add_indexed_content.  The result is
+ * sent to the client.
+ *
+ * @param sock client that sent the request
+ * @param req the request message
+ * @return GNUNET_SYSERR if the TCP connection should be closed,
+ *         otherwise the result of sending the reply to the client
+ */
+static int
+handle_cs_index_request (struct GNUNET_ClientHandle *sock,
+                         const GNUNET_MessageHeader * req)
+{
+  const CS_fs_request_index_MESSAGE *request;
+  const GNUNET_EC_DBlock *block;
+  struct GNUNET_GE_Context *client_ectx;
+  size_t block_size;
+  int status;
+#if DEBUG_FS
+  GNUNET_HashCode hc;
+  GNUNET_EncName enc;
+#endif
+
+  if (ntohs (req->size) < sizeof (CS_fs_request_index_MESSAGE))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  client_ectx = coreAPI->cs_log_context_create (sock);
+  request = (const CS_fs_request_index_MESSAGE *) req;
+  block = (const GNUNET_EC_DBlock *) &request[1];
+  block_size = ntohs (req->size) - sizeof (CS_fs_request_index_MESSAGE);
+#if DEBUG_FS
+  GNUNET_EC_file_block_get_query (block, block_size, &hc);
+  IF_GELOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+            GNUNET_hash_to_enc (&hc, &enc));
+  GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "FS received REQUEST INDEX (query: `%s', priority %u)\n",
+                 &enc, ntohl (request->priority));
+#endif
+  /* meta data is converted to host byte order for the on-demand module */
+  status =
+    GNUNET_FS_ONDEMAND_add_indexed_content (client_ectx,
+                                            datastore,
+                                            ntohl (request->priority),
+                                            GNUNET_ntohll (request->expiration),
+                                            GNUNET_ntohll (request->fileOffset),
+                                            ntohl (request->anonymity_level),
+                                            &request->fileId,
+                                            block_size, block);
+  GNUNET_GE_free_context (client_ectx);
+#if DEBUG_FS
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "Sending confirmation (%s) of index request to client\n",
+                 status == GNUNET_OK ? "success" : "failure");
+#endif
+  return coreAPI->cs_send_value (sock, status);
+}
+
+/**
+ * Process a query to delete content.
+ *
+ * The block to delete follows the fixed-size request header.
+ * It is validated first; then the datastore is searched for a
+ * matching entry and, if one was found, that entry is deleted.
+ * The outcome is sent to the client.
+ *
+ * @param sock client that sent the request
+ * @param req the request message
+ * @return GNUNET_SYSERR if the TCP connection should be closed,
+ *         otherwise the result of sending the reply to the client
+ */
+static int
+handle_cs_delete_request (struct GNUNET_ClientHandle *sock,
+                          const GNUNET_MessageHeader * req)
+{
+  int ret;
+  const CS_fs_request_delete_MESSAGE *rd;
+  const GNUNET_EC_DBlock *block;
+  GNUNET_DatastoreValue *value;
+  GNUNET_HashCode query;
+  size_t block_size;
+  unsigned int type;
+#if DEBUG_FS
+  GNUNET_EncName enc;
+#endif
+
+  if (ntohs (req->size) < sizeof (CS_fs_request_delete_MESSAGE))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  rd = (const CS_fs_request_delete_MESSAGE *) req;
+  block = (const GNUNET_EC_DBlock *) &rd[1];
+  block_size = ntohs (req->size) - sizeof (CS_fs_request_delete_MESSAGE);
+  /* validate before allocating anything, so that the error path
+     has nothing to clean up */
+  if (GNUNET_OK !=
+      GNUNET_EC_file_block_check_and_get_query (block_size, block,
+                                                GNUNET_NO, &query))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  type = GNUNET_EC_file_block_get_type (block_size, block);
+  value = GNUNET_malloc (sizeof (GNUNET_DatastoreValue) + block_size);
+  /* host to network byte order, as in the insert handler */
+  value->size = htonl (sizeof (GNUNET_DatastoreValue) + block_size);
+  memcpy (&value[1], block, block_size);
+#if DEBUG_FS
+  IF_GELOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+            GNUNET_hash_to_enc (&query, &enc));
+  GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "FS received REQUEST DELETE (query: `%s', type: %u)\n", &enc,
+                 type);
+#endif
+  GNUNET_mutex_lock (GNUNET_FS_lock);
+  /* "ANY" serves as the marker for "no match yet"; the callback is
+     presumably responsible for replacing it when it finds the entry
+     (defined outside of this file -- confirm) */
+  value->type = htonl (GNUNET_ECRS_BLOCKTYPE_ANY);
+  ret = datastore->get (&query, type,
+                        &GNUNET_FS_HELPER_complete_value_from_database_callback,
+                        value);
+  if ((0 < ret) && (value->type != htonl (GNUNET_ECRS_BLOCKTYPE_ANY)))
+    {
+      ret = datastore->del (&query, value);
+    }
+  else
+    {                           /* not found */
+      ret = GNUNET_SYSERR;
+    }
+  GNUNET_mutex_unlock (GNUNET_FS_lock);
+  GNUNET_free (value);
+#if DEBUG_FS
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "Sending confirmation (%s) of delete request to client\n",
+                 ret != GNUNET_SYSERR ? "success" : "failure");
+#endif
+  return coreAPI->cs_send_value (sock, ret);
+}
+
+/**
+ * Handle a client's request to unindex content.
+ *
+ * The request must have exactly the size of the unindex
+ * message; a violation is reported both to the module's log
+ * context and to the one bound to the client.
+ *
+ * @param sock client that sent the request
+ * @param req the request message
+ * @return GNUNET_SYSERR if the message is malformed, otherwise
+ *         the result of sending the reply to the client
+ */
+static int
+handle_cs_unindex_request (struct GNUNET_ClientHandle *sock,
+                           const GNUNET_MessageHeader * req)
+{
+  const CS_fs_request_unindex_MESSAGE *request;
+  struct GNUNET_GE_Context *client_ectx;
+  int status;
+
+  client_ectx = coreAPI->cs_log_context_create (sock);
+  if (ntohs (req->size) != sizeof (CS_fs_request_unindex_MESSAGE))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      GNUNET_GE_BREAK (client_ectx, 0);
+      GNUNET_GE_free_context (client_ectx);
+      return GNUNET_SYSERR;
+    }
+#if DEBUG_FS
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "FS received REQUEST UNINDEX\n");
+#endif
+  request = (const CS_fs_request_unindex_MESSAGE *) req;
+  status =
+    GNUNET_FS_ONDEMAND_delete_indexed_content (client_ectx,
+                                               datastore,
+                                               ntohl (request->blocksize),
+                                               &request->fileId);
+  GNUNET_GE_free_context (client_ectx);
+  return coreAPI->cs_send_value (sock, status);
+}
+
+/**
+ * Handle a client's request to test whether a certain
+ * file is indexed.
+ *
+ * @param sock client that sent the request
+ * @param req the request message; must have exactly the
+ *        size of the test-index message
+ * @return GNUNET_SYSERR if the message is malformed, otherwise
+ *         the result of sending the reply to the client
+ */
+static int
+handle_cs_test_indexed_request (struct GNUNET_ClientHandle *sock,
+                                const GNUNET_MessageHeader * req)
+{
+  const CS_fs_request_test_index_MESSAGE *request;
+  int indexed;
+
+  if (ntohs (req->size) != sizeof (CS_fs_request_test_index_MESSAGE))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+#if DEBUG_FS
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "FS received REQUEST TESTINDEXED\n");
+#endif
+  request = (const CS_fs_request_test_index_MESSAGE *) req;
+  indexed = GNUNET_FS_ONDEMAND_test_indexed_file (datastore,
+                                                  &request->fileId);
+  return coreAPI->cs_send_value (sock, indexed);
+}
+
+/**
+ * Closure for fast_path_processor: state of the attempt to
+ * answer a client query directly from the local datastore.
+ */
+struct FPPClosure
+{
+  /**
+   * Client to which results are sent.
+   */
+  struct GNUNET_ClientHandle *sock;
+
+  /**
+   * Hashes of the results already sent to the client;
+   * created on demand, NULL until the first non-unique
+   * result was sent.
+   */
+  struct GNUNET_MultiHashMap *seen;
+
+  /**
+   * Number of results handled so far (compared against
+   * GNUNET_GAP_MAX_SYNC_PROCESSED).
+   */
+  unsigned int processed;
+
+  /**
+   * Set to GNUNET_YES if the iteration stopped early and
+   * further results may exist (switch to asynchronous
+   * processing).
+   */
+  int have_more;
+};
+
+/**
+ * Datastore iterator used for the "fast path": every local
+ * result is passed on to the client right away.
+ *
+ * @param key key of the result
+ * @param value the result found in the datastore
+ * @param closure the struct FPPClosure with our state
+ * @param uid unique ID of the entry (unused)
+ * @return GNUNET_OK to continue with the next result,
+ *         GNUNET_NO if sending to the client returned
+ *         GNUNET_NO ("delete + continue"),
+ *         GNUNET_SYSERR to abort the iteration (limit reached,
+ *         unique response sent, or client cannot take more)
+ */
+static int
+fast_path_processor (const GNUNET_HashCode * key,
+                     const GNUNET_DatastoreValue *
+                     value, void *closure, unsigned long long uid)
+{
+  struct FPPClosure *state = closure;
+  GNUNET_HashCode result_hash;
+  unsigned int block_type;
+  int sent;
+
+  if (state->processed > GNUNET_GAP_MAX_SYNC_PROCESSED)
+    {
+      /* enough done synchronously, leave the rest for later */
+      state->have_more = GNUNET_YES;
+      return GNUNET_SYSERR;
+    }
+  block_type = ntohl (((const GNUNET_EC_DBlock *) &value[1])->type);
+  sent = GNUNET_FS_HELPER_send_to_client (coreAPI,
+                                          key, value, state->sock, NULL,
+                                          &result_hash);
+  if (sent == GNUNET_NO)
+    return GNUNET_NO;           /* delete + continue */
+  state->processed++;
+  if (sent != GNUNET_OK)
+    {
+      /* client can take no more: switch to async processing */
+      state->have_more = GNUNET_YES;
+      return GNUNET_SYSERR;
+    }
+  if (block_type == GNUNET_ECRS_BLOCKTYPE_DATA)
+    return GNUNET_SYSERR;       /* unique response */
+  /* remember what the client has already seen */
+  if (state->seen == NULL)
+    state->seen = GNUNET_multi_hash_map_create (8);
+  GNUNET_multi_hash_map_put (state->seen,
+                             &result_hash,
+                             NULL, GNUNET_MultiHashMapOption_UNIQUE_FAST);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle a client's request to start a query.
+ *
+ * First the "fast path" is tried: if the client can take a
+ * message right now, the local datastore is searched and the
+ * results are sent immediately.  Unless a unique reply could
+ * be delivered completely that way, the query is then handed
+ * to the query manager.
+ *
+ * @param sock client that sent the request
+ * @param req the request message
+ * @return GNUNET_SYSERR if the TCP connection should be closed,
+ *         otherwise GNUNET_OK
+ */
+static int
+handle_cs_query_start_request (struct GNUNET_ClientHandle *sock,
+                               const GNUNET_MessageHeader * req)
+{
+  static GNUNET_PeerIdentity zero_identity;
+  const CS_fs_request_search_MESSAGE *search;
+  struct FPPClosure fpp;
+  unsigned int block_type;
+  unsigned int key_count;
+  unsigned int anonymity;
+  int targeted;
+  int found;
+  int answered_locally;
+#if DEBUG_FS
+  GNUNET_EncName enc;
+#endif
+
+  if (ntohs (req->size) < sizeof (CS_fs_request_search_MESSAGE))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  search = (const CS_fs_request_search_MESSAGE *) req;
+  block_type = ntohl (search->type);
+#if DEBUG_FS
+  IF_GELOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+            GNUNET_hash_to_enc (&search->query[0], &enc));
+  GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "FS received QUERY (query: `%s', type: %u)\n", &enc,
+                 block_type);
+#endif
+  fpp.sock = sock;
+  fpp.seen = NULL;
+  fpp.processed = 0;
+  fpp.have_more = GNUNET_NO;
+  answered_locally = GNUNET_NO;
+  /* try "fast path" avoiding gap/dht if unique reply is locally available */
+  if (GNUNET_OK !=
+      coreAPI->cs_send_message_now_test (sock,
+                                         GNUNET_GAP_ESTIMATED_DATA_SIZE,
+                                         GNUNET_NO))
+    {
+      /* client cannot take a reply right now */
+      fpp.have_more = GNUNET_YES;
+    }
+  else if (block_type != GNUNET_ECRS_BLOCKTYPE_DATA)
+    {
+      datastore->get (&search->query[0], block_type, &fast_path_processor,
+                      &fpp);
+    }
+  else
+    {
+      /* unique reply: look for the data block itself and, failing
+         that, for an on-demand block */
+      found = (1 == datastore->get (&search->query[0],
+                                    block_type, &fast_path_processor, &fpp));
+      if (!found)
+        found = (1 == datastore->get (&search->query[0],
+                                      GNUNET_ECRS_BLOCKTYPE_ONDEMAND,
+                                      &fast_path_processor, &fpp));
+      if (found && (fpp.have_more == GNUNET_NO))
+        answered_locally = GNUNET_YES;
+    }
+  if (answered_locally == GNUNET_NO)
+    {
+      anonymity = ntohl (search->anonymity_level);
+      key_count =
+        1 + (ntohs (req->size) -
+             sizeof (CS_fs_request_search_MESSAGE)) /
+        sizeof (GNUNET_HashCode);
+      /* an all-zero target means "no particular peer" */
+      targeted =
+        memcmp (&zero_identity, &search->target,
+                sizeof (GNUNET_PeerIdentity)) != 0;
+      GNUNET_DV_FS_QUERYMANAGER_start_query (&search->query[0], key_count,
+                                             anonymity, block_type, sock,
+                                             targeted ? &search->target :
+                                             NULL, fpp.seen, fpp.have_more);
+    }
+  if (fpp.seen != NULL)
+    GNUNET_multi_hash_map_destroy (fpp.seen);
+  return GNUNET_OK;
+}
+
+/**
+ * Handle a client's request to stop a query.
+ *
+ * The request uses the same message format as the request
+ * that started the query; the decoded parameters are passed
+ * to the query manager.
+ *
+ * @param sock client that sent the request
+ * @param req the request message
+ * @return GNUNET_SYSERR if the TCP connection should be closed,
+ *         otherwise GNUNET_OK
+ */
+static int
+handle_cs_query_stop_request (struct GNUNET_ClientHandle *sock,
+                              const GNUNET_MessageHeader * req)
+{
+  const CS_fs_request_search_MESSAGE *search;
+  unsigned int block_type;
+  unsigned int anonymity;
+  unsigned int key_count;
+
+  if (ntohs (req->size) < sizeof (CS_fs_request_search_MESSAGE))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  search = (const CS_fs_request_search_MESSAGE *) req;
+  block_type = ntohl (search->type);
+  anonymity = ntohl (search->anonymity_level);
+  /* one key is part of the fixed-size message, the rest follows it */
+  key_count =
+    1 + (ntohs (req->size) -
+         sizeof (CS_fs_request_search_MESSAGE)) / sizeof (GNUNET_HashCode);
+  GNUNET_DV_FS_QUERYMANAGER_stop_query (&search->query[0], key_count,
+                                        anonymity, block_type, sock);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Test if the CPU load or the network (upstream) load has
+ * reached the respective hard limit.  A limit of 0 disables
+ * the corresponding check.
+ *
+ * @return 1 if the load is (far) too high, 0 if the load is ok
+ */
+static int
+test_load_too_high ()
+{
+  if ((hardCPULimit > 0) &&
+      (GNUNET_cpu_get_load (ectx, coreAPI->cfg) >= hardCPULimit))
+    return 1;
+  if ((hardUpLimit > 0) &&
+      (GNUNET_network_monitor_get_load (coreAPI->load_monitor,
+                                        GNUNET_ND_UPLOAD) >= hardUpLimit))
+    return 1;
+  return 0;
+}
+
+/**
+ * Handle P2P query for content.
+ *
+ * The query is dropped if this peer is too busy.  Otherwise the
+ * message is validated, the priority is charged against the
+ * sender's trust (unless the upload load is low), a routing policy
+ * is chosen based on load and priority, the TTL is bounded and
+ * decremented, and the query is handed to the GAP module.
+ *
+ * @param sender the peer that sent the query
+ * @param msg the query message
+ * @return GNUNET_SYSERR if the message is malformed,
+ *         GNUNET_OK otherwise (including when the query is dropped)
+ */
+static int
+handle_p2p_query (const GNUNET_PeerIdentity * sender,
+                  const GNUNET_MessageHeader * msg)
+{
+  const P2P_gap_query_MESSAGE *req;
+  unsigned int query_count;
+  unsigned short size;
+  unsigned int bloomfilter_size;
+  int ttl;
+  unsigned int prio;
+  unsigned int type;
+  unsigned int netLoad;
+  enum GNUNET_FS_RoutingPolicy policy;
+  double preference;
+#if DEBUG_GAP
+  /* needed by the debug output below; without this declaration
+     the code does not compile once DEBUG_GAP is enabled */
+  GNUNET_EncName enc;
+#endif
+
+  if (stats != NULL)
+    stats->change (stat_gap_query_received, 1);
+  if (test_load_too_high ())
+    {
+#if DEBUG_GAP
+      if (sender != NULL)
+        {
+          IF_GELOG (ectx,
+                    GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                    GNUNET_hash_to_enc (&sender->hashPubKey, &enc));
+        }
+      GNUNET_GE_LOG (ectx,
+                     GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                     "Dropping query from %s, this peer is too busy.\n",
+                     sender == NULL ? "localhost" : (char *) &enc);
+#endif
+      if (stats != NULL)
+        stats->change (stat_gap_query_drop_busy, 1);
+      return GNUNET_OK;
+    }
+  size = ntohs (msg->size);
+  if (size < sizeof (P2P_gap_query_MESSAGE))
+    {
+      GNUNET_GE_BREAK_OP (ectx, 0);
+      return GNUNET_SYSERR;     /* malformed query */
+    }
+  req = (const P2P_gap_query_MESSAGE *) msg;
+  query_count = ntohl (req->number_of_queries);
+  /* reject: no query at all, more queries than could fit into a
+     message, message too short for the announced number of queries,
+     or a query that names ourselves as the peer to return to */
+  if ((query_count == 0) ||
+      (query_count > GNUNET_MAX_BUFFER_SIZE / sizeof (GNUNET_HashCode)) ||
+      (size <
+       sizeof (P2P_gap_query_MESSAGE) + (query_count -
+                                         1) * sizeof (GNUNET_HashCode))
+      || (0 ==
+          memcmp (&req->returnTo, coreAPI->my_identity,
+                  sizeof (GNUNET_PeerIdentity))))
+    {
+      GNUNET_GE_BREAK_OP (ectx, 0);
+      return GNUNET_SYSERR;     /* malformed query */
+    }
+  /* whatever follows the queries is the bloomfilter */
+  bloomfilter_size =
+    size - (sizeof (P2P_gap_query_MESSAGE) +
+            (query_count - 1) * sizeof (GNUNET_HashCode));
+  GNUNET_GE_ASSERT (NULL, bloomfilter_size < size);
+  prio = ntohl (req->priority);
+  netLoad =
+    GNUNET_network_monitor_get_load (coreAPI->load_monitor, GNUNET_ND_UPLOAD);
+  /* a load of -1 is treated like an idle network (presumably it
+     means "load unknown" -- confirm) */
+  if ((netLoad == (unsigned int) -1)
+      || (netLoad < GNUNET_GAP_IDLE_LOAD_THRESHOLD))
+    {
+      prio = 0;                 /* minimum priority, no charge! */
+      policy = GNUNET_FS_RoutingPolicy_ALL;
+    }
+  else
+    {
+      /* charge the sender; the effective priority is what the
+         identity service reports as actually deducted */
+      prio = -identity->changeHostTrust (sender, -prio);
+      if (netLoad < GNUNET_GAP_IDLE_LOAD_THRESHOLD + prio)
+        {
+          policy = GNUNET_FS_RoutingPolicy_ALL;
+        }
+      else if (netLoad < 90 + 10 * prio)
+        {
+          policy =
+            GNUNET_FS_RoutingPolicy_ANSWER | GNUNET_FS_RoutingPolicy_FORWARD;
+        }
+      else if (netLoad < 100)
+        {
+          policy = GNUNET_FS_RoutingPolicy_ANSWER;
+        }
+      else
+        {
+          if (stats != NULL)
+            stats->change (stat_gap_query_drop_busy, 1);
+          return GNUNET_OK;     /* drop */
+        }
+    }
+  if ((policy & GNUNET_FS_RoutingPolicy_INDIRECT) == 0)
+    /* kill the priority (since we cannot benefit) */
+    prio = 0;
+  ttl = GNUNET_FS_HELPER_bound_ttl (ntohl (req->ttl), prio);
+  type = ntohl (req->type);
+  /* decrement ttl (always) */
+  if (ttl < 0)
+    {
+      ttl -= 2 * GNUNET_GAP_TTL_DECREMENT +
+        GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK,
+                           GNUNET_GAP_TTL_DECREMENT);
+      if (ttl > 0)
+        /* integer underflow => drop (should be very rare)! */
+        return GNUNET_OK;
+    }
+  else
+    {
+      ttl -= 2 * GNUNET_GAP_TTL_DECREMENT +
+        GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK,
+                           GNUNET_GAP_TTL_DECREMENT);
+    }
+  /* every query is worth at least the base bandwidth value */
+  preference = (double) prio;
+  if (preference < GNUNET_GAP_QUERY_BANDWIDTH_VALUE)
+    preference = GNUNET_GAP_QUERY_BANDWIDTH_VALUE;
+  coreAPI->p2p_connection_preference_increase (sender, preference);
+  GNUNET_FS_GAP_execute_query (sender,
+                               prio,
+                               ntohl (req->priority),
+                               policy,
+                               ttl,
+                               type,
+                               query_count,
+                               &req->queries[0],
+                               ntohl (req->filter_mutator),
+                               bloomfilter_size, &req->queries[query_count]);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Use content (forward to whoever sent the query).
+ *
+ * The content is validated, offered to the GAP module (other
+ * peers) and to the query manager (local clients), possibly
+ * stored in the local datastore (if active migration is enabled)
+ * and finally the sender is rewarded with trust and connection
+ * preference.
+ *
+ * @param sender the peer from where the content came,
+ *     NULL for the local peer
+ * @param pmsg the reply message with the content
+ * @return GNUNET_SYSERR if the message is malformed,
+ *         GNUNET_OK otherwise
+ */
+static int
+handle_p2p_content (const GNUNET_PeerIdentity * sender,
+                    const GNUNET_MessageHeader * pmsg)
+{
+  const P2P_gap_reply_MESSAGE *msg;
+  const GNUNET_EC_DBlock *dblock;
+  GNUNET_DatastoreValue *value;
+  GNUNET_HashCode query;
+  unsigned short size;
+  unsigned int data_size;
+  unsigned int prio;
+  unsigned long long expiration;
+  double preference;
+  GNUNET_CronTime now;
+
+  size = ntohs (pmsg->size);
+  if (size < sizeof (P2P_gap_reply_MESSAGE))
+    {
+      GNUNET_GE_BREAK_OP (ectx, 0);
+      return GNUNET_SYSERR;     /* invalid! */
+    }
+  msg = (const P2P_gap_reply_MESSAGE *) pmsg;
+  data_size = size - sizeof (P2P_gap_reply_MESSAGE);
+  /* the block header is inspected below before the block is
+     validated, so make sure the message actually contains it */
+  if (data_size < sizeof (GNUNET_EC_DBlock))
+    {
+      GNUNET_GE_BREAK_OP (ectx, 0);
+      return GNUNET_SYSERR;     /* invalid! */
+    }
+  dblock = (const GNUNET_EC_DBlock *) &msg[1];
+
+  expiration = GNUNET_ntohll (msg->expiration);
+  if ((expiration > GNUNET_GAP_MAX_MIGRATION_EXP_KSK) &&
+      (ntohl (dblock->type) == GNUNET_ECRS_BLOCKTYPE_KEYWORD))
+    return GNUNET_OK;           /* expired KSK block -- ignore! */
+  if (GNUNET_OK !=
+      GNUNET_EC_file_block_check_and_get_query (data_size,
+                                                dblock, GNUNET_YES, &query))
+    {
+      GNUNET_GE_BREAK_OP (ectx, 0);
+      return GNUNET_SYSERR;     /* invalid! */
+    }
+  if ((stats != NULL) && (sender != NULL))
+    stats->change (stat_gap_content_received, 1);
+  /* forward to other peers */
+  prio = GNUNET_FS_GAP_handle_response (sender,
+                                        &query,
+                                        expiration, data_size, dblock);
+  /* convert expiration to absolute time and bound properly for
+     storage in local datastore */
+  now = GNUNET_get_time ();
+  if (expiration > GNUNET_GAP_MAX_MIGRATION_EXP)
+    {
+      /* expired, sometime in the past */
+      expiration = now - 1;
+    }
+  else
+    {
+      /* expires in future, apply bounding! */
+      if (ntohl (dblock->type) == GNUNET_ECRS_BLOCKTYPE_KEYWORD)
+        expiration %= GNUNET_GAP_MAX_MIGRATION_EXP_KSK;
+      else
+        expiration %= GNUNET_GAP_MAX_MIGRATION_EXP;
+      expiration += now;
+    }
+  /* forward to local clients */
+  prio += GNUNET_DV_FS_QUERYMANAGER_handle_response (sender,
+                                                     &query,
+                                                     expiration,
+                                                     data_size, dblock);
+  if ((sender != NULL) &&
+      (active_migration == GNUNET_YES) &&
+      ((prio > 0) || (!test_load_too_high ())))
+    {
+      /* consider storing in local datastore */
+      value = GNUNET_malloc (data_size + sizeof (GNUNET_DatastoreValue));
+      value->size = htonl (data_size + sizeof (GNUNET_DatastoreValue));
+      value->type = dblock->type;       /* already in network byte order */
+      value->priority = htonl (prio);
+      value->anonymity_level = htonl (1);
+      value->expiration_time = GNUNET_htonll (expiration);
+      memcpy (&value[1], dblock, data_size);
+      datastore->putUpdate (&query, value);
+      GNUNET_free (value);
+    }
+  if (sender != NULL)
+    {                           /* if we are the sender, sender will be NULL */
+      identity->changeHostTrust (sender, prio);
+      if (stats != NULL)
+        stats->change (stat_gap_trust_awarded, prio);
+      /* every reply is worth at least the base bandwidth value */
+      preference = (double) prio;
+      if (preference < GNUNET_GAP_CONTENT_BANDWIDTH_VALUE)
+        preference = GNUNET_GAP_CONTENT_BANDWIDTH_VALUE;
+      coreAPI->p2p_connection_preference_increase (sender, preference);
+    }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Initialize the FS module. This method name must match
+ * the library name (libgnunet_XXX => initialize_XXX).
+ *
+ * @return GNUNET_SYSERR on errors
+ */
+int
+initialize_module_dv_fs (GNUNET_CoreAPIForPlugins * capi)
+{
+  ectx = capi->ectx;
+  coreAPI = capi;
+  GNUNET_GE_ASSERT (ectx, sizeof (GNUNET_EC_ContentHashKey) == 128);
+  GNUNET_GE_ASSERT (ectx, sizeof (GNUNET_EC_DBlock) == 4);
+  GNUNET_GE_ASSERT (ectx, sizeof (GNUNET_EC_IBlock) == 132);
+  GNUNET_GE_ASSERT (ectx, sizeof (GNUNET_EC_KBlock) == 524);
+  GNUNET_GE_ASSERT (ectx, sizeof (GNUNET_EC_SBlock) == 588);
+  GNUNET_GE_ASSERT (ectx, sizeof (GNUNET_EC_KSBlock) == 1116);
+
+  if ((-1 == GNUNET_GC_get_configuration_value_number (coreAPI->cfg, "LOAD", 
"HARDCPULIMIT", 0, 100000, /* 1000 CPUs!? */
+                                                       0,       /* 0 == no 
limit */
+                                                       &hardCPULimit)) || (-1 
== GNUNET_GC_get_configuration_value_number (coreAPI->cfg, "LOAD", 
"HARDUPLIMIT", 0, 999999999, 0,        /* 0 == no limit */
+                                                                               
                                            &hardUpLimit)))
+    return GNUNET_SYSERR;
+  active_migration
+    = GNUNET_GC_get_configuration_value_yesno (coreAPI->cfg,
+                                               "DV_FS",
+                                               "ACTIVEMIGRATION", GNUNET_NO);
+  stats = capi->service_request ("stats");
+  if (stats != NULL)
+    {
+      stat_gap_query_received =
+        stats->create (gettext_noop ("# gap requests total received"));
+      stat_gap_query_drop_busy =
+        stats->create (gettext_noop ("# gap requests dropped due to load"));
+      stat_gap_content_received =
+        stats->create (gettext_noop ("# gap content total received"));
+      stat_gap_trust_awarded =
+        stats->create (gettext_noop ("# gap total trust awarded"));
+    }
+  identity = capi->service_request ("identity");
+  if (identity == NULL)
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      capi->service_release (stats);
+      return GNUNET_SYSERR;
+    }
+  datastore = capi->service_request ("datastore");
+  if (datastore == NULL)
+    {
+      capi->service_release (identity);
+      capi->service_release (stats);
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  GNUNET_FS_lock = capi->global_lock_get ();    // GNUNET_mutex_create 
(GNUNET_YES);
+  GNUNET_FS_ANONYMITY_init (capi);
+  GNUNET_FS_PLAN_init (capi);
+  GNUNET_FS_ONDEMAND_init (capi);
+  GNUNET_FS_PT_init (ectx, stats);
+  GNUNET_DV_FS_QUERYMANAGER_init (capi);
+  GNUNET_FS_DV_DHT_init (capi);
+  GNUNET_FS_GAP_init (capi);
+  GNUNET_FS_MIGRATION_init (capi);
+  GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 _
+                 ("`%s' registering client handlers %d %d %d %d %d %d %d %d 
and P2P handlers %d %d\n"),
+                 "fs", GNUNET_CS_PROTO_GAP_QUERY_START,
+                 GNUNET_CS_PROTO_GAP_QUERY_STOP,
+                 GNUNET_CS_PROTO_GAP_INSERT,
+                 GNUNET_CS_PROTO_GAP_INDEX, GNUNET_CS_PROTO_GAP_DELETE,
+                 GNUNET_CS_PROTO_GAP_UNINDEX, GNUNET_CS_PROTO_GAP_TESTINDEX,
+                 GNUNET_CS_PROTO_GAP_INIT_INDEX,
+                 GNUNET_P2P_PROTO_GAP_QUERY, GNUNET_P2P_PROTO_GAP_RESULT);
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->p2p_ciphertext_handler_register
+                    (GNUNET_P2P_PROTO_GAP_QUERY, &handle_p2p_query));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->p2p_ciphertext_handler_register
+                    (GNUNET_P2P_PROTO_GAP_RESULT, &handle_p2p_content));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->cs_handler_register
+                    (GNUNET_CS_PROTO_GAP_QUERY_START,
+                     &handle_cs_query_start_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->cs_handler_register
+                    (GNUNET_CS_PROTO_GAP_QUERY_STOP,
+                     &handle_cs_query_stop_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->cs_handler_register (GNUNET_CS_PROTO_GAP_INSERT,
+                                               &handle_cs_insert_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->cs_handler_register (GNUNET_CS_PROTO_GAP_INDEX,
+                                               &handle_cs_index_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->cs_handler_register (GNUNET_CS_PROTO_GAP_INIT_INDEX,
+                                               &handle_cs_init_index_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->cs_handler_register (GNUNET_CS_PROTO_GAP_DELETE,
+                                               &handle_cs_delete_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->cs_handler_register (GNUNET_CS_PROTO_GAP_UNINDEX,
+                                               &handle_cs_unindex_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->cs_handler_register (GNUNET_CS_PROTO_GAP_TESTINDEX,
+                                               
&handle_cs_test_indexed_request));
+  GNUNET_GE_ASSERT (capi->ectx,
+                    0 == GNUNET_GC_set_configuration_value_string (capi->cfg,
+                                                                   capi->ectx,
+                                                                   "ABOUT",
+                                                                   "fs",
+                                                                   gettext_noop
+                                                                   ("enables 
(anonymous) file-sharing")));
+  return GNUNET_OK;
+}
+
+/**
+ * Shut down the DV file-sharing module.
+ *
+ * Unregisters every P2P and client-server handler that the module
+ * initializer registered, shuts down the FS sub-components and
+ * releases the services that were requested during initialization.
+ */
+void
+done_module_dv_fs ()
+{
+  GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "fs shutdown\n");
+
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->p2p_ciphertext_handler_unregister
+                    (GNUNET_P2P_PROTO_GAP_QUERY, &handle_p2p_query));
+
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->p2p_ciphertext_handler_unregister
+                    (GNUNET_P2P_PROTO_GAP_RESULT, &handle_p2p_content));
+
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->cs_handler_unregister
+                    (GNUNET_CS_PROTO_GAP_QUERY_START,
+                     &handle_cs_query_start_request));
+  /* the initializer also registers the QUERY_STOP handler; it must be
+     removed here as well, otherwise the core keeps a pointer to a
+     handler of a module that has been shut down */
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->cs_handler_unregister
+                    (GNUNET_CS_PROTO_GAP_QUERY_STOP,
+                     &handle_cs_query_stop_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->cs_handler_unregister
+                    (GNUNET_CS_PROTO_GAP_INSERT, &handle_cs_insert_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->cs_handler_unregister (GNUNET_CS_PROTO_GAP_INDEX,
+                                                    &handle_cs_index_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->cs_handler_unregister
+                    (GNUNET_CS_PROTO_GAP_INIT_INDEX,
+                     &handle_cs_init_index_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->cs_handler_unregister
+                    (GNUNET_CS_PROTO_GAP_DELETE, &handle_cs_delete_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->cs_handler_unregister
+                    (GNUNET_CS_PROTO_GAP_UNINDEX,
+                     &handle_cs_unindex_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->cs_handler_unregister
+                    (GNUNET_CS_PROTO_GAP_TESTINDEX,
+                     &handle_cs_test_indexed_request));
+  /* shut down the FS sub-components */
+  GNUNET_FS_MIGRATION_done ();
+  GNUNET_FS_GAP_done ();
+  GNUNET_FS_DV_DHT_done ();
+  GNUNET_DV_FS_QUERYMANAGER_done ();
+  GNUNET_FS_ONDEMAND_done ();
+  GNUNET_FS_PLAN_done ();
+  GNUNET_FS_ANONYMITY_done ();
+  GNUNET_FS_PT_done ();
+  /* stats is optional, the other services are mandatory */
+  if (stats != NULL)
+    {
+      coreAPI->service_release (stats);
+      stats = NULL;
+    }
+  coreAPI->service_release (datastore);
+  datastore = NULL;
+  coreAPI->service_release (identity);
+  identity = NULL;
+  /* the lock was obtained via global_lock_get() and is only forgotten
+     here, not destroyed */
+  GNUNET_FS_lock = NULL;
+}
+
+
+/**
+ * Update hook of the DV file-sharing module.
+ *
+ * The only action taken is to ask the update API to update the
+ * "datastore" service.
+ *
+ * @param uapi update API handed to the module
+ */
+void
+update_module_dv_fs (GNUNET_UpdateAPI * uapi)
+{
+  uapi->service_update ("datastore");
+}
+
+
+/* end of dv_fs.c */

Added: GNUnet/src/applications/fs/gap/dv_querymanager.c
===================================================================
--- GNUnet/src/applications/fs/gap/dv_querymanager.c                            
(rev 0)
+++ GNUnet/src/applications/fs/gap/dv_querymanager.c    2009-10-01 18:10:27 UTC 
(rev 9053)
@@ -0,0 +1,746 @@
+/*
+      This file is part of GNUnet
+      (C) 2001 - 2009 Christian Grothoff (and other contributing authors)
+
+      GNUnet is free software; you can redistribute it and/or modify
+      it under the terms of the GNU General Public License as published
+      by the Free Software Foundation; either version 2, or (at your
+      option) any later version.
+
+      GNUnet is distributed in the hope that it will be useful, but
+      WITHOUT ANY WARRANTY; without even the implied warranty of
+      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+      General Public License for more details.
+
+      You should have received a copy of the GNU General Public License
+      along with GNUnet; see the file COPYING.  If not, write to the
+      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+      Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/dv_querymanager.c
+ * @brief management of queries from our clients
+ * @author Christian Grothoff, Nathan Evans
+ *
+ * This code forwards queries (using GAP and DHT) to other peers and
+ * passes replies (from GAP or DHT) back to clients.
+ */
+
+#include "platform.h"
+#include "gnunet_protocols.h"
+#include "gnunet_stats_service.h"
+#include "querymanager.h"
+#include "fs.h"
+#include "fs_dv_dht.h"
+#include "gap.h"
+#include "plan.h"
+#include "pid_table.h"
+#include "shared.h"
+
+#define CHECK_REPEAT_FREQUENCY (150 * GNUNET_CRON_MILLISECONDS)
+
+/**
+ * Linked list with information for each client.
+ * One entry exists per client handle that currently has
+ * requests tracked by this module.
+ */
+struct ClientDataList
+{
+
+  /**
+   * This is a (singly) linked list.
+   */
+  struct ClientDataList *next;
+
+  /**
+   * For which client is this data kept?
+   */
+  struct GNUNET_ClientHandle *client;
+
+  /**
+   * Head of the list of active requests for the client.
+   */
+  struct RequestList *requests;
+
+  /**
+   * Tail of the requests list (NULL if the list is empty);
+   * used to rotate requests to the end of the list.
+   */
+  struct RequestList *request_tail;
+
+};
+
+/**
+ * List of all clients, their active requests and other
+ * per-client information (head of the list).
+ */
+static struct ClientDataList *clients;
+
+/**
+ * Last entry of the clients list (NULL if the list is empty).
+ */
+static struct ClientDataList *clients_tail;
+
+/**
+ * Core API, set in GNUNET_DV_FS_QUERYMANAGER_init.
+ */
+static GNUNET_CoreAPIForPlugins *coreAPI;
+
+/**
+ * Statistics service; NULL if the service is not available.
+ */
+static GNUNET_Stats_ServiceAPI *stats;
+
+/**
+ * Datastore service, requested in GNUNET_DV_FS_QUERYMANAGER_init.
+ */
+static GNUNET_Datastore_ServiceAPI *datastore;
+
+/* Handles of the statistics counters; they are created in
+   GNUNET_DV_FS_QUERYMANAGER_init and only valid if stats != NULL. */
+
+static int stat_gap_client_query_received;
+
+static int stat_gap_client_response_sent;
+
+static int stat_gap_client_query_tracked;
+
+static int stat_gap_client_query_injected;
+
+static int stat_gap_client_bf_updates;
+
+
+/**
+ * Determine the size (in bytes) that a request's bloomfilter
+ * should have once entry_count responses have been seen.
+ *
+ * GNUNET_GAP_BLOOMFILTER_K bits are set per entry.  The result
+ * only grows in powers of two so that the filter does not have
+ * to be rebuilt too often; it is also chosen a bit larger than
+ * strictly required since other peers add entries without ever
+ * resizing the filter.
+ *
+ * @param entry_count number of responses seen so far
+ * @return a power of two between 8 and 2^15 (inclusive)
+ */
+static unsigned int
+compute_bloomfilter_size (unsigned int entry_count)
+{
+  const unsigned int limit = 1 << 15;
+  unsigned int wanted;
+  unsigned int bytes;
+
+  if (entry_count > limit)
+    return limit;
+  wanted = (entry_count * GNUNET_GAP_BLOOMFILTER_K) / 4;
+  /* double, starting at 8 bytes, until large enough or at the cap */
+  for (bytes = 8; (bytes < limit) && (bytes < wanted); bytes *= 2)
+    ;
+  return bytes;
+}
+
+/**
+ * Hash map iterator: passes the key on to
+ * GNUNET_FS_SHARED_mark_response_seen.
+ *
+ * @param key hash code of the map entry
+ * @param value unused
+ * @param cls handed through unchanged as the second argument
+ *        (the caller in this file passes the request)
+ * @return always GNUNET_OK
+ */
+static int
+mark_response_seen (const GNUNET_HashCode * key, void *value, void *cls)
+{
+  GNUNET_FS_SHARED_mark_response_seen (key, cls);
+  return GNUNET_OK;
+}
+
+/**
+ * Start running a query on behalf of a client.  The request is
+ * tracked in the per-client request list (an entry for the client
+ * is created on demand) and handed to the planner; for anonymity
+ * level zero a DHT lookup is triggered in addition.
+ *
+ * @param query the key_count hash codes making up the query
+ * @param key_count number of hash codes in query, must be > 0
+ * @param anonymityLevel desired anonymity level, 0 also uses the DHT
+ * @param type type of the requested content
+ * @param client client that should receive the responses
+ * @param target peer known to have the content, maybe NULL.
+ * @param seen responses that the client already has, maybe NULL
+ * @param have_more anything but GNUNET_NO if more results should
+ *        be fetched later on
+ */
+void
+GNUNET_DV_FS_QUERYMANAGER_start_query (const GNUNET_HashCode * query,
+                                    unsigned int key_count,
+                                    unsigned int anonymityLevel,
+                                    unsigned int type,
+                                    struct GNUNET_ClientHandle *client,
+                                    const GNUNET_PeerIdentity * target,
+                                    const struct GNUNET_MultiHashMap *seen,
+                                    int have_more)
+{
+  struct ClientDataList *entry;
+  struct RequestList *req;
+  int planned;
+
+  GNUNET_GE_ASSERT (NULL, key_count > 0);
+  if (stats != NULL)
+    {
+      stats->change (stat_gap_client_query_tracked, 1);
+      stats->change (stat_gap_client_query_received, 1);
+    }
+
+  /* build the request; the struct already has room for one key */
+  req = GNUNET_malloc (sizeof (struct RequestList) +
+                       (key_count - 1) * sizeof (GNUNET_HashCode));
+  memset (req, 0, sizeof (struct RequestList));
+  memcpy (&req->queries[0], query, sizeof (GNUNET_HashCode) * key_count);
+  req->key_count = key_count;
+  req->type = type;
+  req->anonymityLevel = anonymityLevel;
+  req->response_client = client;
+  req->policy = GNUNET_FS_RoutingPolicy_ALL;
+  req->primary_target = GNUNET_FS_PT_intern (target);
+  if (GNUNET_NO != have_more)
+    req->have_more = GNUNET_GAP_HAVE_MORE_INCREMENT;
+
+  /* pre-fill the bloomfilter with what the client has already */
+  if (NULL != seen)
+    {
+      req->bloomfilter_entry_count = GNUNET_multi_hash_map_size (seen);
+      req->bloomfilter_size =
+        compute_bloomfilter_size (req->bloomfilter_entry_count);
+      req->bloomfilter_mutator =
+        GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, -1);
+      req->bloomfilter =
+        GNUNET_bloomfilter_init (NULL, NULL, req->bloomfilter_size,
+                                 GNUNET_GAP_BLOOMFILTER_K);
+      if (stats != NULL)
+        stats->change (stat_gap_client_bf_updates, 1);
+
+      GNUNET_multi_hash_map_iterate (seen, &mark_response_seen, req);
+    }
+
+  GNUNET_mutex_lock (GNUNET_FS_lock);
+  /* locate the entry for this client, create it if needed */
+  for (entry = clients; entry != NULL; entry = entry->next)
+    if (entry->client == client)
+      break;
+  if (NULL == entry)
+    {
+      entry = GNUNET_malloc (sizeof (struct ClientDataList));
+      memset (entry, 0, sizeof (struct ClientDataList));
+      entry->client = client;
+      entry->next = clients;
+      clients = entry;
+      if (NULL == clients_tail)
+        clients_tail = entry;
+    }
+  /* new requests go to the head of the client's list */
+  req->next = entry->requests;
+  entry->requests = req;
+  if (NULL == entry->request_tail)
+    entry->request_tail = req;
+  planned = GNUNET_FS_PLAN_request (client, 0, req);
+  if ((GNUNET_YES == planned) && (stats != NULL))
+    stats->change (stat_gap_client_query_injected, 1);
+  if (0 == anonymityLevel)
+    {
+      req->last_dht_get = GNUNET_get_time ();
+      req->dht_back_off = GNUNET_GAP_MAX_DHT_DELAY;
+    }
+  GNUNET_mutex_unlock (GNUNET_FS_lock);
+
+  /* the DHT lookup itself is started without holding the lock */
+  if (0 == anonymityLevel)
+    GNUNET_FS_DV_DHT_execute_query (type, query);
+}
+
+/**
+ * A client is asking us to stop running a query (without
+ * disconnecting).  The first request of the client that matches
+ * type, keys and anonymity level is removed; if the client has no
+ * requests left afterwards, its list entry is removed as well.
+ *
+ * @param query the key_count hash codes making up the query
+ * @param key_count number of hash codes in query
+ * @param anonymityLevel anonymity level given when starting the query
+ * @param type type of the requested content
+ * @param client client that started the query
+ * @return GNUNET_OK if a request was removed, GNUNET_SYSERR if
+ *         the client or the request is not known
+ */
+int
+GNUNET_DV_FS_QUERYMANAGER_stop_query (const GNUNET_HashCode * query,
+                                   unsigned int key_count,
+                                   unsigned int anonymityLevel,
+                                   unsigned int type,
+                                   struct GNUNET_ClientHandle *client)
+{
+  struct ClientDataList *entry;
+  struct ClientDataList *entry_before;
+  struct RequestList *req;
+  struct RequestList *req_before;
+
+  GNUNET_mutex_lock (GNUNET_FS_lock);
+
+  /* find the client, remembering its predecessor */
+  entry_before = NULL;
+  for (entry = clients; entry != NULL; entry = entry->next)
+    {
+      if (entry->client == client)
+        break;
+      entry_before = entry;
+    }
+  if (NULL == entry)
+    {
+      GNUNET_mutex_unlock (GNUNET_FS_lock);
+      return GNUNET_SYSERR;
+    }
+
+  /* find the matching request, remembering its predecessor */
+  req_before = NULL;
+  for (req = entry->requests; req != NULL; req = req->next)
+    {
+      if ((req->type == type) &&
+          (req->key_count == key_count) &&
+          (0 == memcmp (query,
+                        &req->queries[0],
+                        sizeof (GNUNET_HashCode) * key_count)) &&
+          (req->anonymityLevel == anonymityLevel))
+        break;
+      req_before = req;
+    }
+  if (NULL == req)
+    {
+      GNUNET_mutex_unlock (GNUNET_FS_lock);
+      return GNUNET_SYSERR;
+    }
+
+  /* unlink and release the request */
+  if (NULL != req_before)
+    req_before->next = req->next;
+  else
+    entry->requests = req->next;
+  if (req == entry->request_tail)
+    entry->request_tail = req_before;
+  GNUNET_FS_SHARED_free_request_list (req);
+  if (stats != NULL)
+    stats->change (stat_gap_client_query_tracked, -1);
+
+  /* drop the client entry once its last request is gone */
+  if (NULL == entry->requests)
+    {
+      if (NULL != entry_before)
+        entry_before->next = entry->next;
+      else
+        clients = entry->next;
+      if (clients_tail == entry)
+        clients_tail = entry_before;
+      GNUNET_free (entry);
+    }
+  GNUNET_mutex_unlock (GNUNET_FS_lock);
+  return GNUNET_OK;
+}
+
+/**
+ * Closure for response_bf_iterator.
+ */
+struct IteratorClosure
+{
+  /* bloomfilter to which the mingled hash codes are added */
+  struct GNUNET_BloomFilter *filter;
+  /* number used to mingle each hash code before it is added */
+  int mingle_number;
+};
+
+/**
+ * Iterator over Map hash codes.  Mingles the key with the
+ * mingle number from the closure and adds the result to the
+ * bloomfilter from the closure.
+ *
+ * @param key hash code of the current map entry
+ * @param value unused
+ * @param arg pointer to a struct IteratorClosure
+ * @return always GNUNET_YES
+ */
+static int
+response_bf_iterator (const GNUNET_HashCode * key, void *value, void *arg)
+{
+  struct IteratorClosure *cls = arg;
+  GNUNET_HashCode n;
+
+  GNUNET_FS_HELPER_mingle_hash (key, cls->mingle_number, &n);
+  GNUNET_bloomfilter_add (cls->filter, &n);
+  return GNUNET_YES;
+}
+
+/**
+ * We got a response for a client request.
+ * Check if we have seen this response already.
+ * If not, check if it truly matches (namespace!).
+ * If so, transmit to client and update response
+ * lists and bloomfilter accordingly.
+ *
+ * @param sender who sent the response, 0 if it came from the DHT
+ * @param client client that issued the request
+ * @param rl the request the response is checked against
+ * @param primary_key hash code used for the lookup
+ * @param expirationTime expiration time of the content
+ * @param size number of bytes in data
+ * @param data the response itself
+ * @param value how much is this response worth to us?
+ *        the function should increment value accordingly
+ * @return GNUNET_OK if this was the last response
+ *         and we should remove the request entry.
+ *         GNUNET_NO if we should continue looking
+ *         (also used if sending to the client failed)
+ *         GNUNET_SYSERR on serious errors
+ */
+static int
+handle_response (PID_INDEX sender,
+                 struct GNUNET_ClientHandle *client,
+                 struct RequestList *rl,
+                 const GNUNET_HashCode * primary_key,
+                 GNUNET_CronTime expirationTime,
+                 unsigned int size, const GNUNET_EC_DBlock * data,
+                 unsigned int *value)
+{
+  struct IteratorClosure ic;
+  CS_fs_reply_content_MESSAGE *msg;
+  GNUNET_HashCode hc;
+  int ret;
+  unsigned int bf_size;
+
+  /* check that content matches query */
+  ret = GNUNET_FS_SHARED_test_valid_new_response (rl,
+                                                  primary_key,
+                                                  size, data, &hc);
+  if (ret != GNUNET_OK)
+    return ret;
+  if (sender == 0)              /* dht produced response */
+    rl->dht_back_off = GNUNET_GAP_MAX_DHT_DELAY;        /* go back! */
+  /* send to client */
+  msg = GNUNET_malloc (sizeof (CS_fs_reply_content_MESSAGE) + size);
+  msg->header.size = htons (sizeof (CS_fs_reply_content_MESSAGE) + size);
+  msg->header.type = htons (GNUNET_CS_PROTO_GAP_RESULT);
+  msg->anonymity_level = htonl (0);     /* unknown */
+  msg->expiration_time = GNUNET_htonll (expirationTime);
+  memcpy (&msg[1], data, size);
+  ret = coreAPI->cs_send_message (client,
+                                  &msg->header,
+                                  (rl->type != GNUNET_ECRS_BLOCKTYPE_DATA)
+                                  ? GNUNET_NO : GNUNET_YES);
+  GNUNET_free (msg);
+  if (ret != GNUNET_OK)
+    return GNUNET_NO;
+  if (stats != NULL)
+    stats->change (stat_gap_client_response_sent, 1);
+
+  /* update *value */
+  *value += 1 + rl->value;
+  GNUNET_FS_PLAN_success (sender, client, 0, rl);
+
+  if (rl->type == GNUNET_ECRS_BLOCKTYPE_DATA)
+    return GNUNET_OK;           /* the end */
+
+  /* update bloom filter */
+  rl->bloomfilter_entry_count++;
+  bf_size = compute_bloomfilter_size (rl->bloomfilter_entry_count);
+  if (rl->bloomfilter == NULL)
+    {
+      /* first response, create the filter */
+      rl->bloomfilter_mutator
+        = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, -1);
+      rl->bloomfilter_size = bf_size;
+      rl->bloomfilter = GNUNET_bloomfilter_init (NULL,
+                                                 NULL,
+                                                 rl->bloomfilter_size,
+                                                 GNUNET_GAP_BLOOMFILTER_K);
+      if (stats != NULL)
+        stats->change (stat_gap_client_bf_updates, 1);
+    }
+  else if (rl->bloomfilter_size != bf_size)
+    {
+      /* filter needs a different size: rebuild it with a fresh
+         mutator from the responses recorded so far */
+      rl->bloomfilter_mutator
+        = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, -1);
+      GNUNET_bloomfilter_free (rl->bloomfilter);
+      /* remember the new size; otherwise the recorded size no longer
+         matches the filter and every later response would trigger
+         another rebuild */
+      rl->bloomfilter_size = bf_size;
+      rl->bloomfilter =
+        GNUNET_bloomfilter_init (NULL,
+                                 NULL, bf_size, GNUNET_GAP_BLOOMFILTER_K);
+      ic.filter = rl->bloomfilter;
+      ic.mingle_number = rl->bloomfilter_mutator;
+      if (rl->responses != NULL)
+        GNUNET_multi_hash_map_iterate (rl->responses,
+                                       &response_bf_iterator, &ic);
+      if (stats != NULL)
+        stats->change (stat_gap_client_bf_updates, 1);
+    }
+  GNUNET_FS_SHARED_mark_response_seen (&hc, rl);
+
+  /* we want more */
+  return GNUNET_NO;
+}
+
+/**
+ * Handle the given response by offering it to every request
+ * of every client.  Requests for which the response was the
+ * final one are removed from their client's list.
+ *
+ * @param sender who sent the response (good to know
+ *        for future routing decisions)
+ * @param primary_query hash code used for lookup
+ *        (note that namespace membership may
+ *        require additional verification that has
+ *        not yet been performed; checking the
+ *        signature has already been done)
+ * @param expirationTime expiration time of the content
+ * @param size size of the data
+ * @param data the data itself (a GNUNET_EC_DBlock)
+ * @return how much was this content worth to us?
+ */
+unsigned int
+GNUNET_DV_FS_QUERYMANAGER_handle_response (const GNUNET_PeerIdentity * sender,
+                                        const GNUNET_HashCode * primary_query,
+                                        GNUNET_CronTime expirationTime,
+                                        unsigned int size,
+                                        const GNUNET_EC_DBlock * data)
+{
+  struct ClientDataList *entry;
+  struct RequestList *cur;
+  struct RequestList *before;
+  struct RequestList *after;
+  unsigned int worth;
+  PID_INDEX peer;
+
+  peer = GNUNET_FS_PT_intern (sender);
+  worth = 0;
+  GNUNET_mutex_lock (GNUNET_FS_lock);
+  for (entry = clients; entry != NULL; entry = entry->next)
+    {
+      before = NULL;
+      cur = entry->requests;
+      while (cur != NULL)
+        {
+          if (GNUNET_OK !=
+              handle_response (peer,
+                               entry->client,
+                               cur,
+                               primary_query,
+                               expirationTime, size, data, &worth))
+            {
+              /* request stays, move on */
+              before = cur;
+              cur = cur->next;
+              continue;
+            }
+          /* request is done: unlink, release, go on with successor */
+          after = cur->next;
+          if (before == NULL)
+            entry->requests = after;
+          else
+            before->next = after;
+          if (entry->request_tail == cur)
+            entry->request_tail = before;
+          GNUNET_FS_SHARED_free_request_list (cur);
+          if (stats != NULL)
+            stats->change (stat_gap_client_query_tracked, -1);
+          cur = after;
+        }
+    }
+  GNUNET_mutex_unlock (GNUNET_FS_lock);
+  /* give up the reference obtained by interning the sender */
+  GNUNET_FS_PT_change_rc (peer, -1);
+  return worth;
+}
+
+/**
+ * Called whenever a given client disconnects.  Releases all
+ * requests of the client and removes the client's entry from
+ * the list of clients.  Does nothing if the client is unknown.
+ *
+ * @param client the client that disconnected
+ */
+static void
+handle_client_exit (struct GNUNET_ClientHandle *client)
+{
+  struct ClientDataList *entry;
+  struct ClientDataList *before;
+  struct RequestList *req;
+
+  GNUNET_mutex_lock (GNUNET_FS_lock);
+  before = NULL;
+  for (entry = clients; entry != NULL; entry = entry->next)
+    {
+      if (entry->client == client)
+        break;
+      before = entry;
+    }
+  if (clients_tail == entry)
+    clients_tail = before;
+  if (entry == NULL)
+    {
+      GNUNET_mutex_unlock (GNUNET_FS_lock);
+      return;
+    }
+  /* release all requests of the client */
+  while (NULL != (req = entry->requests))
+    {
+      entry->requests = req->next;
+      GNUNET_FS_SHARED_free_request_list (req);
+      if (stats != NULL)
+        stats->change (stat_gap_client_query_tracked, -1);
+    }
+  /* unlink and release the client entry itself */
+  if (before != NULL)
+    before->next = entry->next;
+  else
+    clients = entry->next;
+  GNUNET_free (entry);
+  GNUNET_mutex_unlock (GNUNET_FS_lock);
+}
+
+
+/**
+ * Closure for have_more_processor.
+ */
+struct HMClosure
+{
+  /* request for which results are fetched from the datastore */
+  struct RequestList *request;
+  /* number of results passed to the client so far */
+  unsigned int processed;
+  /* set to GNUNET_YES if the iteration was stopped early */
+  int have_more;
+};
+
+/**
+ * Datastore iterator: any response that we get is passed
+ * back to the client.  The iteration is aborted (by returning
+ * GNUNET_SYSERR) once more than GNUNET_GAP_MAX_ASYNC_PROCESSED
+ * results have been processed; if sending fails, the result of
+ * the send operation is returned.  In both cases have_more is
+ * set in the closure.
+ *
+ * @param key key of the datastore entry
+ * @param value the datastore entry
+ * @param closure pointer to a struct HMClosure
+ * @param uid unused
+ * @return GNUNET_OK to continue, otherwise see above
+ */
+static int
+have_more_processor (const GNUNET_HashCode * key,
+                     const GNUNET_DatastoreValue *
+                     value, void *closure, unsigned long long uid)
+{
+  struct HMClosure *state = closure;
+  GNUNET_HashCode digest;
+  int rc;
+
+  rc = GNUNET_FS_HELPER_send_to_client (coreAPI,
+                                        key, value,
+                                        state->request->response_client,
+                                        state->request, &digest);
+  if (GNUNET_OK != rc)
+    {
+      /* client can take no more right now */
+      state->have_more = GNUNET_YES;
+      return rc;                /* NO: delete, SYSERR: abort */
+    }
+  GNUNET_FS_SHARED_mark_response_seen (&digest, state->request);
+  state->processed++;
+  if (state->processed <= GNUNET_GAP_MAX_ASYNC_PROCESSED)
+    return GNUNET_OK;
+  /* enough for this round, remember that there is more */
+  state->have_more = GNUNET_YES;
+  return GNUNET_SYSERR;
+}
+
+
+/**
+ * Cron-job to periodically check if we should
+ * repeat requests.
+ *
+ * Each run looks at a single request: the client at the head of
+ * the clients list is moved to the tail, and so is the request
+ * at the head of that client's request list, so that successive
+ * runs cycle through all clients and requests.  For the selected
+ * request the job either fetches more results from the datastore
+ * (if have_more is set) or re-plans the request and, for
+ * anonymity level zero, repeats the DHT lookup.
+ *
+ * @param unused not used
+ */
+static void
+repeat_requests_job (void *unused)
+{
+  struct HMClosure hmc;
+  struct ClientDataList *client;
+  struct RequestList *request;
+  struct RequestList *prev;
+  GNUNET_CronTime now;
+
+  GNUNET_mutex_lock (GNUNET_FS_lock);
+  if (clients == NULL)
+    {
+      GNUNET_mutex_unlock (GNUNET_FS_lock);
+      return;
+    }
+  now = GNUNET_get_time ();
+  client = clients;
+  if (clients_tail != client)
+    {
+      /* move client to tail of list */
+      GNUNET_GE_ASSERT (NULL, clients_tail->next == NULL);
+      clients = clients->next;
+      clients_tail->next = client;
+      clients_tail = client;
+      client->next = NULL;
+    }
+  request = client->requests;
+  if (request == NULL)
+    {
+      GNUNET_mutex_unlock (GNUNET_FS_lock);
+      return;
+    }
+  if (client->request_tail != request)
+    {
+      /* move request to tail of list */
+      GNUNET_GE_ASSERT (NULL, client->request_tail->next == NULL);
+      client->requests = request->next;
+      client->request_tail->next = request;
+      /* prev is the predecessor of request in its new position */
+      prev = client->request_tail;
+      client->request_tail = request;
+      request->next = NULL;
+    }
+  else
+    {
+      /* request is the only entry in the list */
+      prev = NULL;
+    }
+  GNUNET_GE_ASSERT (NULL, request->next == NULL);
+  GNUNET_GE_ASSERT (NULL, client->request_tail->next == NULL);
+  /* do nothing in this round if the send test for the client fails */
+  if ((client->client != NULL) &&
+      (GNUNET_OK !=
+       coreAPI->cs_send_message_now_test (client->client,
+                                          GNUNET_GAP_ESTIMATED_DATA_SIZE,
+                                          GNUNET_NO)))
+    {
+      GNUNET_mutex_unlock (GNUNET_FS_lock);
+      return;
+    }
+  if (request->have_more > 0)
+    {
+      /* fetch (more) results from the datastore */
+      request->have_more--;
+      hmc.request = request;
+      hmc.processed = 0;
+      hmc.have_more = GNUNET_NO;
+
+      if (request->type == GNUNET_ECRS_BLOCKTYPE_DATA)
+        {
+          /* if one of the lookups reports 1 and the iteration was
+             not cut short, the request is removed and released */
+          if (((1 == datastore->get (&request->queries[0], request->type,
+                                     &have_more_processor, &hmc)) ||
+               (1 == datastore->get (&request->queries[0],
+                                     GNUNET_ECRS_BLOCKTYPE_ONDEMAND,
+                                     &have_more_processor, &hmc))) &&
+              (hmc.have_more == GNUNET_NO))
+            {
+              /* request is at the tail, unlink it from there */
+              if (prev == NULL)
+                {
+                  client->request_tail = NULL;
+                  client->requests = NULL;
+                }
+              else
+                {
+                  prev->next = NULL;
+                  if (client->request_tail == request)
+                    client->request_tail = prev;
+                }
+              GNUNET_FS_SHARED_free_request_list (request);
+              if (stats != NULL)
+                stats->change (stat_gap_client_query_tracked, -1);
+            }
+        }
+      else
+        {
+          datastore->get (&request->queries[0], request->type,
+                          &have_more_processor, &hmc);
+        }
+      /* request was only released above if hmc.have_more is GNUNET_NO,
+         so it is still valid whenever this branch is taken */
+      if (hmc.have_more)
+        request->have_more += GNUNET_GAP_HAVE_MORE_INCREMENT;
+    }
+  else
+    {
+      /* hand the request to the planner again if it has no plan
+         entries and the time of the last request plus the TTL used
+         for it has passed */
+      if ((NULL == request->plan_entries) &&
+          ((client->client != NULL) ||
+           (request->expiration > now)) &&
+          (request->last_ttl_used * GNUNET_CRON_SECONDS +
+           request->last_request_time < now))
+        {
+          if ((GNUNET_OK ==
+               GNUNET_FS_PLAN_request (client->client, 0, request))
+              && (stats != NULL))
+            stats->change (stat_gap_client_query_injected, 1);
+        }
+
+      /* repeat the DHT lookup once the back-off has passed */
+      if ((request->anonymityLevel == 0) &&
+          (request->last_dht_get + request->dht_back_off < now))
+        {
+          /* double the back-off unless the doubling would wrap around */
+          if (request->dht_back_off * 2 > request->dht_back_off)
+            request->dht_back_off *= 2;
+          request->last_dht_get = now;
+          GNUNET_FS_DV_DHT_execute_query (request->type, &request->queries[0]);
+        }
+    }
+  GNUNET_mutex_unlock (GNUNET_FS_lock);
+}
+
+/**
+ * Initialize the query manager: registers the handler for client
+ * disconnects, requests the "datastore" and "stats" services,
+ * creates the statistics counters (if stats is available) and
+ * schedules repeat_requests_job with CHECK_REPEAT_FREQUENCY.
+ *
+ * @param capi the core API
+ * @return always 0
+ */
+int
+GNUNET_DV_FS_QUERYMANAGER_init (GNUNET_CoreAPIForPlugins * capi)
+{
+  coreAPI = capi;
+  GNUNET_GE_ASSERT (capi->ectx,
+                    GNUNET_SYSERR !=
+                    capi->cs_disconnect_handler_register
+                    (&handle_client_exit));
+  datastore = capi->service_request ("datastore");
+  /* stats is optional, all users check for NULL */
+  stats = capi->service_request ("stats");
+  if (stats != NULL)
+    {
+      stat_gap_client_query_received =
+        stats->create (gettext_noop ("# gap client queries received"));
+      stat_gap_client_response_sent =
+        stats->create (gettext_noop ("# gap replies sent to clients"));
+      stat_gap_client_query_tracked =
+        stats->create (gettext_noop ("# gap client requests tracked"));
+      stat_gap_client_query_injected =
+        stats->create (gettext_noop ("# gap client requests injected"));
+      stat_gap_client_bf_updates =
+        stats->create (gettext_noop
+                       ("# gap query bloomfilter resizing updates"));
+    }
+  GNUNET_cron_add_job (capi->cron,
+                       &repeat_requests_job,
+                       CHECK_REPEAT_FREQUENCY, CHECK_REPEAT_FREQUENCY, NULL);
+  return 0;
+}
+
+/**
+ * Shut down the query manager: removes the cron job, unregisters
+ * the handler for client disconnects, releases the data kept for
+ * all remaining clients and releases the services requested in
+ * GNUNET_DV_FS_QUERYMANAGER_init.
+ *
+ * @return always 0
+ */
+int
+GNUNET_DV_FS_QUERYMANAGER_done ()
+{
+  GNUNET_cron_del_job (coreAPI->cron,
+                       &repeat_requests_job, CHECK_REPEAT_FREQUENCY, NULL);
+  GNUNET_GE_ASSERT (coreAPI->ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->cs_disconnect_handler_unregister
+                    (&handle_client_exit));
+  /* each call removes the entry at the head of the clients list */
+  while (clients != NULL)
+    handle_client_exit (clients->client);
+  coreAPI->service_release (datastore);
+  datastore = NULL;
+  if (stats != NULL)
+    {
+      coreAPI->service_release (stats);
+      stats = NULL;
+    }
+  return 0;
+}
+
+/* end of dv_querymanager.c */

Added: GNUnet/src/applications/fs/gap/dv_querymanager.h
===================================================================
--- GNUnet/src/applications/fs/gap/dv_querymanager.h                            
(rev 0)
+++ GNUnet/src/applications/fs/gap/dv_querymanager.h    2009-10-01 18:10:27 UTC 
(rev 9053)
@@ -0,0 +1,90 @@
+/*
+      This file is part of GNUnet
+      (C) 2001, 2002, 2003, 2004, 2005, 2006, 2008 Christian Grothoff (and 
other contributing authors)
+
+      GNUnet is free software; you can redistribute it and/or modify
+      it under the terms of the GNU General Public License as published
+      by the Free Software Foundation; either version 2, or (at your
+      option) any later version.
+
+      GNUnet is distributed in the hope that it will be useful, but
+      WITHOUT ANY WARRANTY; without even the implied warranty of
+      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+      General Public License for more details.
+
+      You should have received a copy of the GNU General Public License
+      along with GNUnet; see the file COPYING.  If not, write to the
+      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+      Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/dv_querymanager.h
+ * @brief management of queries from our clients
+ * @author Christian Grothoff, Nathan Evans
+ */
+#ifndef DV_QUERYMANAGER_H
+#define DV_QUERYMANAGER_H
+
+#include "gnunet_util.h"
+#include "gnunet_core.h"
+#include "ecrs_core.h"
+#include "shared.h"
+
+int GNUNET_DV_FS_QUERYMANAGER_init (GNUNET_CoreAPIForPlugins * capi);
+
+int GNUNET_DV_FS_QUERYMANAGER_done (void);
+
+
+/**
+ * A client is asking us to run a query.  The query should be issued
+ * until either a unique response has been obtained, the client
+ * requests us to stop or until the client disconnects.
+ *
+ * NOTE(review): the descriptions of query, key_count, anonymityLevel,
+ * type, client and seen below are inferred from the parameter names
+ * only; confirm them against the implementation.
+ *
+ * @param query hash code(s) of the query, presumably key_count entries
+ * @param key_count presumably the number of hash codes in query
+ * @param anonymityLevel presumably the anonymity level of the request
+ * @param type presumably the type of content requested
+ * @param client presumably the client that issued the query
+ * @param target peer known to have the content, may be NULL.
+ * @param seen presumably the results the client has already seen
+ * @param have_more do we have more results in our local datastore?
+ */
+void
+GNUNET_DV_FS_QUERYMANAGER_start_query (const GNUNET_HashCode * query,
+                                    unsigned int key_count,
+                                    unsigned int anonymityLevel,
+                                    unsigned int type,
+                                    struct GNUNET_ClientHandle *client,
+                                    const GNUNET_PeerIdentity * target,
+                                    const struct GNUNET_MultiHashMap *seen,
+                                    int have_more);
+
+/**
+ * A client is asking us to stop running a query (without disconnect).
+ *
+ * The parameters mirror the leading parameters of
+ * GNUNET_DV_FS_QUERYMANAGER_start_query; presumably they must match
+ * the values used when the query was started -- TODO confirm.
+ *
+ * @return NOTE(review): the meaning of the result cannot be told
+ *         from this header; check the implementation
+ */
+int
+GNUNET_DV_FS_QUERYMANAGER_stop_query (const GNUNET_HashCode * query,
+                                   unsigned int key_count,
+                                   unsigned int anonymityLevel,
+                                   unsigned int type,
+                                   struct GNUNET_ClientHandle *client);
+
+/**
+ * Handle the given response (by forwarding it to
+ * other peers as necessary).
+ *
+ * @param sender who sent the response (good to know
+ *        for future routing decisions); the DV-DHT
+ *        integration in fs_dv_dht.c passes NULL here
+ * @param primary_query hash code used for lookup
+ *        (note that namespace membership may
+ *        require additional verification that has
+ *        not yet been performed; checking the
+ *        signature has already been done)
+ * @param expirationTime presumably the expiration time of the
+ *        content -- TODO confirm; the DV-DHT integration in
+ *        fs_dv_dht.c passes 0 here
+ * @param size size of the data
+ * @param data the data itself (a GNUNET_EC_DBlock)
+ * @return how much was this content worth to us?
+ */
+unsigned int
+GNUNET_DV_FS_QUERYMANAGER_handle_response (const GNUNET_PeerIdentity * sender,
+                                        const GNUNET_HashCode * primary_query,
+                                        GNUNET_CronTime expirationTime,
+                                        unsigned int size,
+                                        const GNUNET_EC_DBlock * data);
+
+
+#endif

Added: GNUnet/src/applications/fs/gap/fs_dv_dht.c
===================================================================
--- GNUnet/src/applications/fs/gap/fs_dv_dht.c                          (rev 0)
+++ GNUnet/src/applications/fs/gap/fs_dv_dht.c  2009-10-01 18:10:27 UTC (rev 
9053)
@@ -0,0 +1,291 @@
+/*
+      This file is part of GNUnet
+      (C) 2001 - 2009 Christian Grothoff (and other contributing authors)
+
+      GNUnet is free software; you can redistribute it and/or modify
+      it under the terms of the GNU General Public License as published
+      by the Free Software Foundation; either version 2, or (at your
+      option) any later version.
+
+      GNUnet is distributed in the hope that it will be useful, but
+      WITHOUT ANY WARRANTY; without even the implied warranty of
+      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+      General Public License for more details.
+
+      You should have received a copy of the GNU General Public License
+      along with GNUnet; see the file COPYING.  If not, write to the
+      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+      Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/fs_dv_dht.c
+ * @brief integration of file-sharing with the DHT
+ *        infrastructure
+ * @author Christian Grothoff, Nathan Evans
+ */
+
+#include "platform.h"
+#include "gnunet_dv_dht_service.h"
+#include "gnunet_sqstore_service.h"
+#include "gnunet_stats_service.h"
+#include "gnunet_protocols.h"
+#include "ecrs_core.h"
+#include "fs.h"
+#include "shared.h"
+#include "fs_dv_dht.h"
+#include "dv_querymanager.h"
+
+/**
+ * Linked list containing the DHT get handles
+ * of our active requests.
+ *
+ * Fields:
+ *  - next:     following record in the singly-linked list whose head
+ *              is the file-scope variable 'records'
+ *  - handle:   value returned by dv_dht->get_start for this request;
+ *              handed to dv_dht->get_stop when the record is purged
+ *  - end_time: records whose end_time is below the limit given to
+ *              purge_old_records are cancelled; initialised to the
+ *              start time plus GNUNET_GAP_MAX_DHT_DELAY and reset to 0
+ *              by response_callback once a DATA block was delivered
+ *  - type:     block type the request was started with
+ */
+struct ActiveRequestRecords
+{
+
+  struct ActiveRequestRecords *next;
+
+  struct GNUNET_DV_DHT_GetHandle *handle;
+
+  GNUNET_CronTime end_time;
+
+  unsigned int type;
+
+};
+
+static GNUNET_DV_DHT_ServiceAPI *dv_dht;       /* "dv_dht" service, NULL if not obtained */
+
+static GNUNET_SQstore_ServiceAPI *sqstore;     /* "sqstore" service, NULL if not obtained */
+
+static GNUNET_CoreAPIForPlugins *coreAPI;      /* set by init, cleared by done */
+
+static GNUNET_Stats_ServiceAPI *stats; /* "stats" service, NULL if not obtained */
+
+static int stat_push_count;    /* handle of the "# blocks pushed into DHT" statistic */
+
+static struct ActiveRequestRecords *records;   /* head of the list of active DHT requests */
+
+/**
+ * Thread that does the pushing.
+ */
+static struct GNUNET_ThreadHandle *thread;
+
+/**
+ * Should the thread terminate?
+ */
+static int shutdown_requested;
+
+/**
+ * Total number of entries with anonymity 0.
+ * Used to calculate how long we should wait
+ * between iterations.
+ */
+static unsigned int total;
+
+
+/**
+ * Walk the list of active requests and cancel every request
+ * whose end time lies before the given limit.  Such a record is
+ * unlinked, its DHT get operation is stopped and its memory freed.
+ *
+ * @param limit records with an end_time smaller than this are dropped
+ */
+static void
+purge_old_records (GNUNET_CronTime limit)
+{
+  struct ActiveRequestRecords **link;
+  struct ActiveRequestRecords *cur;
+
+  /* 'link' always points at the pointer that refers to 'cur', i.e.
+     either the list head or the 'next' field of the previous record */
+  link = &records;
+  while (NULL != (cur = *link))
+    {
+      if (cur->end_time >= limit)
+        {
+          link = &cur->next;    /* not expired, keep it */
+          continue;
+        }
+      *link = cur->next;
+      dv_dht->get_stop (cur->handle);
+      GNUNET_free (cur);
+    }
+}
+
+
+/**
+ * Iterator invoked with a result for one of our DHT get requests.
+ * The block is validated and the query derived from it must equal
+ * the key; results failing either test are reported via
+ * GNUNET_GE_BREAK_OP and ignored.  Valid results are handed to the
+ * query manager.
+ *
+ * @param key the key the result was found under
+ * @param type type of the result (not used here)
+ * @param size number of bytes in value
+ * @param value the result, interpreted as a GNUNET_EC_DBlock
+ * @param cls the ActiveRequestRecords entry of the request
+ * @return GNUNET_OK to continue with iteration, GNUNET_SYSERR to abort
+ */
+static int
+response_callback (const GNUNET_HashCode * key,
+                   unsigned int type,
+                   unsigned int size, const char *value, void *cls)
+{
+  struct ActiveRequestRecords *request = cls;
+  const GNUNET_EC_DBlock *block = (const GNUNET_EC_DBlock *) value;
+  GNUNET_HashCode computed;
+  int acceptable;
+
+  acceptable =
+    (GNUNET_SYSERR !=
+     GNUNET_EC_file_block_check_and_get_query (size,
+                                               block,
+                                               GNUNET_YES,
+                                               &computed)) &&
+    (0 == memcmp (key, &computed, sizeof (GNUNET_HashCode)));
+  if (!acceptable)
+    {
+      GNUNET_GE_BREAK_OP (NULL, 0);
+      return GNUNET_OK;
+    }
+  GNUNET_DV_FS_QUERYMANAGER_handle_response (NULL, &computed, 0, size, block);
+  if (request->type != GNUNET_ECRS_BLOCKTYPE_DATA)
+    return GNUNET_OK;
+  /* one DATA block is all we asked for: an end time of 0 makes the
+     next purge_old_records run cancel the request */
+  request->end_time = 0;
+  return GNUNET_SYSERR;
+}
+
+/**
+ * Start a get operation in the DV-DHT for the given query and
+ * remember it in the list of active requests so that it can be
+ * cancelled later.  Requests that have passed their end time are
+ * purged on the same occasion.  Does nothing if the DV-DHT service
+ * is not available.
+ *
+ * @param type type of content requested
+ * @param query hash code of the query
+ */
+void
+GNUNET_FS_DV_DHT_execute_query (unsigned int type,
+                                const GNUNET_HashCode * query)
+{
+  struct ActiveRequestRecords *entry;
+  GNUNET_CronTime start;
+
+  if (NULL == dv_dht)
+    return;
+  start = GNUNET_get_time ();
+  entry = GNUNET_malloc (sizeof (struct ActiveRequestRecords));
+  entry->end_time = start + GNUNET_GAP_MAX_DHT_DELAY;
+  entry->type = type;
+  entry->handle = dv_dht->get_start (type, query, &response_callback, entry);
+  if (NULL != entry->handle)
+    {
+      GNUNET_mutex_lock (GNUNET_FS_lock);
+      entry->next = records;
+      records = entry;
+      purge_old_records (start);
+      GNUNET_mutex_unlock (GNUNET_FS_lock);
+    }
+  else
+    {
+      /* the DHT refused the request, nothing to track */
+      GNUNET_free (entry);
+    }
+}
+
+/**
+ * Iterator over zero-anonymity content; sleeps for a while and then
+ * puts the given block into the DV-DHT.
+ *
+ * The pause between two puts is six hours divided by 'total',
+ * clamped to the range of 5 to 60 seconds.  'total' must not be 0
+ * here; push_thread sets it to at least 1 before it starts iterating.
+ *
+ * @param key key of the block
+ * @param value the datastore entry; the block data follows the header
+ * @param closure not used
+ * @param uid not used
+ * @return GNUNET_OK to continue, GNUNET_SYSERR once shutdown was requested
+ */
+static int
+push_callback (const GNUNET_HashCode * key,
+               const GNUNET_DatastoreValue * value, void *closure,
+               unsigned long long uid)
+{
+  GNUNET_CronTime wait_time;
+
+  if (GNUNET_YES == shutdown_requested)
+    return GNUNET_SYSERR;
+  wait_time = 6 * GNUNET_CRON_HOURS / total;
+  if (wait_time < 5 * GNUNET_CRON_SECONDS)
+    wait_time = 5 * GNUNET_CRON_SECONDS;
+  else if (wait_time > 60 * GNUNET_CRON_SECONDS)
+    wait_time = 60 * GNUNET_CRON_SECONDS;
+  GNUNET_thread_sleep (wait_time);
+  /* the sleep may have been interrupted by a shutdown request */
+  if (GNUNET_YES == shutdown_requested)
+    return GNUNET_SYSERR;
+
+  dv_dht->put (key,
+            ntohl (value->type),
+            ntohl (value->size) - sizeof (GNUNET_DatastoreValue),
+            (const char *) &value[1]);
+  if (NULL != stats)
+    stats->change (stat_push_count, 1);
+  return (GNUNET_YES == shutdown_requested) ? GNUNET_SYSERR : GNUNET_OK;
+}
+
+/**
+ * Body of the push thread.  Iterates over the non-anonymous content
+ * of the sqstore again and again (push_callback does the actual
+ * work) until shutdown is requested or one of the required services
+ * is gone.  If an iteration yields no entries, the thread sleeps
+ * five minutes before it tries again.
+ *
+ * @param cls not used
+ * @return always NULL
+ */
+static void *
+push_thread (void *cls)
+{
+  for (;;)
+    {
+      if (GNUNET_NO != shutdown_requested)
+        break;
+      if ((NULL == dv_dht) || (NULL == sqstore))
+        break;
+      /* push_callback divides by 'total' while the iteration runs */
+      if (0 == total)
+        total = 1;
+      total = sqstore->iterateNonAnonymous (0, &push_callback, NULL);
+      if ((GNUNET_NO != shutdown_requested) || (0 != total))
+        continue;
+      GNUNET_thread_sleep (5 * GNUNET_CRON_MINUTES);
+    }
+  return NULL;
+}
+
+
+/**
+ * Initialise the DV-DHT integration: request the "dv_dht", "sqstore"
+ * and "stats" services and, if both the DHT and the sqstore could be
+ * obtained, start the thread that pushes content into the DHT.
+ *
+ * @param capi core API used to request the services
+ * @return always 0
+ */
+int
+GNUNET_FS_DV_DHT_init (GNUNET_CoreAPIForPlugins * capi)
+{
+  coreAPI = capi;
+  dv_dht = capi->service_request ("dv_dht");
+  sqstore = capi->service_request ("sqstore");
+  stats = capi->service_request ("stats");
+  if (NULL != stats)
+    stat_push_count
+      = stats->create (gettext_noop ("# blocks pushed into DHT"));
+  if ((NULL == dv_dht) || (NULL == sqstore))
+    return 0;                   /* push thread needs both services */
+  shutdown_requested = GNUNET_NO;
+  thread = GNUNET_thread_create (&push_thread, NULL, 1024 * 128);
+  return 0;
+}
+
+/**
+ * Shut down the DV-DHT integration: cancel the outstanding DHT
+ * requests, stop and join the push thread (if one was started) and
+ * release the services requested by GNUNET_FS_DV_DHT_init.
+ *
+ * @return always 0
+ */
+int
+GNUNET_FS_DV_DHT_done (void)
+{
+  void *unused;
+
+  /* presumably GNUNET_CronTime is unsigned, so -1 becomes its largest
+     value and every record counts as expired -- TODO confirm */
+  purge_old_records (-1);
+  if (thread != NULL)
+    {
+      shutdown_requested = GNUNET_YES;
+      GNUNET_thread_stop_sleep (thread);
+      GNUNET_thread_join (thread, &unused);
+      /* forget the joined handle so that a later shutdown (e.g. after
+         an init that started no thread) cannot join it again */
+      thread = NULL;
+    }
+  if (stats != NULL)
+    {
+      coreAPI->service_release (stats);
+      stats = NULL;
+    }
+  if (dv_dht != NULL)
+    coreAPI->service_release (dv_dht);
+  dv_dht = NULL;
+  if (sqstore != NULL)
+    coreAPI->service_release (sqstore);
+  sqstore = NULL;
+  coreAPI = NULL;
+  return 0;
+}

Added: GNUnet/src/applications/fs/gap/fs_dv_dht.h
===================================================================
--- GNUnet/src/applications/fs/gap/fs_dv_dht.h                          (rev 0)
+++ GNUnet/src/applications/fs/gap/fs_dv_dht.h  2009-10-01 18:10:27 UTC (rev 
9053)
@@ -0,0 +1,48 @@
+/*
+      This file is part of GNUnet
+      (C) 2001 - 2009 Christian Grothoff (and other contributing authors)
+
+      GNUnet is free software; you can redistribute it and/or modify
+      it under the terms of the GNU General Public License as published
+      by the Free Software Foundation; either version 2, or (at your
+      option) any later version.
+
+      GNUnet is distributed in the hope that it will be useful, but
+      WITHOUT ANY WARRANTY; without even the implied warranty of
+      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+      General Public License for more details.
+
+      You should have received a copy of the GNU General Public License
+      along with GNUnet; see the file COPYING.  If not, write to the
+      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+      Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/fs_dv_dht.h
+ * @brief integration of file-sharing with the DV_DHT
+ *        infrastructure
+ * @author Christian Grothoff, Nathan Evans
+ */
+#ifndef FS_DV_DHT_H
+#define FS_DV_DHT_H
+
+#include "gnunet_util.h"
+
+int GNUNET_FS_DV_DHT_init (GNUNET_CoreAPIForPlugins * capi);
+
+int GNUNET_FS_DV_DHT_done (void);
+
+/**
+ * Execute a query via the DV-DHT.  Starts a get operation for the
+ * query and keeps track of it so that it can be cancelled once it
+ * is too old.  Does nothing if the DV-DHT service is not available.
+ *
+ * @param type type of content requested
+ * @param query hash code of the query
+ */
+void
+GNUNET_FS_DV_DHT_execute_query (unsigned int type,
+                             const GNUNET_HashCode * query);
+
+#endif

Added: GNUnet/src/applications/fs/gap/test_multi_results_dv.c
===================================================================
--- GNUnet/src/applications/fs/gap/test_multi_results_dv.c                      
        (rev 0)
+++ GNUnet/src/applications/fs/gap/test_multi_results_dv.c      2009-10-01 
18:10:27 UTC (rev 9053)
@@ -0,0 +1,227 @@
+/*
+     This file is part of GNUnet.
+     (C) 2005, 2006, 2007, 2008 Christian Grothoff (and other contributing 
authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 2, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
+*/
+
+/**
+ * @file fs/gap/test_multi_results_dv.c
+ * @brief GAP routing testcase, linear topology
+ * @author Christian Grothoff
+ */
+
+#include "platform.h"
+#include "gnunet_protocols.h"
+#include "gnunet_ecrs_lib.h"
+#include "gnunet_testing_lib.h"
+#include "gnunet_stats_lib.h"
+#include "gnunet_util.h"
+#include "gnunet_stats_lib.h"
+#include "../ecrs/ecrs.h"
+
+#define START_PEERS 1          /* non-zero: main starts and stops the daemons itself */
+#define DEBUG 1                /* non-zero: searchCB reports results carrying location info */
+#define PEER_COUNT 2           /* number of daemons started and connected by main */
+
+/**
+ * How many search results are there?
+ */
+#define TOTAL 40
+
+static struct GNUNET_GE_Context *ectx; /* never assigned in this file */
+
+static struct GNUNET_GC_Configuration *cfg;    /* created in main from check.conf */
+
+static struct GNUNET_ECRS_URI *uris[TOTAL];    /* uploaded files; slot is NULL once found */
+
+static struct GNUNET_ECRS_URI *key;    /* keyword URI for "GAPTEST" */
+
+static unsigned int found;     /* number of uploaded files seen in search results */
+
+/**
+ * Termination test handed to the ECRS operations.
+ *
+ * @param unused not used
+ * @return GNUNET_SYSERR once more than 90% of the TOTAL results
+ *         have been found, GNUNET_OK otherwise
+ */
+static int
+testTerminate (void *unused)
+{
+  if (found > (TOTAL * 90) / 100)
+    return GNUNET_SYSERR;
+  return GNUNET_OK;
+}
+
+/**
+ * Build the name of the i-th test file and make sure that the
+ * directory it lives in exists.
+ *
+ * @param i number appended to the common file name prefix
+ * @return freshly allocated file name, to be freed by the caller
+ */
+static char *
+makeName (unsigned int i)
+{
+  const size_t capacity = strlen ("/tmp/gaptest/GAPTEST") + 14;
+  char *path;
+
+  path = GNUNET_malloc (capacity);
+  GNUNET_snprintf (path, capacity, "/tmp/gaptest/GAPTEST%u", i);
+  GNUNET_disk_directory_create_for_file (NULL, path);
+  return path;
+}
+
+/**
+ * Create a file of the given size (every byte set to size % 255),
+ * upload it and publish it under the global keyword 'key'.
+ *
+ * @param size size of the file in bytes; also used to derive its name
+ * @return URI of the uploaded file (to be destroyed by the caller),
+ *         NULL if the file could not be created, uploaded or published
+ */
+static struct GNUNET_ECRS_URI *
+uploadFile (int size)
+{
+  int ret;
+  char *name;
+  int fd;
+  char *buf;
+  struct GNUNET_ECRS_URI *uri;
+  struct GNUNET_MetaData *meta;
+
+  /* the failure paths below must never see an indeterminate pointer */
+  uri = NULL;
+  name = makeName (size);
+  fd =
+    GNUNET_disk_file_open (ectx, name, O_WRONLY | O_CREAT, S_IWUSR | S_IRUSR);
+  if (fd == -1)
+    {
+      GNUNET_free (name);
+      return NULL;
+    }
+  buf = GNUNET_malloc (size);
+  memset (buf, size % 255, size);
+  WRITE (fd, buf, size);
+  GNUNET_free (buf);
+  GNUNET_disk_file_close (ectx, name, fd);
+  ret = GNUNET_ECRS_file_upload (ectx, cfg, name, GNUNET_YES,   /* index */
+                                 1,     /* anonymous */
+                                 0,     /* priority */
+                                 GNUNET_get_time () + 100 * GNUNET_CRON_MINUTES,        /* expire */
+                                 NULL, NULL, &testTerminate, NULL, &uri);
+  GNUNET_free (name);
+  if (ret == GNUNET_SYSERR)
+    {
+      /* the upload may have failed before it stored anything in 'uri' */
+      if (uri != NULL)
+        GNUNET_ECRS_uri_destroy (uri);
+      return NULL;
+    }
+  meta = GNUNET_meta_data_create ();
+  ret = GNUNET_ECRS_publish_under_keyword (ectx, cfg, key, 0, 0,
+                                           GNUNET_get_time () + 100 * GNUNET_CRON_MINUTES,      /* expire */
+                                           uri, meta);
+  GNUNET_meta_data_destroy (meta);
+  if (ret == GNUNET_OK)
+    return uri;
+  if (uri != NULL)
+    GNUNET_ECRS_uri_destroy (uri);
+  return NULL;
+}
+
+/**
+ * Callback for search results.  If the result matches one of the
+ * uploaded files that has not been seen yet, that file is marked as
+ * found (its URI is destroyed and its slot cleared) and a dot is
+ * printed as progress indicator.
+ *
+ * @param fi the search result
+ * @param key not used
+ * @param isRoot not used
+ * @param closure not used
+ * @return always GNUNET_OK
+ */
+static int
+searchCB (const GNUNET_ECRS_FileInfo * fi,
+          const GNUNET_HashCode * key, int isRoot, void *closure)
+{
+  int idx;
+
+#if DEBUG
+  if (fi->uri->type == loc)
+       fprintf(stdout, "Got location information from search, great!\n");
+#endif
+
+  for (idx = 0; idx < TOTAL; idx++)
+    {
+      if (uris[idx] == NULL)
+        continue;               /* already found (or never uploaded) */
+      if (!GNUNET_ECRS_uri_test_equal (uris[idx], fi->uri))
+        continue;
+      GNUNET_ECRS_uri_destroy (uris[idx]);
+      uris[idx] = NULL;
+      found++;
+      fprintf (stderr, ".");
+      break;
+    }
+  return GNUNET_OK;
+}
+
+#define CHECK(a) if (!(a)) { ret = 1; GNUNET_GE_BREAK(ectx, 0); goto FAILURE; }
+
+/**
+ * Testcase to test gap routing over DV (PEER_COUNT peers): uploads
+ * TOTAL files under one keyword, waits, and then searches for that
+ * keyword until more than 90% of the files have been found.
+ *
+ * @param argc not used
+ * @param argv not used
+ * @return 0: ok, 1: a check failed, -1: setup error
+ */
+int
+main (int argc, char **argv)
+{
+#if START_PEERS
+  struct GNUNET_TESTING_DaemonContext *peers;
+#endif
+  int ret;
+  int i;
+  char buf[128];
+  char *tmp;
+  ret = 0;
+  cfg = GNUNET_GC_create ();
+  if (-1 == GNUNET_GC_parse_configuration (cfg, "check.conf"))
+    {
+      GNUNET_GC_free (cfg);
+      return -1;
+    }
+#if START_PEERS
+  peers = GNUNET_TESTING_start_daemons ("tcp",
+                                        "advertising topology dv_fs stats",
+                                        "/tmp/gap-dv-multi-results-test",
+                                        2087, 10, PEER_COUNT);
+  if (peers == NULL)
+    {
+      fprintf (stderr, "Failed to start the gnunetd daemons!\n");
+      GNUNET_GC_free (cfg);
+      return -1;
+    }
+#endif
+  /* connect each peer to its predecessor */
+  for (i = 1; i < PEER_COUNT; i++)
+    {
+      if (GNUNET_OK != GNUNET_TESTING_connect_daemons (2077 + (10 * i),
+                                                       2087 + (10 * i)))
+        {
+#if START_PEERS
+          GNUNET_TESTING_stop_daemons (peers);
+#endif
+          fprintf (stderr, "Failed to connect the peers!\n");
+          GNUNET_GC_free (cfg);
+          return -1;
+        }
+    }
+  key = GNUNET_ECRS_keyword_string_to_uri (NULL, "GAPTEST");
+  fprintf (stderr, "Uploading...");
+  for (i = 0; i < TOTAL; i++)
+    {
+      uris[i] = uploadFile (i + 1);
+      CHECK (uris[i] != NULL);
+      tmp = GNUNET_ECRS_uri_to_string(uris[i]);
+      //fprintf (stderr, "URI is %s\n", tmp);
+      GNUNET_free(tmp);
+    }
+  GNUNET_thread_sleep(360 * GNUNET_CRON_SECONDS);
+  fprintf (stderr, "\nSearching...");
+  /* run the search against the last peer */
+  GNUNET_snprintf (buf, 128, "localhost:%u", 2077 + PEER_COUNT * 10);
+  GNUNET_GC_set_configuration_value_string (cfg, ectx, "NETWORK", "HOST",
+                                            buf);
+
+  GNUNET_ECRS_search (ectx,
+                      cfg, key, 0, &searchCB, NULL, &testTerminate, NULL);
+  fprintf (stderr, "\n");
+  CHECK (found > (TOTAL * 90) / 100);
+FAILURE:
+#if START_PEERS
+  GNUNET_TESTING_stop_daemons (peers);
+#endif
+  /* release the URIs that were uploaded but never matched by a search
+     result; slots of matched or failed uploads are NULL */
+  for (i = 0; i < TOTAL; i++)
+    {
+      if (uris[i] != NULL)
+        {
+          GNUNET_ECRS_uri_destroy (uris[i]);
+          uris[i] = NULL;
+        }
+    }
+  GNUNET_ECRS_uri_destroy (key);
+  GNUNET_GC_free (cfg);
+  return ret;
+}
+
+/* end of test_multi_results_dv.c */





reply via email to

[Prev in Thread] Current Thread [Next in Thread]