[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r4062 - in GNUnet: . src/applications/dht/tools src/include
From: |
grothoff |
Subject: |
[GNUnet-SVN] r4062 - in GNUnet: . src/applications/dht/tools src/include |
Date: |
Tue, 26 Dec 2006 22:30:29 -0800 (PST) |
Author: grothoff
Date: 2006-12-26 22:30:26 -0800 (Tue, 26 Dec 2006)
New Revision: 4062
Modified:
GNUnet/src/applications/dht/tools/dht_api.c
GNUnet/src/include/dht.h
GNUnet/todo
Log:
finishing DHT tools API
Modified: GNUnet/src/applications/dht/tools/dht_api.c
===================================================================
--- GNUnet/src/applications/dht/tools/dht_api.c 2006-12-27 06:00:37 UTC (rev
4061)
+++ GNUnet/src/applications/dht/tools/dht_api.c 2006-12-27 06:30:26 UTC (rev
4062)
@@ -30,7 +30,90 @@
#include "gnunet_dht_lib.h"
#include "gnunet_util_network_client.h"
+/**
+ * Data exchanged between main thread and GET thread: the main
+ * thread fills this in before starting poll_thread, and both
+ * threads read and update it while the GET is in progress.
+ */
+typedef struct {
+ /**
+ * Connection with gnunetd; poll_thread reads the replies
+ * from this connection.
+ */
+ struct ClientServerConnection * sock;
+
+ /**
+ * Callback to call for each result.  May be NULL, in which
+ * case results are only counted.
+ */
+ DataProcessor processor;
+
+ /**
+ * Extra argument for processor (passed as its last argument).
+ */
+ void * closure;
+
+ /**
+ * Parent thread that is waiting for the
+ * timeout (used to notify if we are exiting
+ * early, i.e. because of gnunetd closing the
+ * connection or the processor callback requesting
+ * it).  poll_thread calls PTHREAD_STOP_SLEEP on it
+ * just before returning.
+ */
+ struct PTHREAD * parent;
+
+ /**
+ * Are we done (for whichever reason)?  NO while polling
+ * should continue, YES once either thread wants to stop.
+ * NOTE(review): this is a plain int accessed from both
+ * threads; no synchronization is visible here -- confirm
+ * that this is acceptable.
+ */
+ int aborted;
+
+ /**
+ * Total number of results obtained, or -1 on error
+ * (poll_thread stores SYSERR when it sees an invalid reply).
+ */
+ int total;
+} GetInfo;
+
+
+/**
+ * Body of the GET helper thread.  Reads replies from gnunetd until
+ * the operation is aborted, the connection is no longer open, a
+ * read fails, or an invalid reply arrives.  Each valid reply is
+ * handed to the processor callback (if any) and counted in
+ * info->total.  Before returning, the thread marks the operation
+ * as aborted and interrupts the sleep of the parent thread.
+ *
+ * @param cls the shared GetInfo structure
+ * @return always NULL
+ */
+static void *
+poll_thread(void * cls) {
+  GetInfo * info = cls;
+  MESSAGE_HEADER * reply;
+  CS_dht_request_put_MESSAGE * put;
+  DataContainer * cont;
+  unsigned short size;
+
+  while (info->aborted == NO) {
+    if (connection_test_open(info->sock) == 0)
+      break;
+    reply = NULL;
+    if (OK != connection_read(info->sock,
+                              &reply))
+      break;
+    if ( (sizeof(CS_dht_request_put_MESSAGE) > ntohs(reply->size)) ||
+         (CS_PROTO_dht_REQUEST_PUT != ntohs(reply->type)) ) {
+      GE_BREAK(NULL, 0);
+      info->total = SYSERR;
+      /* the message was read successfully, so it must be released
+         here as well; otherwise every invalid reply is leaked */
+      FREE(reply);
+      break; /* invalid reply */
+    }
+
+    put = (CS_dht_request_put_MESSAGE*) reply;
+    /* re-use "expire" field of the reply (which is 0 anyway)
+       for the header of DataContainer (which fits) to avoid
+       copying -- go C pointer arithmetic! */
+    cont = (DataContainer*) &((char *) &put[1])[-sizeof(DataContainer)];
+    size = ntohs(reply->size) - sizeof(CS_dht_request_put_MESSAGE);
+    cont->size = htonl(size + sizeof(DataContainer));
+    /* a processor result other than OK ends the loop after
+       this reply has been counted */
+    if ( (info->processor != NULL) &&
+         (OK != info->processor(&put->key,
+                                cont,
+                                info->closure)) )
+      info->aborted = YES;
+    info->total++;
+    FREE(reply);
+  }
+  /* tell the parent that we are done and wake it up early */
+  info->aborted = YES;
+  PTHREAD_STOP_SLEEP(info->parent);
+  return NULL;
+}
+
+
/**
* Perform a synchronous GET operation on the DHT identified by
* 'table' using 'key' as the key; store the result in 'result'. If
@@ -59,7 +142,12 @@
void * closure) {
struct ClientServerConnection * sock;
CS_dht_request_get_MESSAGE req;
- int ret;
+ struct PTHREAD * thread;
+ cron_t start;
+ cron_t now;
+ cron_t delta;
+ GetInfo info;
+ void * unused;
sock = client_connection_create(ectx,
cfg);
@@ -75,50 +163,31 @@
connection_destroy(sock);
return SYSERR;
}
-#if 0
- while (1) {
- reply = NULL;
- if (OK != connection_read(sock,
- &reply)) {
- connection_destroy(sock);
- return SYSERR;
- }
- if ( (sizeof(CS_dht_reply_ack_MESSAGE) == ntohs(reply->size)) &&
- (CS_PROTO_dht_REPLY_ACK == ntohs(reply->type)) ) {
- connection_destroy(sock);
- ret = checkACK(reply);
- FREE(reply);
- break; /* termination message, end loop! */
- }
- if ( (sizeof(CS_dht_reply_results_MESSAGE) > ntohs(reply->size)) ||
- (CS_PROTO_dht_REPLY_GET != ntohs(reply->type)) ) {
- GE_LOG(ectx,
- GE_WARNING | GE_BULK | GE_USER,
- _("Unexpected reply to `%s' operation.\n"),
- "GET");
- connection_destroy(sock);
- FREE(reply);
- return SYSERR;
- }
- /* ok, we got some replies! */
- res = (CS_dht_reply_results_MESSAGE*) reply;
- ret = ntohl(res->totalResults);
-
- size = ntohs(reply->size) - sizeof(CS_dht_reply_results_MESSAGE);
- result = MALLOC(size + sizeof(DataContainer));
- result->size = htonl(size + sizeof(DataContainer));
- memcpy(&result[1],
- &res[1],
- size);
- FREE(reply);
- processor(&keys[0],
- result,
- closure);
- FREE(result);
+ info.sock = sock;
+ info.processor = processor;
+ info.closure = closure;
+ info.parent = PTHREAD_GET_SELF();
+ info.aborted = NO;
+ info.total = 0;
+ thread = PTHREAD_CREATE(&poll_thread,
+ &info,
+ 1024 * 8);
+ start = get_time();
+ while ( (start + timeout > (now = get_time())) &&
+ (GNUNET_SHUTDOWN_TEST() == NO) &&
+ (info.aborted == NO) ) {
+ delta =(start + timeout) - now;
+ if (delta > 100 * cronMILLIS)
+ delta = 100 * cronMILLIS; /* in case we miss SIGINT
+ on CTRL-C */
+ PTHREAD_SLEEP(delta);
}
-#endif
+ info.aborted = YES;
+ connection_close_forever(sock);
+ PTHREAD_JOIN(thread, &unused);
+ PTHREAD_REL_SELF(info.parent);
connection_destroy(sock);
- return ret;
+ return info.total;
}
/**
Modified: GNUnet/src/include/dht.h
===================================================================
--- GNUnet/src/include/dht.h 2006-12-27 06:00:37 UTC (rev 4061)
+++ GNUnet/src/include/dht.h 2006-12-27 06:30:26 UTC (rev 4062)
@@ -54,10 +54,10 @@
unsigned int type; /* nbo */
+ HashCode512 key;
+
unsigned long long expire; /* nbo */
- HashCode512 key;
-
} CS_dht_request_put_MESSAGE;
/**
@@ -71,10 +71,10 @@
unsigned int type; /* nbo */
+ HashCode512 key;
+
unsigned long long timeout; /* nbo */
- HashCode512 key;
-
} CS_dht_request_get_MESSAGE;
#if 0 /* keep Emacsens' auto-indent happy */
Modified: GNUnet/todo
===================================================================
--- GNUnet/todo 2006-12-27 06:00:37 UTC (rev 4061)
+++ GNUnet/todo 2006-12-27 06:30:26 UTC (rev 4062)
@@ -31,14 +31,11 @@
0.7.2 [3'07]:
- new features:
* XFS / support for location URIs [CG]
- + dht/tools/api: complete API get implementation [RC]
+ dht/gap integration [RC]
+ ecrs/location URIs [RC]
+ fsui/location URI support [RC]
+ dstore bloomfilter (optimization)
* HTTP transport (libcurl, libmicrohttpd)
- * SMTP transport (libesmtp)
- * SMTP logger
- minor improvements:
* Scheme (scm) specification of entire configuration;
check options used in modules:
@@ -63,6 +60,9 @@
as keyword (to allow getting meta-data from URI only)
- Chat support basics [RC]
- better NAT traversal (STUN/STUNT inspired?)
+- old/new features:
+ * SMTP transport (libesmtp)
+ * SMTP logger
- Documentation:
* LJ article
- Testcases:
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r4062 - in GNUnet: . src/applications/dht/tools src/include,
grothoff <=