[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r9053 - GNUnet/src/applications/fs/gap
From: |
gnunet |
Subject: |
[GNUnet-SVN] r9053 - GNUnet/src/applications/fs/gap |
Date: |
Thu, 1 Oct 2009 12:10:27 -0600 |
Author: nevans
Date: 2009-10-01 12:10:27 -0600 (Thu, 01 Oct 2009)
New Revision: 9053
Added:
GNUnet/src/applications/fs/gap/dv_fs.c
GNUnet/src/applications/fs/gap/dv_querymanager.c
GNUnet/src/applications/fs/gap/dv_querymanager.h
GNUnet/src/applications/fs/gap/fs_dv_dht.c
GNUnet/src/applications/fs/gap/fs_dv_dht.h
GNUnet/src/applications/fs/gap/test_multi_results_dv.c
Modified:
GNUnet/src/applications/fs/gap/check.conf
Log:
dv gap testing, not yet working well (though it didn't before either ;-)
Modified: GNUnet/src/applications/fs/gap/check.conf
===================================================================
--- GNUnet/src/applications/fs/gap/check.conf 2009-10-01 16:26:02 UTC (rev
9052)
+++ GNUnet/src/applications/fs/gap/check.conf 2009-10-01 18:10:27 UTC (rev
9053)
@@ -1,7 +1,7 @@
# General settings
[GNUNET]
-GNUNET_HOME = "/tmp/gnunet-gap-test-driver"
-LOGLEVEL = "ERROR"
+GNUNET_HOME = "/tmp/gap-test-driver"
+KEEPLOG = 3
LOGFILE = ""
PROCESS-PRIORITY = "NORMAL"
@@ -9,8 +9,11 @@
[NETWORK]
HOST = "localhost:2087"
-
[TESTING]
WEAKRANDOM = YES
+[LOGGING]
+DEVELOPER = YES
+ADMIN-LEVEL = DEBUG
+USER-LEVEL = DEBUG
Added: GNUnet/src/applications/fs/gap/dv_fs.c
===================================================================
--- GNUnet/src/applications/fs/gap/dv_fs.c (rev 0)
+++ GNUnet/src/applications/fs/gap/dv_fs.c 2009-10-01 18:10:27 UTC (rev
9053)
@@ -0,0 +1,1003 @@
+/*
+ This file is part of GNUnet.
+ (C) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008 Christian Grothoff
(and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+*/
+
+/**
+ * @file applications/fs/gap/dv_fs.c
+ * @brief functions for handling CS and P2P file-sharing requests
+ * @author Christian Grothoff, Nathan Evans
+ *
+ * This file contains all of the entry points to the file-sharing
+ * module.
+ *
+ * TODO:
+ * - integrate with migration submodule
+ * - make sure we do an immediate PUSH for DHT stuff
+ * given to us with anonymity_level zero.
+ */
+
+#include "platform.h"
+#include "gnunet_util.h"
+#include "gnunet_directories.h"
+#include "gnunet_protocols.h"
+#include "gnunet_datastore_service.h"
+#include "gnunet_dht_service.h"
+#include "gnunet_identity_service.h"
+#include "gnunet_stats_service.h"
+#include "gnunet_traffic_service.h"
+#include "ecrs_core.h"
+#include "anonymity.h"
+#include "fs.h"
+#include "fs_dv_dht.h"
+#include "gap.h"
+#include "migration.h"
+#include "dv_querymanager.h"
+#include "ondemand.h"
+#include "plan.h"
+#include "pid_table.h"
+#include "shared.h"
+
+
+#define DEBUG_FS GNUNET_NO
+
+/**
+ * Lock shared between all C files in this
+ * directory.
+ */
+struct GNUNET_Mutex *GNUNET_FS_lock;
+
+static struct GNUNET_GE_Context *ectx;
+
+static GNUNET_CoreAPIForPlugins *coreAPI;
+
+static GNUNET_Identity_ServiceAPI *identity;
+
+static GNUNET_Stats_ServiceAPI *stats;
+
+static GNUNET_Datastore_ServiceAPI *datastore;
+
+static int active_migration;
+
+static int stat_gap_query_received;
+
+static int stat_gap_query_drop_busy;
+
+static int stat_gap_content_received;
+
+static int stat_gap_trust_awarded;
+
+/**
+ * Hard CPU limit
+ */
+static unsigned long long hardCPULimit;
+
+/**
+ * Hard network upload limit.
+ */
+static unsigned long long hardUpLimit;
+
+
+
+/* ********************* CS handlers ********************** */
+
+/**
+ * Process a request to insert content from the client.
+ *
+ * @return GNUNET_SYSERR if the TCP connection should be closed, otherwise
GNUNET_OK
+ */
+static int
+handle_cs_insert_request (struct GNUNET_ClientHandle *sock,
+ const GNUNET_MessageHeader * req)
+{
+ const CS_fs_request_insert_MESSAGE *ri;
+ GNUNET_DatastoreValue *datum;
+ struct GNUNET_GE_Context *cectx;
+ GNUNET_HashCode query;
+ int ret;
+#if DEBUG_FS
+ GNUNET_EncName enc;
+#endif
+
+ ri = (const CS_fs_request_insert_MESSAGE *) req;
+ if ((ntohs (req->size) < sizeof (CS_fs_request_insert_MESSAGE)) ||
+ (GNUNET_OK !=
+ GNUNET_EC_file_block_check_and_get_query (ntohs (ri->header.size) -
+ sizeof
+
(CS_fs_request_insert_MESSAGE),
+ (const GNUNET_EC_DBlock *)
+ &ri[1], GNUNET_YES, &query)))
+ {
+ GNUNET_GE_BREAK (ectx, 0);
+ return GNUNET_SYSERR;
+ }
+ datum = GNUNET_malloc (sizeof (GNUNET_DatastoreValue) +
+ ntohs (req->size) -
+ sizeof (CS_fs_request_insert_MESSAGE));
+ datum->size =
+ htonl (sizeof (GNUNET_DatastoreValue) + ntohs (req->size) -
+ sizeof (CS_fs_request_insert_MESSAGE));
+ datum->expiration_time = ri->expiration;
+ datum->priority = ri->priority;
+ datum->anonymity_level = ri->anonymity_level;
+ datum->type =
+ htonl (GNUNET_EC_file_block_get_type
+ (ntohs (ri->header.size) - sizeof (CS_fs_request_insert_MESSAGE),
+ (const GNUNET_EC_DBlock *) &ri[1]));
+#if DEBUG_FS
+ IF_GELOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ GNUNET_hash_to_enc (&query, &enc));
+ GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "FS received REQUEST INSERT (query: `%s', type: %u, priority
%u)\n",
+ &enc, ntohl (datum->type), ntohl (ri->priority));
+#endif
+ memcpy (&datum[1],
+ &ri[1], ntohs (req->size) - sizeof (CS_fs_request_insert_MESSAGE));
+ ret = datastore->putUpdate (&query, datum);
+ if (ret == GNUNET_NO)
+ {
+ cectx = coreAPI->cs_log_context_create (sock);
+ GNUNET_GE_LOG (cectx,
+ GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
+ _("Datastore full.\n"));
+ GNUNET_GE_free_context (cectx);
+ }
+ GNUNET_free (datum);
+ return coreAPI->cs_send_value (sock, ret);
+}
+
+/**
+ * Process a request to symlink a file
+ */
+static int
+handle_cs_init_index_request (struct GNUNET_ClientHandle *sock,
+ const GNUNET_MessageHeader * req)
+{
+ const CS_fs_request_init_index_MESSAGE *ri;
+ struct GNUNET_GE_Context *cectx;
+ int fnLen;
+ int ret;
+ char *fn;
+
+ fnLen = ntohs (req->size) - sizeof (CS_fs_request_init_index_MESSAGE);
+ if ((ntohs (req->size) < sizeof (CS_fs_request_init_index_MESSAGE))
+#if WINDOWS
+ || (fnLen > _MAX_PATH)
+#endif
+ )
+ {
+ GNUNET_GE_BREAK (ectx, 0);
+ return GNUNET_SYSERR;
+ }
+ ri = (const CS_fs_request_init_index_MESSAGE *) req;
+ fn = GNUNET_malloc (fnLen + 1);
+ strncpy (fn, (const char *) &ri[1], fnLen + 1);
+ fn[fnLen] = 0;
+ cectx = coreAPI->cs_log_context_create (sock);
+ ret =
+ GNUNET_FS_ONDEMAND_index_prepare_with_symlink (cectx, &ri->fileId, fn);
+ GNUNET_GE_free_context (cectx);
+ GNUNET_free (fn);
+#if DEBUG_FS
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Sending confirmation (%s) of index initialization request to
client\n",
+ ret == GNUNET_OK ? "success" : "failure");
+#endif
+ return coreAPI->cs_send_value (sock, ret);
+}
+
+/**
+ * Process a request to index content from the client.
+ *
+ * @return GNUNET_SYSERR if the TCP connection should be closed, otherwise
GNUNET_OK
+ */
+static int
+handle_cs_index_request (struct GNUNET_ClientHandle *sock,
+ const GNUNET_MessageHeader * req)
+{
+ int ret;
+ const CS_fs_request_index_MESSAGE *ri;
+ struct GNUNET_GE_Context *cectx;
+#if DEBUG_FS
+ GNUNET_HashCode hc;
+ GNUNET_EncName enc;
+#endif
+
+ if (ntohs (req->size) < sizeof (CS_fs_request_index_MESSAGE))
+ {
+ GNUNET_GE_BREAK (ectx, 0);
+ return GNUNET_SYSERR;
+ }
+ cectx = coreAPI->cs_log_context_create (sock);
+ ri = (const CS_fs_request_index_MESSAGE *) req;
+#if DEBUG_FS
+ GNUNET_EC_file_block_get_query ((const GNUNET_EC_DBlock *) &ri[1],
+ ntohs (ri->header.size) -
+ sizeof (CS_fs_request_index_MESSAGE), &hc);
+ IF_GELOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ GNUNET_hash_to_enc (&hc, &enc));
+ GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "FS received REQUEST INDEX (query: `%s', priority %u)\n",
+ &enc, ntohl (ri->priority));
+#endif
+ ret = GNUNET_FS_ONDEMAND_add_indexed_content (cectx,
+ datastore,
+ ntohl (ri->priority),
+ GNUNET_ntohll
+ (ri->expiration),
+ GNUNET_ntohll
+ (ri->fileOffset),
+ ntohl (ri->anonymity_level),
+ &ri->fileId,
+ ntohs (ri->header.size) -
+ sizeof
+ (CS_fs_request_index_MESSAGE),
+ (const GNUNET_EC_DBlock *)
+ &ri[1]);
+ GNUNET_GE_free_context (cectx);
+#if DEBUG_FS
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Sending confirmation (%s) of index request to client\n",
+ ret == GNUNET_OK ? "success" : "failure");
+#endif
+ return coreAPI->cs_send_value (sock, ret);
+}
+
+/**
+ * Process a query to delete content.
+ *
+ * @return GNUNET_SYSERR if the TCP connection should be closed, otherwise
GNUNET_OK
+ */
+static int
+handle_cs_delete_request (struct GNUNET_ClientHandle *sock,
+ const GNUNET_MessageHeader * req)
+{
+ int ret;
+ const CS_fs_request_delete_MESSAGE *rd;
+ GNUNET_DatastoreValue *value;
+ GNUNET_HashCode query;
+ unsigned int type;
+#if DEBUG_FS
+ GNUNET_EncName enc;
+#endif
+
+ if (ntohs (req->size) < sizeof (CS_fs_request_delete_MESSAGE))
+ {
+ GNUNET_GE_BREAK (ectx, 0);
+ return GNUNET_SYSERR;
+ }
+ rd = (const CS_fs_request_delete_MESSAGE *) req;
+ value = GNUNET_malloc (sizeof (GNUNET_DatastoreValue) +
+ ntohs (req->size) -
+ sizeof (CS_fs_request_delete_MESSAGE));
+ value->size =
+ ntohl (sizeof (GNUNET_DatastoreValue) + ntohs (req->size) -
+ sizeof (CS_fs_request_delete_MESSAGE));
+ type =
+ GNUNET_EC_file_block_get_type (ntohs (rd->header.size) -
+ sizeof (CS_fs_request_delete_MESSAGE),
+ (const GNUNET_EC_DBlock *) &rd[1]);
+ value->type = htonl (type);
+ memcpy (&value[1],
+ &rd[1], ntohs (req->size) - sizeof (CS_fs_request_delete_MESSAGE));
+ if (GNUNET_OK !=
+ GNUNET_EC_file_block_check_and_get_query (ntohs (rd->header.size) -
+ sizeof
+ (CS_fs_request_delete_MESSAGE),
+ (const GNUNET_EC_DBlock *)
+ &rd[1], GNUNET_NO, &query))
+ {
+ GNUNET_free (value);
+ GNUNET_GE_BREAK (ectx, 0);
+ return GNUNET_SYSERR;
+ }
+#if DEBUG_FS
+ IF_GELOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ GNUNET_hash_to_enc (&query, &enc));
+ GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "FS received REQUEST DELETE (query: `%s', type: %u)\n", &enc,
+ type);
+#endif
+ GNUNET_mutex_lock (GNUNET_FS_lock);
+ value->type = htonl (GNUNET_ECRS_BLOCKTYPE_ANY);
+ ret = datastore->get (&query, type,
+
&GNUNET_FS_HELPER_complete_value_from_database_callback,
+ value);
+ if ((0 < ret) && (value->type != htonl (GNUNET_ECRS_BLOCKTYPE_ANY)))
+ {
+ ret = datastore->del (&query, value);
+ }
+ else
+ { /* not found */
+ ret = GNUNET_SYSERR;
+ }
+ GNUNET_mutex_unlock (GNUNET_FS_lock);
+ GNUNET_free (value);
+#if DEBUG_FS
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Sending confirmation (%s) of delete request to client\n",
+ ret != GNUNET_SYSERR ? "success" : "failure");
+#endif
+ return coreAPI->cs_send_value (sock, ret);
+}
+
+/**
+ * Process a client request unindex content.
+ */
+static int
+handle_cs_unindex_request (struct GNUNET_ClientHandle *sock,
+ const GNUNET_MessageHeader * req)
+{
+ int ret;
+ const CS_fs_request_unindex_MESSAGE *ru;
+ struct GNUNET_GE_Context *cectx;
+
+ cectx = coreAPI->cs_log_context_create (sock);
+ if (ntohs (req->size) != sizeof (CS_fs_request_unindex_MESSAGE))
+ {
+ GNUNET_GE_BREAK (ectx, 0);
+ GNUNET_GE_BREAK (cectx, 0);
+ GNUNET_GE_free_context (cectx);
+ return GNUNET_SYSERR;
+ }
+ ru = (const CS_fs_request_unindex_MESSAGE *) req;
+#if DEBUG_FS
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "FS received REQUEST UNINDEX\n");
+#endif
+ ret = GNUNET_FS_ONDEMAND_delete_indexed_content (cectx,
+ datastore,
+ ntohl (ru->blocksize),
+ &ru->fileId);
+ GNUNET_GE_free_context (cectx);
+ return coreAPI->cs_send_value (sock, ret);
+}
+
+/**
+ * Process a client request to test if certain
+ * data is indexed.
+ */
+static int
+handle_cs_test_indexed_request (struct GNUNET_ClientHandle *sock,
+ const GNUNET_MessageHeader * req)
+{
+ int ret;
+ const CS_fs_request_test_index_MESSAGE *ru;
+
+ if (ntohs (req->size) != sizeof (CS_fs_request_test_index_MESSAGE))
+ {
+ GNUNET_GE_BREAK (ectx, 0);
+ return GNUNET_SYSERR;
+ }
+ ru = (const CS_fs_request_test_index_MESSAGE *) req;
+#if DEBUG_FS
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "FS received REQUEST TESTINDEXED\n");
+#endif
+ ret = GNUNET_FS_ONDEMAND_test_indexed_file (datastore, &ru->fileId);
+ return coreAPI->cs_send_value (sock, ret);
+}
+
+struct FPPClosure
+{
+ struct GNUNET_ClientHandle *sock;
+ struct GNUNET_MultiHashMap *seen;
+ unsigned int processed;
+ int have_more;
+};
+
+/**
+ * Any response that we get should be passed
+ * back to the client. If the response is unique,
+ * we should abort the iteration (return GNUNET_SYSERR).
+ */
+static int
+fast_path_processor (const GNUNET_HashCode * key,
+ const GNUNET_DatastoreValue *
+ value, void *closure, unsigned long long uid)
+{
+ struct FPPClosure *cls = closure;
+ GNUNET_HashCode hc;
+ unsigned int type;
+ int ret;
+
+ if (cls->processed > GNUNET_GAP_MAX_SYNC_PROCESSED)
+ {
+ cls->have_more = GNUNET_YES;
+ return GNUNET_SYSERR;
+ }
+ type = ntohl (((const GNUNET_EC_DBlock *) &value[1])->type);
+ ret = GNUNET_FS_HELPER_send_to_client (coreAPI,
+ key, value, cls->sock, NULL, &hc);
+ if (ret == GNUNET_NO)
+ return GNUNET_NO; /* delete + continue */
+ cls->processed++;
+ if (ret != GNUNET_OK)
+ cls->have_more = GNUNET_YES; /* switch to async processing */
+ if ((type == GNUNET_ECRS_BLOCKTYPE_DATA) || (ret != GNUNET_OK))
+ return GNUNET_SYSERR; /* unique response or client can take no more
*/
+ if (cls->seen == NULL)
+ cls->seen = GNUNET_multi_hash_map_create (8);
+ GNUNET_multi_hash_map_put (cls->seen,
+ &hc,
+ NULL, GNUNET_MultiHashMapOption_UNIQUE_FAST);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Process a query from the client. Forwards to the network.
+ *
+ * @return GNUNET_SYSERR if the TCP connection should be closed, otherwise
GNUNET_OK
+ */
+static int
+handle_cs_query_start_request (struct GNUNET_ClientHandle *sock,
+ const GNUNET_MessageHeader * req)
+{
+ static GNUNET_PeerIdentity all_zeros;
+ struct FPPClosure fpp;
+ const CS_fs_request_search_MESSAGE *rs;
+ unsigned int keyCount;
+ unsigned int type;
+ unsigned int anonymityLevel;
+ int have_target;
+#if DEBUG_FS
+ GNUNET_EncName enc;
+#endif
+
+ if (ntohs (req->size) < sizeof (CS_fs_request_search_MESSAGE))
+ {
+ GNUNET_GE_BREAK (ectx, 0);
+ return GNUNET_SYSERR;
+ }
+ rs = (const CS_fs_request_search_MESSAGE *) req;
+ type = ntohl (rs->type);
+ /* try "fast path" avoiding gap/dht if unique reply is locally available */
+#if DEBUG_FS
+ IF_GELOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ GNUNET_hash_to_enc (&rs->query[0], &enc));
+ GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "FS received QUERY (query: `%s', type: %u)\n", &enc, type);
+#endif
+ fpp.sock = sock;
+ fpp.seen = NULL;
+ fpp.have_more = GNUNET_NO;
+ fpp.processed = 0;
+ if (GNUNET_OK ==
+ coreAPI->cs_send_message_now_test (sock,
+ GNUNET_GAP_ESTIMATED_DATA_SIZE,
+ GNUNET_NO))
+ {
+ if (type == GNUNET_ECRS_BLOCKTYPE_DATA)
+ {
+ if (((1 == datastore->get (&rs->query[0],
+ type, &fast_path_processor, &fpp)) ||
+ (1 == datastore->get (&rs->query[0],
+ GNUNET_ECRS_BLOCKTYPE_ONDEMAND,
+ &fast_path_processor, &fpp))) &&
+ (fpp.have_more == GNUNET_NO))
+ goto CLEANUP;
+ }
+ else
+ datastore->get (&rs->query[0], type, &fast_path_processor, &fpp);
+ }
+ else
+ fpp.have_more = GNUNET_YES;
+ anonymityLevel = ntohl (rs->anonymity_level);
+ keyCount =
+ 1 + (ntohs (req->size) -
+ sizeof (CS_fs_request_search_MESSAGE)) / sizeof (GNUNET_HashCode);
+ have_target =
+ memcmp (&all_zeros, &rs->target, sizeof (GNUNET_PeerIdentity)) != 0;
+ GNUNET_DV_FS_QUERYMANAGER_start_query (&rs->query[0], keyCount,
anonymityLevel,
+ type, sock,
+ have_target ? &rs->target : NULL,
+ fpp.seen, fpp.have_more);
+CLEANUP:
+ if (fpp.seen != NULL)
+ GNUNET_multi_hash_map_destroy (fpp.seen);
+ return GNUNET_OK;
+}
+
+/**
+ * Process a stop request from the client.
+ *
+ * @return GNUNET_SYSERR if the TCP connection should be closed, otherwise
GNUNET_OK
+ */
+static int
+handle_cs_query_stop_request (struct GNUNET_ClientHandle *sock,
+ const GNUNET_MessageHeader * req)
+{
+ const CS_fs_request_search_MESSAGE *rs;
+ unsigned int keyCount;
+ unsigned int type;
+ unsigned int anonymityLevel;
+
+ if (ntohs (req->size) < sizeof (CS_fs_request_search_MESSAGE))
+ {
+ GNUNET_GE_BREAK (ectx, 0);
+ return GNUNET_SYSERR;
+ }
+ rs = (const CS_fs_request_search_MESSAGE *) req;
+ type = ntohl (rs->type);
+ anonymityLevel = ntohl (rs->anonymity_level);
+ keyCount =
+ 1 + (ntohs (req->size) -
+ sizeof (CS_fs_request_search_MESSAGE)) / sizeof (GNUNET_HashCode);
+ GNUNET_DV_FS_QUERYMANAGER_stop_query (&rs->query[0], keyCount,
anonymityLevel,
+ type, sock);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Return 1 if the current network (upstream) or CPU load is
+ * (far) too high, 0 if the load is ok.
+ */
+static int
+test_load_too_high ()
+{
+ return ((hardCPULimit > 0) &&
+ (GNUNET_cpu_get_load (ectx,
+ coreAPI->cfg) >= hardCPULimit)) ||
+ ((hardUpLimit > 0) &&
+ (GNUNET_network_monitor_get_load (coreAPI->load_monitor,
+ GNUNET_ND_UPLOAD) >= hardUpLimit));
+}
+
+/**
+ * Handle P2P query for content.
+ */
+static int
+handle_p2p_query (const GNUNET_PeerIdentity * sender,
+ const GNUNET_MessageHeader * msg)
+{
+ const P2P_gap_query_MESSAGE *req;
+ unsigned int query_count;
+ unsigned short size;
+ unsigned int bloomfilter_size;
+ int ttl;
+ unsigned int prio;
+ unsigned int type;
+ unsigned int netLoad;
+ enum GNUNET_FS_RoutingPolicy policy;
+ double preference;
+
+ if (stats != NULL)
+ stats->change (stat_gap_query_received, 1);
+ if (test_load_too_high ())
+ {
+#if DEBUG_GAP
+ if (sender != NULL)
+ {
+ IF_GELOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ GNUNET_hash_to_enc (&sender->hashPubKey, &enc));
+ }
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Dropping query from %s, this peer is too busy.\n",
+ sender == NULL ? "localhost" : (char *) &enc);
+#endif
+ if (stats != NULL)
+ stats->change (stat_gap_query_drop_busy, 1);
+ return GNUNET_OK;
+ }
+ size = ntohs (msg->size);
+ if (size < sizeof (P2P_gap_query_MESSAGE))
+ {
+ GNUNET_GE_BREAK_OP (ectx, 0);
+ return GNUNET_SYSERR; /* malformed query */
+ }
+ req = (const P2P_gap_query_MESSAGE *) msg;
+ query_count = ntohl (req->number_of_queries);
+ if ((query_count == 0) ||
+ (query_count > GNUNET_MAX_BUFFER_SIZE / sizeof (GNUNET_HashCode)) ||
+ (size <
+ sizeof (P2P_gap_query_MESSAGE) + (query_count -
+ 1) * sizeof (GNUNET_HashCode))
+ || (0 ==
+ memcmp (&req->returnTo, coreAPI->my_identity,
+ sizeof (GNUNET_PeerIdentity))))
+ {
+ GNUNET_GE_BREAK_OP (ectx, 0);
+ return GNUNET_SYSERR; /* malformed query */
+ }
+ bloomfilter_size =
+ size - (sizeof (P2P_gap_query_MESSAGE) +
+ (query_count - 1) * sizeof (GNUNET_HashCode));
+ GNUNET_GE_ASSERT (NULL, bloomfilter_size < size);
+ prio = ntohl (req->priority);
+ netLoad =
+ GNUNET_network_monitor_get_load (coreAPI->load_monitor, GNUNET_ND_UPLOAD);
+ if ((netLoad == (unsigned int) -1)
+ || (netLoad < GNUNET_GAP_IDLE_LOAD_THRESHOLD))
+ {
+ prio = 0; /* minimum priority, no charge! */
+ policy = GNUNET_FS_RoutingPolicy_ALL;
+ }
+ else
+ {
+ prio = -identity->changeHostTrust (sender, -prio);
+ if (netLoad < GNUNET_GAP_IDLE_LOAD_THRESHOLD + prio)
+ {
+ policy = GNUNET_FS_RoutingPolicy_ALL;
+ }
+ else if (netLoad < 90 + 10 * prio)
+ {
+ policy =
+ GNUNET_FS_RoutingPolicy_ANSWER | GNUNET_FS_RoutingPolicy_FORWARD;
+ }
+ else if (netLoad < 100)
+ {
+ policy = GNUNET_FS_RoutingPolicy_ANSWER;
+ }
+ else
+ {
+ if (stats != NULL)
+ stats->change (stat_gap_query_drop_busy, 1);
+ return GNUNET_OK; /* drop */
+ }
+ }
+ if ((policy & GNUNET_FS_RoutingPolicy_INDIRECT) == 0)
+ /* kill the priority (since we cannot benefit) */
+ prio = 0;
+ ttl = GNUNET_FS_HELPER_bound_ttl (ntohl (req->ttl), prio);
+ type = ntohl (req->type);
+ /* decrement ttl (always) */
+ if (ttl < 0)
+ {
+ ttl -= 2 * GNUNET_GAP_TTL_DECREMENT +
+ GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK,
+ GNUNET_GAP_TTL_DECREMENT);
+ if (ttl > 0)
+ /* integer underflow => drop (should be very rare)! */
+ return GNUNET_OK;
+ }
+ else
+ {
+ ttl -= 2 * GNUNET_GAP_TTL_DECREMENT +
+ GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK,
+ GNUNET_GAP_TTL_DECREMENT);
+ }
+ preference = (double) prio;
+ if (preference < GNUNET_GAP_QUERY_BANDWIDTH_VALUE)
+ preference = GNUNET_GAP_QUERY_BANDWIDTH_VALUE;
+ coreAPI->p2p_connection_preference_increase (sender, preference);
+ GNUNET_FS_GAP_execute_query (sender,
+ prio,
+ ntohl (req->priority),
+ policy,
+ ttl,
+ type,
+ query_count,
+ &req->queries[0],
+ ntohl (req->filter_mutator),
+ bloomfilter_size, &req->queries[query_count]);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Use content (forward to whoever sent the query).
+ * @param hostId the peer from where the content came,
+ * NULL for the local peer
+ */
+static int
+handle_p2p_content (const GNUNET_PeerIdentity * sender,
+ const GNUNET_MessageHeader * pmsg)
+{
+ const P2P_gap_reply_MESSAGE *msg;
+ const GNUNET_EC_DBlock *dblock;
+ GNUNET_DatastoreValue *value;
+ GNUNET_HashCode query;
+ unsigned short size;
+ unsigned int data_size;
+ unsigned int prio;
+ unsigned long long expiration;
+ double preference;
+ GNUNET_CronTime now;
+
+ size = ntohs (pmsg->size);
+ if (size < sizeof (P2P_gap_reply_MESSAGE))
+ {
+ GNUNET_GE_BREAK_OP (ectx, 0);
+ return GNUNET_SYSERR; /* invalid! */
+ }
+ msg = (const P2P_gap_reply_MESSAGE *) pmsg;
+ data_size = size - sizeof (P2P_gap_reply_MESSAGE);
+ dblock = (const GNUNET_EC_DBlock *) &msg[1];
+
+ expiration = GNUNET_ntohll (msg->expiration);
+ if ((expiration > GNUNET_GAP_MAX_MIGRATION_EXP_KSK) &&
+ (ntohl (dblock->type) == GNUNET_ECRS_BLOCKTYPE_KEYWORD))
+ return GNUNET_OK; /* expired KSK block -- ignore! */
+ if (GNUNET_OK !=
+ GNUNET_EC_file_block_check_and_get_query (data_size,
+ dblock, GNUNET_YES, &query))
+ {
+ GNUNET_GE_BREAK_OP (ectx, 0);
+ return GNUNET_SYSERR; /* invalid! */
+ }
+ if ((stats != NULL) && (sender != NULL))
+ stats->change (stat_gap_content_received, 1);
+ /* forward to other peers */
+ prio = GNUNET_FS_GAP_handle_response (sender,
+ &query,
+ expiration, data_size, dblock);
+ /* convert expiration to absolute time and bound properly for
+ storage in local datastore */
+ now = GNUNET_get_time ();
+ if (expiration > GNUNET_GAP_MAX_MIGRATION_EXP)
+ {
+ /* expired, sometime in the past */
+ expiration = now - 1;
+ }
+ else
+ {
+ /* expires in future, apply bounding! */
+ if (ntohl (dblock->type) == GNUNET_ECRS_BLOCKTYPE_KEYWORD)
+ expiration %= GNUNET_GAP_MAX_MIGRATION_EXP_KSK;
+ else
+ expiration %= GNUNET_GAP_MAX_MIGRATION_EXP;
+ expiration += now;
+ }
+ /* forward to local clients */
+ prio += GNUNET_DV_FS_QUERYMANAGER_handle_response (sender,
+ &query,
+ expiration,
+ data_size, dblock);
+ if ((sender != NULL) &&
+ (active_migration == GNUNET_YES) &&
+ ((prio > 0) || (!test_load_too_high ())))
+ {
+ /* consider storing in local datastore */
+ value = GNUNET_malloc (data_size + sizeof (GNUNET_DatastoreValue));
+ value->size = htonl (data_size + sizeof (GNUNET_DatastoreValue));
+ value->type = dblock->type;
+ value->priority = htonl (prio);
+ value->anonymity_level = htonl (1);
+ value->expiration_time = GNUNET_htonll (expiration);
+ memcpy (&value[1], dblock, data_size);
+ datastore->putUpdate (&query, value);
+ GNUNET_free (value);
+ }
+ if (sender != NULL)
+ { /* if we are the sender, sender will be NULL */
+ identity->changeHostTrust (sender, prio);
+ if (stats != NULL)
+ stats->change (stat_gap_trust_awarded, prio);
+ preference = (double) prio;
+ if (preference < GNUNET_GAP_CONTENT_BANDWIDTH_VALUE)
+ preference = GNUNET_GAP_CONTENT_BANDWIDTH_VALUE;
+ coreAPI->p2p_connection_preference_increase (sender, preference);
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Initialize the FS module. This method name must match
+ * the library name (libgnunet_XXX => initialize_XXX).
+ *
+ * @return GNUNET_SYSERR on errors
+ */
+int
+initialize_module_dv_fs (GNUNET_CoreAPIForPlugins * capi)
+{
+ ectx = capi->ectx;
+ coreAPI = capi;
+ GNUNET_GE_ASSERT (ectx, sizeof (GNUNET_EC_ContentHashKey) == 128);
+ GNUNET_GE_ASSERT (ectx, sizeof (GNUNET_EC_DBlock) == 4);
+ GNUNET_GE_ASSERT (ectx, sizeof (GNUNET_EC_IBlock) == 132);
+ GNUNET_GE_ASSERT (ectx, sizeof (GNUNET_EC_KBlock) == 524);
+ GNUNET_GE_ASSERT (ectx, sizeof (GNUNET_EC_SBlock) == 588);
+ GNUNET_GE_ASSERT (ectx, sizeof (GNUNET_EC_KSBlock) == 1116);
+
+ if ((-1 == GNUNET_GC_get_configuration_value_number (coreAPI->cfg, "LOAD",
"HARDCPULIMIT", 0, 100000, /* 1000 CPUs!? */
+ 0, /* 0 == no
limit */
+ &hardCPULimit)) || (-1
== GNUNET_GC_get_configuration_value_number (coreAPI->cfg, "LOAD",
"HARDUPLIMIT", 0, 999999999, 0, /* 0 == no limit */
+
&hardUpLimit)))
+ return GNUNET_SYSERR;
+ active_migration
+ = GNUNET_GC_get_configuration_value_yesno (coreAPI->cfg,
+ "DV_FS",
+ "ACTIVEMIGRATION", GNUNET_NO);
+ stats = capi->service_request ("stats");
+ if (stats != NULL)
+ {
+ stat_gap_query_received =
+ stats->create (gettext_noop ("# gap requests total received"));
+ stat_gap_query_drop_busy =
+ stats->create (gettext_noop ("# gap requests dropped due to load"));
+ stat_gap_content_received =
+ stats->create (gettext_noop ("# gap content total received"));
+ stat_gap_trust_awarded =
+ stats->create (gettext_noop ("# gap total trust awarded"));
+ }
+ identity = capi->service_request ("identity");
+ if (identity == NULL)
+ {
+ GNUNET_GE_BREAK (ectx, 0);
+ capi->service_release (stats);
+ return GNUNET_SYSERR;
+ }
+ datastore = capi->service_request ("datastore");
+ if (datastore == NULL)
+ {
+ capi->service_release (identity);
+ capi->service_release (stats);
+ GNUNET_GE_BREAK (ectx, 0);
+ return GNUNET_SYSERR;
+ }
+ GNUNET_FS_lock = capi->global_lock_get (); // GNUNET_mutex_create
(GNUNET_YES);
+ GNUNET_FS_ANONYMITY_init (capi);
+ GNUNET_FS_PLAN_init (capi);
+ GNUNET_FS_ONDEMAND_init (capi);
+ GNUNET_FS_PT_init (ectx, stats);
+ GNUNET_DV_FS_QUERYMANAGER_init (capi);
+ GNUNET_FS_DV_DHT_init (capi);
+ GNUNET_FS_GAP_init (capi);
+ GNUNET_FS_MIGRATION_init (capi);
+ GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ _
+ ("`%s' registering client handlers %d %d %d %d %d %d %d %d
and P2P handlers %d %d\n"),
+ "fs", GNUNET_CS_PROTO_GAP_QUERY_START,
+ GNUNET_CS_PROTO_GAP_QUERY_STOP,
+ GNUNET_CS_PROTO_GAP_INSERT,
+ GNUNET_CS_PROTO_GAP_INDEX, GNUNET_CS_PROTO_GAP_DELETE,
+ GNUNET_CS_PROTO_GAP_UNINDEX, GNUNET_CS_PROTO_GAP_TESTINDEX,
+ GNUNET_CS_PROTO_GAP_INIT_INDEX,
+ GNUNET_P2P_PROTO_GAP_QUERY, GNUNET_P2P_PROTO_GAP_RESULT);
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ capi->p2p_ciphertext_handler_register
+ (GNUNET_P2P_PROTO_GAP_QUERY, &handle_p2p_query));
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ capi->p2p_ciphertext_handler_register
+ (GNUNET_P2P_PROTO_GAP_RESULT, &handle_p2p_content));
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ capi->cs_handler_register
+ (GNUNET_CS_PROTO_GAP_QUERY_START,
+ &handle_cs_query_start_request));
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ capi->cs_handler_register
+ (GNUNET_CS_PROTO_GAP_QUERY_STOP,
+ &handle_cs_query_stop_request));
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ capi->cs_handler_register (GNUNET_CS_PROTO_GAP_INSERT,
+ &handle_cs_insert_request));
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ capi->cs_handler_register (GNUNET_CS_PROTO_GAP_INDEX,
+ &handle_cs_index_request));
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ capi->cs_handler_register (GNUNET_CS_PROTO_GAP_INIT_INDEX,
+ &handle_cs_init_index_request));
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ capi->cs_handler_register (GNUNET_CS_PROTO_GAP_DELETE,
+ &handle_cs_delete_request));
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ capi->cs_handler_register (GNUNET_CS_PROTO_GAP_UNINDEX,
+ &handle_cs_unindex_request));
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ capi->cs_handler_register (GNUNET_CS_PROTO_GAP_TESTINDEX,
+
&handle_cs_test_indexed_request));
+ GNUNET_GE_ASSERT (capi->ectx,
+ 0 == GNUNET_GC_set_configuration_value_string (capi->cfg,
+ capi->ectx,
+ "ABOUT",
+ "fs",
+ gettext_noop
+ ("enables
(anonymous) file-sharing")));
+ return GNUNET_OK;
+}
+
+void
+done_module_dv_fs ()
+{
+ GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "fs shutdown\n");
+
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ coreAPI->p2p_ciphertext_handler_unregister
+ (GNUNET_P2P_PROTO_GAP_QUERY, &handle_p2p_query));
+
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ coreAPI->p2p_ciphertext_handler_unregister
+ (GNUNET_P2P_PROTO_GAP_RESULT, &handle_p2p_content));
+
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ coreAPI->cs_handler_unregister
+ (GNUNET_CS_PROTO_GAP_QUERY_START,
+ &handle_cs_query_start_request));
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ coreAPI->cs_handler_unregister
+ (GNUNET_CS_PROTO_GAP_INSERT, &handle_cs_insert_request));
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ coreAPI->cs_handler_unregister (GNUNET_CS_PROTO_GAP_INDEX,
+ &handle_cs_index_request));
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ coreAPI->cs_handler_unregister
+ (GNUNET_CS_PROTO_GAP_INIT_INDEX,
+ &handle_cs_init_index_request));
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ coreAPI->cs_handler_unregister
+ (GNUNET_CS_PROTO_GAP_DELETE, &handle_cs_delete_request));
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ coreAPI->cs_handler_unregister
+ (GNUNET_CS_PROTO_GAP_UNINDEX,
+ &handle_cs_unindex_request));
+ GNUNET_GE_ASSERT (ectx,
+ GNUNET_SYSERR !=
+ coreAPI->cs_handler_unregister
+ (GNUNET_CS_PROTO_GAP_TESTINDEX,
+ &handle_cs_test_indexed_request));
+ GNUNET_FS_MIGRATION_done ();
+ GNUNET_FS_GAP_done ();
+ GNUNET_FS_DV_DHT_done ();
+ GNUNET_DV_FS_QUERYMANAGER_done ();
+ GNUNET_FS_ONDEMAND_done ();
+ GNUNET_FS_PLAN_done ();
+ GNUNET_FS_ANONYMITY_done ();
+ GNUNET_FS_PT_done ();
+ if (stats != NULL)
+ {
+ coreAPI->service_release (stats);
+ stats = NULL;
+ }
+ coreAPI->service_release (datastore);
+ datastore = NULL;
+ coreAPI->service_release (identity);
+ identity = NULL;
+ GNUNET_FS_lock = NULL;
+}
+
+
+/**
+ * Update FS.
+ */
+void
+update_module_dv_fs (GNUNET_UpdateAPI * uapi)
+{
+ uapi->service_update ("datastore");
+}
+
+
+/* end of dv_fs.c */
Added: GNUnet/src/applications/fs/gap/dv_querymanager.c
===================================================================
--- GNUnet/src/applications/fs/gap/dv_querymanager.c
(rev 0)
+++ GNUnet/src/applications/fs/gap/dv_querymanager.c 2009-10-01 18:10:27 UTC
(rev 9053)
@@ -0,0 +1,746 @@
+/*
+ This file is part of GNUnet
+ (C) 2001 - 2009 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/dv_querymanager.c
+ * @brief management of queries from our clients
+ * @author Christian Grothoff, Nathan Evans
+ *
+ * This code forwards queries (using GAP and DHT) to other peers and
+ * passes replies (from GAP or DHT) back to clients.
+ */
+
+#include "platform.h"
+#include "gnunet_protocols.h"
+#include "gnunet_stats_service.h"
+#include "querymanager.h"
+#include "fs.h"
+#include "fs_dv_dht.h"
+#include "gap.h"
+#include "plan.h"
+#include "pid_table.h"
+#include "shared.h"
+
+#define CHECK_REPEAT_FREQUENCY (150 * GNUNET_CRON_MILLISECONDS)
+
+/**
+ * Linked list with information for each client.
+ */
+struct ClientDataList
+{
+
+ /**
+ * This is a linked list.
+ */
+ struct ClientDataList *next;
+
+ /**
+ * For which client is this data kept?
+ */
+ struct GNUNET_ClientHandle *client;
+
+ /**
+ * List of active requests for the client.
+ */
+ struct RequestList *requests;
+
+ /**
+ * Tail of the requests list.
+ */
+ struct RequestList *request_tail;
+
+};
+
+/**
+ * List of all clients, their active requests and other
+ * per-client information.
+ */
+static struct ClientDataList *clients;
+
+static struct ClientDataList *clients_tail;
+
+static GNUNET_CoreAPIForPlugins *coreAPI;
+
+static GNUNET_Stats_ServiceAPI *stats;
+
+static GNUNET_Datastore_ServiceAPI *datastore;
+
+static int stat_gap_client_query_received;
+
+static int stat_gap_client_response_sent;
+
+static int stat_gap_client_query_tracked;
+
+static int stat_gap_client_query_injected;
+
+static int stat_gap_client_bf_updates;
+
+
+/**
+ * How many bytes should a bloomfilter be if
+ * we have already seen entry_count responses?
+ * Note that GNUNET_GAP_BLOOMFILTER_K gives us the
+ * number of bits set per entry. Furthermore,
+ * we should not re-size the filter too often
+ * (to keep it cheap).
+ *
+ * Since other peers will also add entries but
+ * not resize the filter, we should generally
+ * pick a slightly larger size than what the
+ * strict math would suggest.
+ *
+ * @return must be a power of two and smaller
+ * or equal to 2^15.
+ */
+static unsigned int
+compute_bloomfilter_size (unsigned int entry_count)
+{
+ unsigned short size;
+ unsigned short max = 1 << 15;
+ unsigned int ideal = (entry_count * GNUNET_GAP_BLOOMFILTER_K) / 4;
+
+ if (entry_count > max)
+ return max;
+ size = 8;
+ while ((size < max) && (size < ideal))
+ size *= 2;
+ return size;
+}
+
+static int
+mark_response_seen (const GNUNET_HashCode * key, void *value, void *cls)
+{
+ GNUNET_FS_SHARED_mark_response_seen (key, cls);
+ return GNUNET_OK;
+}
+
+/**
+ * A client is asking us to run a query. The query should be issued
+ * until either a unique response has been obtained or until the
+ * client disconnects.
+ *
+ * @param target peer known to have the content, maybe NULL.
+ */
+void
+GNUNET_DV_FS_QUERYMANAGER_start_query (const GNUNET_HashCode * query,
+ unsigned int key_count,
+ unsigned int anonymityLevel,
+ unsigned int type,
+ struct GNUNET_ClientHandle *client,
+ const GNUNET_PeerIdentity * target,
+ const struct GNUNET_MultiHashMap *seen,
+ int have_more)
+{
+ struct ClientDataList *cl;
+ struct RequestList *request;
+
+ GNUNET_GE_ASSERT (NULL, key_count > 0);
+ if (stats != NULL)
+ {
+ stats->change (stat_gap_client_query_tracked, 1);
+ stats->change (stat_gap_client_query_received, 1);
+ }
+ request =
+ GNUNET_malloc (sizeof (struct RequestList) +
+ (key_count - 1) * sizeof (GNUNET_HashCode));
+ memset (request, 0, sizeof (struct RequestList));
+ request->anonymityLevel = anonymityLevel;
+ request->key_count = key_count;
+ request->type = type;
+ request->primary_target = GNUNET_FS_PT_intern (target);
+ request->response_client = client;
+ request->policy = GNUNET_FS_RoutingPolicy_ALL;
+ if (have_more != GNUNET_NO)
+ request->have_more = GNUNET_GAP_HAVE_MORE_INCREMENT;
+ memcpy (&request->queries[0], query, sizeof (GNUNET_HashCode) * key_count);
+ if (seen != NULL)
+ {
+ request->bloomfilter_entry_count = GNUNET_multi_hash_map_size (seen);
+ request->bloomfilter_size =
+ compute_bloomfilter_size (request->bloomfilter_entry_count);
+ request->bloomfilter_mutator =
+ GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, -1);
+ request->bloomfilter =
+ GNUNET_bloomfilter_init (NULL, NULL, request->bloomfilter_size,
+ GNUNET_GAP_BLOOMFILTER_K);
+ if (stats != NULL)
+ stats->change (stat_gap_client_bf_updates, 1);
+
+ GNUNET_multi_hash_map_iterate (seen, &mark_response_seen, request);
+ }
+ GNUNET_mutex_lock (GNUNET_FS_lock);
+ cl = clients;
+ while ((cl != NULL) && (cl->client != client))
+ cl = cl->next;
+ if (cl == NULL)
+ {
+ cl = GNUNET_malloc (sizeof (struct ClientDataList));
+ memset (cl, 0, sizeof (struct ClientDataList));
+ cl->client = client;
+ cl->next = clients;
+ clients = cl;
+ if (clients_tail == NULL)
+ clients_tail = cl;
+ }
+ request->next = cl->requests;
+ cl->requests = request;
+ if (cl->request_tail == NULL)
+ cl->request_tail = request;
+ if ((GNUNET_YES == GNUNET_FS_PLAN_request (client, 0, request)) &&
+ (stats != NULL))
+ stats->change (stat_gap_client_query_injected, 1);
+ if (anonymityLevel == 0)
+ {
+ request->last_dht_get = GNUNET_get_time ();
+ request->dht_back_off = GNUNET_GAP_MAX_DHT_DELAY;
+ }
+ GNUNET_mutex_unlock (GNUNET_FS_lock);
+ if (anonymityLevel == 0)
+ GNUNET_FS_DV_DHT_execute_query (type, query);
+}
+
+/**
+ * A client is asking us to stop running a query (without disconnect).
+ */
+int
+GNUNET_DV_FS_QUERYMANAGER_stop_query (const GNUNET_HashCode * query,
+ unsigned int key_count,
+ unsigned int anonymityLevel,
+ unsigned int type,
+ struct GNUNET_ClientHandle *client)
+{
+ struct ClientDataList *cl;
+ struct ClientDataList *cprev;
+ struct RequestList *pos;
+ struct RequestList *rprev;
+
+ GNUNET_mutex_lock (GNUNET_FS_lock);
+ cl = clients;
+ cprev = NULL;
+ while ((cl != NULL) && (cl->client != client))
+ {
+ cprev = cl;
+ cl = cl->next;
+ }
+ if (cl == NULL)
+ {
+ GNUNET_mutex_unlock (GNUNET_FS_lock);
+ return GNUNET_SYSERR;
+ }
+ rprev = NULL;
+ pos = cl->requests;
+ while (pos != NULL)
+ {
+ if ((pos->type == type) &&
+ (pos->key_count == key_count) &&
+ (0 == memcmp (query,
+ &pos->queries[0],
+ sizeof (GNUNET_HashCode) * key_count)) &&
+ (pos->anonymityLevel == anonymityLevel))
+ break;
+ rprev = pos;
+ pos = pos->next;
+ }
+ if (pos == NULL)
+ {
+ GNUNET_mutex_unlock (GNUNET_FS_lock);
+ return GNUNET_SYSERR;
+ }
+ if (cl->request_tail == pos)
+ cl->request_tail = rprev;
+ if (rprev == NULL)
+ cl->requests = pos->next;
+ else
+ rprev->next = pos->next;
+ GNUNET_FS_SHARED_free_request_list (pos);
+ if (stats != NULL)
+ stats->change (stat_gap_client_query_tracked, -1);
+ if (cl->requests == NULL)
+ {
+ if (cl == clients_tail)
+ clients_tail = cprev;
+ if (cprev == NULL)
+ clients = cl->next;
+ else
+ cprev->next = cl->next;
+ GNUNET_free (cl);
+ }
+ GNUNET_mutex_unlock (GNUNET_FS_lock);
+ return GNUNET_OK;
+}
+
+struct IteratorClosure
+{
+ struct GNUNET_BloomFilter *filter;
+ int mingle_number;
+};
+
+/**
+ * Iterator over Map hash codes.
+ *
+ * @param arg pointer to a location where we
+ * have our current index into the linked list.
+ * @return GNUNET_YES if we have more,
+ * GNUNET_NO if this is the last entry
+ */
+static int
+response_bf_iterator (const GNUNET_HashCode * key, void *value, void *arg)
+{
+ struct IteratorClosure *cls = arg;
+ GNUNET_HashCode n;
+
+ GNUNET_FS_HELPER_mingle_hash (key, cls->mingle_number, &n);
+ GNUNET_bloomfilter_add (cls->filter, &n);
+ return GNUNET_YES;
+}
+
+/**
+ * We got a response for a client request.
+ * Check if we have seen this response already.
+ * If not, check if it truly matches (namespace!).
+ * If so, transmit to client and update response
+ * lists and bloomfilter accordingly.
+ *
+ * @param value how much is this response worth to us?
+ * the function should increment value accordingly
+ * @return GNUNET_OK if this was the last response
+ * and we should remove the request entry.
+ * GNUNET_NO if we should continue looking
+ * GNUNET_SYSERR on serious errors
+ */
+static int
+handle_response (PID_INDEX sender,
+ struct GNUNET_ClientHandle *client,
+ struct RequestList *rl,
+ const GNUNET_HashCode * primary_key,
+ GNUNET_CronTime expirationTime,
+ unsigned int size, const GNUNET_EC_DBlock * data,
+ unsigned int *value)
+{
+ struct IteratorClosure ic;
+ CS_fs_reply_content_MESSAGE *msg;
+ GNUNET_HashCode hc;
+ int ret;
+ unsigned int bf_size;
+
+ /* check that content matches query */
+ ret = GNUNET_FS_SHARED_test_valid_new_response (rl,
+ primary_key,
+ size, data, &hc);
+ if (ret != GNUNET_OK)
+ return ret;
+ if (sender == 0) /* dht produced response */
+ rl->dht_back_off = GNUNET_GAP_MAX_DHT_DELAY; /* go back! */
+ /* send to client */
+ msg = GNUNET_malloc (sizeof (CS_fs_reply_content_MESSAGE) + size);
+ msg->header.size = htons (sizeof (CS_fs_reply_content_MESSAGE) + size);
+ msg->header.type = htons (GNUNET_CS_PROTO_GAP_RESULT);
+ msg->anonymity_level = htonl (0); /* unknown */
+ msg->expiration_time = GNUNET_htonll (expirationTime);
+ memcpy (&msg[1], data, size);
+ ret = coreAPI->cs_send_message (client,
+ &msg->header,
+ (rl->type != GNUNET_ECRS_BLOCKTYPE_DATA)
+ ? GNUNET_NO : GNUNET_YES);
+ GNUNET_free (msg);
+ if (ret != GNUNET_OK)
+ return GNUNET_NO;
+ if (stats != NULL)
+ stats->change (stat_gap_client_response_sent, 1);
+
+ /* update *value */
+ *value += 1 + rl->value;
+ GNUNET_FS_PLAN_success (sender, client, 0, rl);
+
+ if (rl->type == GNUNET_ECRS_BLOCKTYPE_DATA)
+ return GNUNET_OK; /* the end */
+
+ /* update bloom filter */
+ rl->bloomfilter_entry_count++;
+ bf_size = compute_bloomfilter_size (rl->bloomfilter_entry_count);
+ if (rl->bloomfilter == NULL)
+ {
+ rl->bloomfilter_mutator
+ = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, -1);
+ rl->bloomfilter_size = bf_size;
+ rl->bloomfilter = GNUNET_bloomfilter_init (NULL,
+ NULL,
+ rl->bloomfilter_size,
+ GNUNET_GAP_BLOOMFILTER_K);
+ if (stats != NULL)
+ stats->change (stat_gap_client_bf_updates, 1);
+ }
+ else if (rl->bloomfilter_size != bf_size)
+ {
+ rl->bloomfilter_mutator
+ = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, -1);
+ GNUNET_bloomfilter_free (rl->bloomfilter);
+ rl->bloomfilter =
+ GNUNET_bloomfilter_init (NULL,
+ NULL, bf_size, GNUNET_GAP_BLOOMFILTER_K);
+ ic.filter = rl->bloomfilter;
+ ic.mingle_number = rl->bloomfilter_mutator;
+ if (rl->responses != NULL)
+ GNUNET_multi_hash_map_iterate (rl->responses,
+ &response_bf_iterator, &ic);
+ if (stats != NULL)
+ stats->change (stat_gap_client_bf_updates, 1);
+ }
+ GNUNET_FS_SHARED_mark_response_seen (&hc, rl);
+
+ /* we want more */
+ return GNUNET_NO;
+}
+
+/**
+ * Handle the given response (by forwarding it to
+ * other peers as necessary).
+ *
+ * @param sender who send the response (good too know
+ * for future routing decisions)
+ * @param primary_query hash code used for lookup
+ * (note that namespace membership may
+ * require additional verification that has
+ * not yet been performed; checking the
+ * signature has already been done)
+ * @param size size of the data
+ * @param data the data itself (a GNUNET_EC_DBlock)
+ * @return how much was this content worth to us?
+ */
+unsigned int
+GNUNET_DV_FS_QUERYMANAGER_handle_response (const GNUNET_PeerIdentity * sender,
+ const GNUNET_HashCode * primary_query,
+ GNUNET_CronTime expirationTime,
+ unsigned int size,
+ const GNUNET_EC_DBlock * data)
+{
+ struct ClientDataList *cl;
+ struct RequestList *rl;
+ struct RequestList *prev;
+ unsigned int value;
+ PID_INDEX rid;
+
+ rid = GNUNET_FS_PT_intern (sender);
+ GNUNET_mutex_lock (GNUNET_FS_lock);
+ value = 0;
+ cl = clients;
+ while (cl != NULL)
+ {
+ rl = cl->requests;
+ prev = NULL;
+ while (rl != NULL)
+ {
+ if (GNUNET_OK ==
+ handle_response (rid,
+ cl->client,
+ rl,
+ primary_query,
+ expirationTime, size, data, &value))
+ {
+ if (prev != NULL)
+ prev->next = rl->next;
+ else
+ cl->requests = rl->next;
+ if (rl == cl->request_tail)
+ cl->request_tail = prev;
+ GNUNET_FS_SHARED_free_request_list (rl);
+ if (stats != NULL)
+ stats->change (stat_gap_client_query_tracked, -1);
+ if (prev == NULL)
+ rl = cl->requests;
+ else
+ rl = prev->next;
+ }
+ else
+ {
+ prev = rl;
+ rl = rl->next;
+ }
+ }
+ cl = cl->next;
+ }
+
+ GNUNET_mutex_unlock (GNUNET_FS_lock);
+ GNUNET_FS_PT_change_rc (rid, -1);
+ return value;
+}
+
+/**
+ * Method called whenever a given client disconnects.
+ * Frees all of the associated data structures.
+ */
+static void
+handle_client_exit (struct GNUNET_ClientHandle *client)
+{
+ struct ClientDataList *cl;
+ struct ClientDataList *prev;
+ struct RequestList *rl;
+
+ GNUNET_mutex_lock (GNUNET_FS_lock);
+ cl = clients;
+ prev = NULL;
+ while ((cl != NULL) && (cl->client != client))
+ {
+ prev = cl;
+ cl = cl->next;
+ }
+ if (cl == clients_tail)
+ clients_tail = prev;
+ if (cl != NULL)
+ {
+ while (cl->requests != NULL)
+ {
+ rl = cl->requests;
+ cl->requests = rl->next;
+ GNUNET_FS_SHARED_free_request_list (rl);
+ if (stats != NULL)
+ stats->change (stat_gap_client_query_tracked, -1);
+ }
+ if (prev == NULL)
+ clients = cl->next;
+ else
+ prev->next = cl->next;
+ GNUNET_free (cl);
+ }
+ GNUNET_mutex_unlock (GNUNET_FS_lock);
+}
+
+
+struct HMClosure
+{
+ struct RequestList *request;
+ unsigned int processed;
+ int have_more;
+};
+
+/**
+ * Any response that we get should be passed
+ * back to the client. If the response is unique,
+ * we should about the iteration (return GNUNET_SYSERR).
+ */
+static int
+have_more_processor (const GNUNET_HashCode * key,
+ const GNUNET_DatastoreValue *
+ value, void *closure, unsigned long long uid)
+{
+ struct HMClosure *cls = closure;
+ GNUNET_HashCode hc;
+ int ret;
+
+ ret = GNUNET_FS_HELPER_send_to_client (coreAPI,
+ key, value,
+ cls->request->response_client,
+ cls->request, &hc);
+ if (ret != GNUNET_OK)
+ {
+ /* client can take no more right now */
+ cls->have_more = GNUNET_YES;
+ return ret; /* NO: delete, SYSERR: abort */
+ }
+ GNUNET_FS_SHARED_mark_response_seen (&hc, cls->request);
+ cls->processed++;
+ if (cls->processed > GNUNET_GAP_MAX_ASYNC_PROCESSED)
+ {
+ cls->have_more = GNUNET_YES;
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Cron-job to periodically check if we should
+ * repeat requests.
+ */
+static void
+repeat_requests_job (void *unused)
+{
+ struct HMClosure hmc;
+ struct ClientDataList *client;
+ struct RequestList *request;
+ struct RequestList *prev;
+ GNUNET_CronTime now;
+
+ GNUNET_mutex_lock (GNUNET_FS_lock);
+ if (clients == NULL)
+ {
+ GNUNET_mutex_unlock (GNUNET_FS_lock);
+ return;
+ }
+ now = GNUNET_get_time ();
+ client = clients;
+ if (clients_tail != client)
+ {
+ /* move client to tail of list */
+ GNUNET_GE_ASSERT (NULL, clients_tail->next == NULL);
+ clients = clients->next;
+ clients_tail->next = client;
+ clients_tail = client;
+ client->next = NULL;
+ }
+ request = client->requests;
+ if (request == NULL)
+ {
+ GNUNET_mutex_unlock (GNUNET_FS_lock);
+ return;
+ }
+ if (client->request_tail != request)
+ {
+ /* move request to tail of list */
+ GNUNET_GE_ASSERT (NULL, client->request_tail->next == NULL);
+ client->requests = request->next;
+ client->request_tail->next = request;
+ prev = client->request_tail;
+ client->request_tail = request;
+ request->next = NULL;
+ }
+ else
+ {
+ prev = NULL;
+ }
+ GNUNET_GE_ASSERT (NULL, request->next == NULL);
+ GNUNET_GE_ASSERT (NULL, client->request_tail->next == NULL);
+ if ((client->client != NULL) &&
+ (GNUNET_OK !=
+ coreAPI->cs_send_message_now_test (client->client,
+ GNUNET_GAP_ESTIMATED_DATA_SIZE,
+ GNUNET_NO)))
+ {
+ GNUNET_mutex_unlock (GNUNET_FS_lock);
+ return;
+ }
+ if (request->have_more > 0)
+ {
+ request->have_more--;
+ hmc.request = request;
+ hmc.processed = 0;
+ hmc.have_more = GNUNET_NO;
+
+ if (request->type == GNUNET_ECRS_BLOCKTYPE_DATA)
+ {
+ if (((1 == datastore->get (&request->queries[0], request->type,
+ &have_more_processor, &hmc)) ||
+ (1 == datastore->get (&request->queries[0],
+ GNUNET_ECRS_BLOCKTYPE_ONDEMAND,
+ &have_more_processor, &hmc))) &&
+ (hmc.have_more == GNUNET_NO))
+ {
+ if (prev == NULL)
+ {
+ client->request_tail = NULL;
+ client->requests = NULL;
+ }
+ else
+ {
+ prev->next = NULL;
+ if (client->request_tail == request)
+ client->request_tail = prev;
+ }
+ GNUNET_FS_SHARED_free_request_list (request);
+ if (stats != NULL)
+ stats->change (stat_gap_client_query_tracked, -1);
+ }
+ }
+ else
+ {
+ datastore->get (&request->queries[0], request->type,
+ &have_more_processor, &hmc);
+ }
+ if (hmc.have_more)
+ request->have_more += GNUNET_GAP_HAVE_MORE_INCREMENT;
+ }
+ else
+ {
+ if ((NULL == request->plan_entries) &&
+ ((client->client != NULL) ||
+ (request->expiration > now)) &&
+ (request->last_ttl_used * GNUNET_CRON_SECONDS +
+ request->last_request_time < now))
+ {
+ if ((GNUNET_OK ==
+ GNUNET_FS_PLAN_request (client->client, 0, request))
+ && (stats != NULL))
+ stats->change (stat_gap_client_query_injected, 1);
+ }
+
+ if ((request->anonymityLevel == 0) &&
+ (request->last_dht_get + request->dht_back_off < now))
+ {
+ if (request->dht_back_off * 2 > request->dht_back_off)
+ request->dht_back_off *= 2;
+ request->last_dht_get = now;
+ GNUNET_FS_DV_DHT_execute_query (request->type, &request->queries[0]);
+ }
+ }
+ GNUNET_mutex_unlock (GNUNET_FS_lock);
+}
+
+int
+GNUNET_DV_FS_QUERYMANAGER_init (GNUNET_CoreAPIForPlugins * capi)
+{
+ coreAPI = capi;
+ GNUNET_GE_ASSERT (capi->ectx,
+ GNUNET_SYSERR !=
+ capi->cs_disconnect_handler_register
+ (&handle_client_exit));
+ datastore = capi->service_request ("datastore");
+ stats = capi->service_request ("stats");
+ if (stats != NULL)
+ {
+ stat_gap_client_query_received =
+ stats->create (gettext_noop ("# gap client queries received"));
+ stat_gap_client_response_sent =
+ stats->create (gettext_noop ("# gap replies sent to clients"));
+ stat_gap_client_query_tracked =
+ stats->create (gettext_noop ("# gap client requests tracked"));
+ stat_gap_client_query_injected =
+ stats->create (gettext_noop ("# gap client requests injected"));
+ stat_gap_client_bf_updates =
+ stats->create (gettext_noop
+ ("# gap query bloomfilter resizing updates"));
+ }
+ GNUNET_cron_add_job (capi->cron,
+ &repeat_requests_job,
+ CHECK_REPEAT_FREQUENCY, CHECK_REPEAT_FREQUENCY, NULL);
+ return 0;
+}
+
+int
+GNUNET_DV_FS_QUERYMANAGER_done ()
+{
+ GNUNET_cron_del_job (coreAPI->cron,
+ &repeat_requests_job, CHECK_REPEAT_FREQUENCY, NULL);
+ GNUNET_GE_ASSERT (coreAPI->ectx,
+ GNUNET_SYSERR !=
+ coreAPI->cs_disconnect_handler_unregister
+ (&handle_client_exit));
+ while (clients != NULL)
+ handle_client_exit (clients->client);
+ coreAPI->service_release (datastore);
+ datastore = NULL;
+ if (stats != NULL)
+ {
+ coreAPI->service_release (stats);
+ stats = NULL;
+ }
+ return 0;
+}
+
+/* end of dv_querymanager.c */
Added: GNUnet/src/applications/fs/gap/dv_querymanager.h
===================================================================
--- GNUnet/src/applications/fs/gap/dv_querymanager.h
(rev 0)
+++ GNUnet/src/applications/fs/gap/dv_querymanager.h 2009-10-01 18:10:27 UTC
(rev 9053)
@@ -0,0 +1,90 @@
+/*
+ This file is part of GNUnet
+ (C) 2001, 2002, 2003, 2004, 2005, 2006, 2008 Christian Grothoff (and
other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/dv_querymanager.h
+ * @brief management of queries from our clients
+ * @author Christian Grothoff, Nathan Evans
+ */
+#ifndef DV_QUERYMANAGER_H
+#define DV_QUERYMANAGER_H
+
+#include "gnunet_util.h"
+#include "gnunet_core.h"
+#include "ecrs_core.h"
+#include "shared.h"
+
+int GNUNET_DV_FS_QUERYMANAGER_init (GNUNET_CoreAPIForPlugins * capi);
+
+int GNUNET_DV_FS_QUERYMANAGER_done (void);
+
+
+/**
+ * A client is asking us to run a query. The query should be issued
+ * until either a unique response has been obtained, the client
+ * requests us to stop or until the client disconnects.
+ *
+ * @param target peer known to have the content, maybe NULL.
+ * @param have_more do we have more results in our local datastore?
+ */
+void
+GNUNET_DV_FS_QUERYMANAGER_start_query (const GNUNET_HashCode * query,
+ unsigned int key_count,
+ unsigned int anonymityLevel,
+ unsigned int type,
+ struct GNUNET_ClientHandle *client,
+ const GNUNET_PeerIdentity * target,
+ const struct GNUNET_MultiHashMap *seen,
+ int have_more);
+
+/**
+ * A client is asking us to stop running a query (without disconnect).
+ */
+int
+GNUNET_DV_FS_QUERYMANAGER_stop_query (const GNUNET_HashCode * query,
+ unsigned int key_count,
+ unsigned int anonymityLevel,
+ unsigned int type,
+ struct GNUNET_ClientHandle *client);
+
+/**
+ * Handle the given response (by forwarding it to
+ * other peers as necessary).
+ *
+ * @param sender who send the response (good too know
+ * for future routing decisions)
+ * @param primary_query hash code used for lookup
+ * (note that namespace membership may
+ * require additional verification that has
+ * not yet been performed; checking the
+ * signature has already been done)
+ * @param size size of the data
+ * @param data the data itself (a GNUNET_EC_DBlock)
+ * @return how much was this content worth to us?
+ */
+unsigned int
+GNUNET_DV_FS_QUERYMANAGER_handle_response (const GNUNET_PeerIdentity * sender,
+ const GNUNET_HashCode * primary_query,
+ GNUNET_CronTime expirationTime,
+ unsigned int size,
+ const GNUNET_EC_DBlock * data);
+
+
+#endif
Added: GNUnet/src/applications/fs/gap/fs_dv_dht.c
===================================================================
--- GNUnet/src/applications/fs/gap/fs_dv_dht.c (rev 0)
+++ GNUnet/src/applications/fs/gap/fs_dv_dht.c 2009-10-01 18:10:27 UTC (rev
9053)
@@ -0,0 +1,291 @@
+/*
+ This file is part of GNUnet
+ (C) 2001 - 2009 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/fs_dv_dht.c
+ * @brief integration of file-sharing with the DHT
+ * infrastructure
+ * @author Christian Grothoff, Nathan Evans
+ */
+
+#include "platform.h"
+#include "gnunet_dv_dht_service.h"
+#include "gnunet_sqstore_service.h"
+#include "gnunet_stats_service.h"
+#include "gnunet_protocols.h"
+#include "ecrs_core.h"
+#include "fs.h"
+#include "shared.h"
+#include "fs_dv_dht.h"
+#include "dv_querymanager.h"
+
+/**
+ * Linked list containing the DHT get handles
+ * of our active requests.
+ */
+struct ActiveRequestRecords
+{
+
+ struct ActiveRequestRecords *next;
+
+ struct GNUNET_DV_DHT_GetHandle *handle;
+
+ GNUNET_CronTime end_time;
+
+ unsigned int type;
+
+};
+
+static GNUNET_DV_DHT_ServiceAPI *dv_dht;
+
+static GNUNET_SQstore_ServiceAPI *sqstore;
+
+static GNUNET_CoreAPIForPlugins *coreAPI;
+
+static GNUNET_Stats_ServiceAPI *stats;
+
+static int stat_push_count;
+
+static struct ActiveRequestRecords *records;
+
+/**
+ * Thread that does the pushing.
+ */
+static struct GNUNET_ThreadHandle *thread;
+
+/**
+ * Should the thread terminate?
+ */
+static int shutdown_requested;
+
+/**
+ * Total number of entries with anonymity 0.
+ * Used to calculate how long we should wait
+ * between iterations.
+ */
+static unsigned int total;
+
+
+/**
+ * Cancel all requests with the DHT that
+ * are older than a certain time limit.
+ */
+static void
+purge_old_records (GNUNET_CronTime limit)
+{
+ struct ActiveRequestRecords *pos;
+ struct ActiveRequestRecords *prev;
+
+ prev = NULL;
+ pos = records;
+ while (pos != NULL)
+ {
+ if (pos->end_time < limit)
+ {
+ if (prev == NULL)
+ records = pos->next;
+ else
+ prev->next = pos->next;
+ dv_dht->get_stop (pos->handle);
+ GNUNET_free (pos);
+ if (prev == NULL)
+ pos = records;
+ else
+ pos = prev->next;
+ }
+ else
+ {
+ prev = pos;
+ pos = pos->next;
+ }
+ }
+}
+
+
+/**
+ * We got a result from the DHT. Check that it is valid
+ * and pass to our clients.
+ *
+ * @param key the current key
+ * @param value the current value
+ * @param cls argument passed for context (closure)
+ * @return GNUNET_OK to continue with iteration, GNUNET_SYSERR to abort
+ */
+static int
+response_callback (const GNUNET_HashCode * key,
+ unsigned int type,
+ unsigned int size, const char *value, void *cls)
+{
+ struct ActiveRequestRecords *record = cls;
+ const GNUNET_EC_DBlock *dblock;
+ GNUNET_HashCode hc;
+
+ dblock = (const GNUNET_EC_DBlock *) value;
+ if ((GNUNET_SYSERR ==
+ GNUNET_EC_file_block_check_and_get_query (size,
+ dblock,
+ GNUNET_YES,
+ &hc)) ||
+ (0 != memcmp (key, &hc, sizeof (GNUNET_HashCode))))
+ {
+ GNUNET_GE_BREAK_OP (NULL, 0);
+ return GNUNET_OK;
+ }
+ GNUNET_DV_FS_QUERYMANAGER_handle_response (NULL, &hc, 0, size, dblock);
+ if (record->type == GNUNET_ECRS_BLOCKTYPE_DATA)
+ {
+ record->end_time = 0; /* delete ASAP */
+ return GNUNET_SYSERR; /* no more! */
+ }
+ return GNUNET_OK;
+}
+
+/**
+ * Execute a GAP query. Determines where to forward
+ * the query and when (and captures state for the response).
+ * May also have to check the local datastore.
+ *
+ * @param type type of content requested
+ * @param query hash code of the query
+ */
+void
+GNUNET_FS_DV_DHT_execute_query (unsigned int type, const GNUNET_HashCode *
query)
+{
+ struct ActiveRequestRecords *record;
+ GNUNET_CronTime now;
+
+ if (dv_dht == NULL)
+ return;
+ now = GNUNET_get_time ();
+ record = GNUNET_malloc (sizeof (struct ActiveRequestRecords));
+ record->end_time = now + GNUNET_GAP_MAX_DHT_DELAY;
+ record->type = type;
+ record->handle = dv_dht->get_start (type, query, &response_callback, record);
+ if (record->handle == NULL)
+ {
+ GNUNET_free (record);
+ return; /* failed in DHT */
+ }
+ GNUNET_mutex_lock (GNUNET_FS_lock);
+ record->next = records;
+ records = record;
+ purge_old_records (now);
+ GNUNET_mutex_unlock (GNUNET_FS_lock);
+}
+
+/**
+ * Callback invoked on zero-anonymity content
+ * (used to push that content into the DHT).
+ */
+static int
+push_callback (const GNUNET_HashCode * key,
+ const GNUNET_DatastoreValue * value, void *closure,
+ unsigned long long uid)
+{
+ GNUNET_CronTime delay;
+
+ if (GNUNET_YES == shutdown_requested)
+ return GNUNET_SYSERR;
+ /* try pushing out everything every 6h,
+ but do not push more often than every 5s */
+ delay = 6 * GNUNET_CRON_HOURS / total;
+ if (delay < 5 * GNUNET_CRON_SECONDS)
+ delay = 5 * GNUNET_CRON_SECONDS;
+ if (delay > 60 * GNUNET_CRON_SECONDS)
+ delay = 60 * GNUNET_CRON_SECONDS;
+ GNUNET_thread_sleep (delay);
+ if (GNUNET_YES == shutdown_requested)
+ return GNUNET_SYSERR;
+
+ dv_dht->put (key,
+ ntohl (value->type),
+ ntohl (value->size) - sizeof (GNUNET_DatastoreValue),
+ (const char *) &value[1]);
+ if (stats != NULL)
+ stats->change (stat_push_count, 1);
+ if (GNUNET_YES == shutdown_requested)
+ return GNUNET_SYSERR;
+ return GNUNET_OK;
+}
+
+/**
+ * Main method of the thread responsible for pushing
+ * out the content.
+ */
+static void *
+push_thread (void *cls)
+{
+ while ((shutdown_requested == GNUNET_NO) &&
+ (dv_dht != NULL) && (sqstore != NULL))
+ {
+ if (total == 0)
+ total = 1;
+ total = sqstore->iterateNonAnonymous (0, &push_callback, NULL);
+ if ((shutdown_requested == GNUNET_NO) && (total == 0))
+ GNUNET_thread_sleep (5 * GNUNET_CRON_MINUTES);
+ }
+ return NULL;
+}
+
+
+int
+GNUNET_FS_DV_DHT_init (GNUNET_CoreAPIForPlugins * capi)
+{
+ coreAPI = capi;
+ dv_dht = capi->service_request ("dv_dht");
+ sqstore = capi->service_request ("sqstore");
+ stats = capi->service_request ("stats");
+ if (stats != NULL)
+ stat_push_count
+ = stats->create (gettext_noop ("# blocks pushed into DHT"));
+ if ((dv_dht != NULL) && (sqstore != NULL))
+ {
+ shutdown_requested = GNUNET_NO;
+ thread = GNUNET_thread_create (&push_thread, NULL, 1024 * 128);
+ }
+ return 0;
+}
+
+int
+GNUNET_FS_DV_DHT_done ()
+{
+ void *unused;
+
+ purge_old_records (-1);
+ if (thread != NULL)
+ {
+ shutdown_requested = GNUNET_YES;
+ GNUNET_thread_stop_sleep (thread);
+ GNUNET_thread_join (thread, &unused);
+ }
+ if (stats != NULL)
+ {
+ coreAPI->service_release (stats);
+ stats = NULL;
+ }
+ if (dv_dht != NULL)
+ coreAPI->service_release (dv_dht);
+ dv_dht = NULL;
+ if (sqstore != NULL)
+ coreAPI->service_release (sqstore);
+ sqstore = NULL;
+ coreAPI = NULL;
+ return 0;
+}
Added: GNUnet/src/applications/fs/gap/fs_dv_dht.h
===================================================================
--- GNUnet/src/applications/fs/gap/fs_dv_dht.h (rev 0)
+++ GNUnet/src/applications/fs/gap/fs_dv_dht.h 2009-10-01 18:10:27 UTC (rev
9053)
@@ -0,0 +1,48 @@
+/*
+ This file is part of GNUnet
+ (C) 2001 - 2009 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/fs_dv_dht.h
+ * @brief integration of file-sharing with the DV_DHT
+ * infrastructure
+ * @author Christian Grothoff, Nathan Evans
+ */
+#ifndef FS_DV_DHT_H
+#define FS_DV_DHT_H
+
+#include "gnunet_util.h"
+
+int GNUNET_FS_DV_DHT_init (GNUNET_CoreAPIForPlugins * capi);
+
+int GNUNET_FS_DV_DHT_done (void);
+
+/**
+ * Execute a GAP query. Determines where to forward
+ * the query and when (and captures state for the response).
+ * May also have to check the local datastore.
+ *
+ * @param type type of content requested
+ * @param query hash code of the query
+ */
+void
+GNUNET_FS_DV_DHT_execute_query (unsigned int type,
+ const GNUNET_HashCode * query);
+
+#endif
Added: GNUnet/src/applications/fs/gap/test_multi_results_dv.c
===================================================================
--- GNUnet/src/applications/fs/gap/test_multi_results_dv.c
(rev 0)
+++ GNUnet/src/applications/fs/gap/test_multi_results_dv.c 2009-10-01
18:10:27 UTC (rev 9053)
@@ -0,0 +1,227 @@
+/*
+ This file is part of GNUnet.
+ (C) 2005, 2006, 2007, 2008 Christian Grothoff (and other contributing
authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+*/
+
+/**
+ * @file applications/gap/test_multi_results.c
+ * @brief GAP routing testcase, linear topology
+ * @author Christian Grothoff
+ */
+
+#include "platform.h"
+#include "gnunet_protocols.h"
+#include "gnunet_ecrs_lib.h"
+#include "gnunet_testing_lib.h"
+#include "gnunet_stats_lib.h"
+#include "gnunet_util.h"
+#include "gnunet_stats_lib.h"
+#include "../ecrs/ecrs.h"
+
+#define START_PEERS 1
+#define DEBUG 1
+#define PEER_COUNT 2
+
+/**
+ * How many search results are there?
+ */
+#define TOTAL 40
+
+static struct GNUNET_GE_Context *ectx;
+
+static struct GNUNET_GC_Configuration *cfg;
+
+static struct GNUNET_ECRS_URI *uris[TOTAL];
+
+static struct GNUNET_ECRS_URI *key;
+
+static unsigned int found;
+
+static int
+testTerminate (void *unused)
+{
+ /* wait for us to find 90% */
+ return (found > (TOTAL * 90) / 100) ? GNUNET_SYSERR : GNUNET_OK;
+}
+
+static char *
+makeName (unsigned int i)
+{
+ char *fn;
+
+ fn = GNUNET_malloc (strlen ("/tmp/gaptest/GAPTEST") + 14);
+ GNUNET_snprintf (fn,
+ strlen ("/tmp/gaptest/GAPTEST") + 14,
+ "/tmp/gaptest/GAPTEST%u", i);
+ GNUNET_disk_directory_create_for_file (NULL, fn);
+ return fn;
+}
+
+static struct GNUNET_ECRS_URI *
+uploadFile (int size)
+{
+ int ret;
+ char *name;
+ int fd;
+ char *buf;
+ struct GNUNET_ECRS_URI *uri;
+
+ name = makeName (size);
+ fd =
+ GNUNET_disk_file_open (ectx, name, O_WRONLY | O_CREAT, S_IWUSR | S_IRUSR);
+ if (fd == -1)
+ {
+ GNUNET_free (name);
+ return NULL;
+ }
+ buf = GNUNET_malloc (size);
+ memset (buf, size % 255, size);
+ WRITE (fd, buf, size);
+ GNUNET_free (buf);
+ GNUNET_disk_file_close (ectx, name, fd);
+ ret = GNUNET_ECRS_file_upload (ectx, cfg, name, GNUNET_YES, /* index */
+ 1, /* anonymous */
+ 0, /* priority */
+ GNUNET_get_time () + 100 *
GNUNET_CRON_MINUTES, /* expire */
+ NULL, NULL, &testTerminate, NULL, &uri);
+ if (ret != GNUNET_SYSERR)
+ {
+ struct GNUNET_MetaData *meta;
+
+ meta = GNUNET_meta_data_create ();
+ ret = GNUNET_ECRS_publish_under_keyword (ectx, cfg, key, 0, 0,
GNUNET_get_time () + 100 * GNUNET_CRON_MINUTES, /* expire */
+ uri, meta);
+ GNUNET_meta_data_destroy (meta);
+ GNUNET_free (name);
+ if (ret == GNUNET_OK)
+ return uri;
+ GNUNET_ECRS_uri_destroy (uri);
+ return NULL;
+ }
+ else
+ {
+ GNUNET_ECRS_uri_destroy (uri);
+ GNUNET_free (name);
+ return NULL;
+ }
+}
+
+static int
+searchCB (const GNUNET_ECRS_FileInfo * fi,
+ const GNUNET_HashCode * key, int isRoot, void *closure)
+{
+ int i;
+
+#if DEBUG
+ if (fi->uri->type == loc)
+ fprintf(stdout, "Got location information from search, great!\n");
+#endif
+
+ for (i = 0; i < TOTAL; i++)
+ {
+ if ((uris[i] != NULL) &&
+ (GNUNET_ECRS_uri_test_equal (uris[i], fi->uri)))
+ {
+ GNUNET_ECRS_uri_destroy (uris[i]);
+ uris[i] = NULL;
+ found++;
+ fprintf (stderr, ".");
+ return GNUNET_OK;
+ }
+ }
+ return GNUNET_OK;
+}
+
+#define CHECK(a) if (!(a)) { ret = 1; GNUNET_GE_BREAK(ectx, 0); goto FAILURE; }
+
+/**
+ * Testcase to test gap routing (2 peers only).
+ * @return 0: ok, -1: error
+ */
+int
+main (int argc, char **argv)
+{
+#if START_PEERS
+ struct GNUNET_TESTING_DaemonContext *peers;
+#endif
+ int ret;
+ int i;
+ char buf[128];
+ char *tmp;
+ ret = 0;
+ cfg = GNUNET_GC_create ();
+ if (-1 == GNUNET_GC_parse_configuration (cfg, "check.conf"))
+ {
+ GNUNET_GC_free (cfg);
+ return -1;
+ }
+#if START_PEERS
+ peers = GNUNET_TESTING_start_daemons ("tcp",
+ "advertising topology dv_fs stats",
+ "/tmp/gap-dv-multi-results-test",
+ 2087, 10, PEER_COUNT);
+ if (peers == NULL)
+ {
+ fprintf (stderr, "Failed to start the gnunetd daemons!\n");
+ GNUNET_GC_free (cfg);
+ return -1;
+ }
+#endif
+ for (i = 1; i < PEER_COUNT; i++)
+ {
+ if (GNUNET_OK != GNUNET_TESTING_connect_daemons (2077 + (10 * i),
+ 2087 + (10 * i)))
+ {
+#if START_PEERS
+ GNUNET_TESTING_stop_daemons (peers);
+#endif
+ fprintf (stderr, "Failed to connect the peers!\n");
+ GNUNET_GC_free (cfg);
+ return -1;
+ }
+ }
+ key = GNUNET_ECRS_keyword_string_to_uri (NULL, "GAPTEST");
+ fprintf (stderr, "Uploading...");
+ for (i = 0; i < TOTAL; i++)
+ {
+ uris[i] = uploadFile (i + 1);
+ CHECK (uris[i] != NULL);
+ tmp = GNUNET_ECRS_uri_to_string(uris[i]);
+ //fprintf (stderr, "URI is %s\n", tmp);
+ GNUNET_free(tmp);
+ }
+ GNUNET_thread_sleep(360 * GNUNET_CRON_SECONDS);
+ fprintf (stderr, "\nSearching...");
+ GNUNET_snprintf (buf, 128, "localhost:%u", 2077 + PEER_COUNT * 10);
+ GNUNET_GC_set_configuration_value_string (cfg, ectx, "NETWORK", "HOST",
+ buf);
+
+ GNUNET_ECRS_search (ectx,
+ cfg, key, 0, &searchCB, NULL, &testTerminate, NULL);
+ fprintf (stderr, "\n");
+ CHECK (found > (TOTAL * 90) / 100);
+FAILURE:
+#if START_PEERS
+ GNUNET_TESTING_stop_daemons (peers);
+#endif
+ GNUNET_ECRS_uri_destroy (key);
+ GNUNET_GC_free (cfg);
+ return ret;
+}
+
+/* end of test_multi_results_dv.c */
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r9053 - GNUnet/src/applications/fs/gap,
gnunet <=