[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r21020 - in gnunet-java: . src/org/gnunet/construct src/org
From: |
gnunet |
Subject: |
[GNUnet-SVN] r21020 - in gnunet-java: . src/org/gnunet/construct src/org/gnunet/construct/parsers src/org/gnunet/core src/org/gnunet/dht src/org/gnunet/nse src/org/gnunet/statistics src/org/gnunet/util |
Date: |
Thu, 19 Apr 2012 11:54:09 +0200 |
Author: dold
Date: 2012-04-19 11:54:09 +0200 (Thu, 19 Apr 2012)
New Revision: 21020
Removed:
gnunet-java/src/org/gnunet/construct/Integer.java
gnunet-java/src/org/gnunet/util/IOContinuation.java
Modified:
gnunet-java/.classpath
gnunet-java/ISSUES
gnunet-java/src/org/gnunet/construct/Construct.java
gnunet-java/src/org/gnunet/construct/FrameSize.java
gnunet-java/src/org/gnunet/construct/MessageLoader.java
gnunet-java/src/org/gnunet/construct/MsgMap.txt
gnunet-java/src/org/gnunet/construct/ReflectUtil.java
gnunet-java/src/org/gnunet/construct/parsers/NestedParser.java
gnunet-java/src/org/gnunet/core/Core.java
gnunet-java/src/org/gnunet/dht/DistributedHashTable.java
gnunet-java/src/org/gnunet/nse/NetworkSizeEstimation.java
gnunet-java/src/org/gnunet/statistics/Statistics.java
gnunet-java/src/org/gnunet/util/Client.java
gnunet-java/src/org/gnunet/util/GnunetMessage.java
gnunet-java/src/org/gnunet/util/MessageTransmitter.java
gnunet-java/src/org/gnunet/util/PeerIdentity.java
gnunet-java/src/org/gnunet/util/Program.java
gnunet-java/src/org/gnunet/util/Resolver.java
gnunet-java/src/org/gnunet/util/Scheduler.java
gnunet-java/src/org/gnunet/util/Server.java
gnunet-java/src/org/gnunet/util/Strings.java
Log:
core now working, several fixes in construct, started implementing the
server/service
Modified: gnunet-java/.classpath
===================================================================
--- gnunet-java/.classpath 2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/.classpath 2012-04-19 09:54:09 UTC (rev 21020)
@@ -11,7 +11,7 @@
<classpathentry kind="con"
path="org.eclipse.jdt.USER_LIBRARY/commons-io-2.1"/>
<classpathentry kind="lib" path="lib/log4j-1.2.16.jar"/>
<classpathentry kind="lib" path="lib/slf4j-log4j12-1.6.4.jar"/>
- <classpathentry kind="lib"
path="/home/dold/gnunet-java/lib/commons-io-2.1.jar"/>
+ <classpathentry kind="lib" path="lib/commons-io-2.1.jar"/>
<classpathentry kind="con"
path="org.eclipse.jdt.USER_LIBRARY/commons-io-2.2"/>
<classpathentry kind="output" path="build"/>
</classpath>
Modified: gnunet-java/ISSUES
===================================================================
--- gnunet-java/ISSUES 2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/ISSUES 2012-04-19 09:54:09 UTC (rev 21020)
@@ -36,8 +36,6 @@
-
-
* why do we even need transmit_ready_notify? can't we just queue the message?
* do we need to be able to send stuff that is not a message?
@@ -452,3 +450,64 @@
* when handling signals through the pipe, how is the service / server
notified?
+---------------------------------------------------------------------------------------
+* questions on https://gnunet.org/bugs/view.php?id=2277
+
+* if bufferbloat is bad, why do we keep reimplementing action queues / buffers
and in almost every
+ service api, just not on the lowest level (client)? (c.f. the ControlMessage
queue)
+
+* Construct: due to some changes in construct (esp. the addition of unions and
optional nested messages)
+ parser caching does not work correctly anymore, since parsers have now
context, and not just a
+ target class.
+
+
+* DHT:
+ * monitor: is my observation correct that MonitorMessage is used to report a
change back to the
+ client *and* to initiate a request(with some fields left at 0)? what are
the different types of monitor requests?
+ * monitor_stop is wrong / doesn't do anything to cancel the monitor request
+ * it's also used nowhere in the entire code
+
+* Scheduler (had to be touched to implement the server)
+ * where do we need prerequisite tasks?
+ * according to the source, nowhere (except two times for memory deallocation)
+ * is there ever a case where a task should depend on events on a set of FDs?
+ * the can be emulated by multiple tasks
+ * do two tasks ever wait for the same event on the same FD?
+ * wouldn't this indicate a bug most of the time?
+ * currently the above to things are implemented, but complicate/slow down the
implementation
+
+* slight design problem with PeerIdentity: data should be private final, but
is public for Construct
+ * gnunet-java doesn't really care about access modifiers anyway, see runabout
;-)
+
+* I'm still not entirely convinced that core connect needs no timout
+ * what about command line tools?
+ * they'd need an extra timeout task
+
+
+* core
+ * why is priority/timeout in SendMessageRequest and SendMessage?
+ * there is no comment indicating that SendMessage has the message to send as
trailing data
+ * is 0 the highest or lowest priority? in what range are its values typically?
+ * when can we cancel a transmit?
+ * testing/test_testing_large_topology.c calls CORE_notify_transmit_ready with
wrong number of arguments
+ (found by greping)
+ * PeerStatusNotify message unused
+
+
+* java reflection APIs have the worst error messages (and checked exceptions),
+ Construct.ReflectUtil now provides better error messages / exceptions
+
+* is it legal to write multiple messages in the callback to
Core.notifyTransmitReady?
+ * currently this is illegal in gnunet-java
+
+* --with-sqlite=PFX base of SQLite installation
+ * misleading/confusing
+
+
+* server:
+ * is there an elegent way to reuse code from Client.java in
Server.ClientHandle?
+ * GNUNET_SERVER_receive_done (struct GNUNET_SERVER_Client *client, int
success)
+ * comment about success wrong
+ * why is there the client_disconnect function, and receive_done(client,
false)?
+ * convenience/performance
+ * comment on client_persist
\ No newline at end of file
Modified: gnunet-java/src/org/gnunet/construct/Construct.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/Construct.java 2012-04-19 08:53:46 UTC
(rev 21019)
+++ gnunet-java/src/org/gnunet/construct/Construct.java 2012-04-19 09:54:09 UTC
(rev 21020)
@@ -22,7 +22,23 @@
private static HashMap<Class<? extends Message>, Parser> parserCache = new
HashMap<Class<? extends Message>, Parser>(100);
+
/**
+ * Information the root of the parser, if the target is nested in another
message.
+ */
+ public static class ParserContext {
+
+ }
+
+ public static class RootedParser {
+ // null if target itself is root
+ ParserContext root;
+ Parser parser;
+ }
+
+
+
+ /**
* Given a byte buffer with a message, parse it into an object of type c.
The
* fields of the class are expected to be annotated with annotations from
* the construct package.
@@ -77,8 +93,6 @@
SequenceParser parser = new SequenceParser();
-
-
if (!Modifier.isPublic(c.getModifiers())) {
throw new InterfaceViolationException(String.format("Construct
Message %s not declared public", c));
}
@@ -111,7 +125,6 @@
return getParser(cls, pg);
}
-
// has to be public, accessed by Runabout (todo: can we do something about
this scope issue?)
@SuppressWarnings("UnusedDeclaration")
@@ -221,9 +234,6 @@
parser = new IntegerParser(8, IntegerParser.SIGNED, field);
}
- public void visit(Integer i) {
- parser = new IntegerParser(i.byteSize(), i.signed(), field);
- }
public void visit(ZeroTerminatedString zts) {
parser = new StringParser(zts.charset(), zts.optional(), field);
@@ -234,26 +244,34 @@
}
public void visit(NestedMessage n) {
+ if (!Message.class.isAssignableFrom(field.getType())) {
+ throw new AssertionError("@NestedMessage only works on
messages, " + field.getType()
+ + " is not a message (origin: " + c +
")");
+ }
- Class<? extends Message> ct;
- //noinspection unchecked
- ct = (Class<? extends Message>) field
- .getType();
+ Field nestedField = field;
- Field old_f = field;
+ if (n.newFrame()) {
+ ParserGenerator pg = new ParserGenerator();
+ Parser p = getParser((Class<Message>) nestedField.getType(),
pg);
- List<Field> old_path = new ArrayList<Field>(path);
+ parser = new NestedParser(p, pg.frameSizePath, n.optional(),
nestedField, true);
- path.add(field);
+ } else {
+ Field old_f = field;
+ List<Field> old_path = new ArrayList<Field>(path);
+ Class old_c = c;
- Class old_c = c;
+ path.add(field);
- Parser p = getParser(ct, this);
+ Parser p = getParser((Class<Message>) nestedField.getType(),
this);
- path = old_path;
- c = old_c;
- LinkedList<Field> copy = frameSizePath == null ? null : new
LinkedList<Field>(frameSizePath);
- parser = new NestedParser(p, copy, n.optional(), old_f);
+ path = old_path;
+ c = old_c;
+ LinkedList<Field> copy = frameSizePath == null ? null : new
LinkedList<Field>(frameSizePath);
+
+ parser = new NestedParser(p, copy, n.optional(), old_f, false);
+ }
}
public void visit(ByteFill bf) {
Modified: gnunet-java/src/org/gnunet/construct/FrameSize.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/FrameSize.java 2012-04-19 08:53:46 UTC
(rev 21019)
+++ gnunet-java/src/org/gnunet/construct/FrameSize.java 2012-04-19 09:54:09 UTC
(rev 21020)
@@ -1,9 +1,6 @@
package org.gnunet.construct;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
+import java.lang.annotation.*;
/**
* Marker for the field storing the size of the enclosing frame in bytes.
@@ -14,5 +11,4 @@
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface FrameSize {
-
}
Deleted: gnunet-java/src/org/gnunet/construct/Integer.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/Integer.java 2012-04-19 08:53:46 UTC
(rev 21019)
+++ gnunet-java/src/org/gnunet/construct/Integer.java 2012-04-19 09:54:09 UTC
(rev 21020)
@@ -1,22 +0,0 @@
-package org.gnunet.construct;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * An integer with static size. May be signed or unsigned.
- *
- * Allowed target fields: byte, short, int, long, BigInteger
- *
- * @author Florian Dold
- *
- */
address@hidden(RetentionPolicy.RUNTIME)
address@hidden(ElementType.FIELD)
-public @interface Integer {
- int byteSize();
-
- boolean signed();
-}
Modified: gnunet-java/src/org/gnunet/construct/MessageLoader.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/MessageLoader.java 2012-04-19
08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/construct/MessageLoader.java 2012-04-19
09:54:09 UTC (rev 21020)
@@ -41,7 +41,6 @@
private static final Logger logger = LoggerFactory
.getLogger(MessageLoader.class);
-
/**
* Maps a class and tag to the corresponding union case.
* <p/>
@@ -58,7 +57,6 @@
static {
- // XXX: should MessageLoader be a singleton?
loadMessageMap();
}
Modified: gnunet-java/src/org/gnunet/construct/MsgMap.txt
===================================================================
--- gnunet-java/src/org/gnunet/construct/MsgMap.txt 2012-04-19 08:53:46 UTC
(rev 21019)
+++ gnunet-java/src/org/gnunet/construct/MsgMap.txt 2012-04-19 09:54:09 UTC
(rev 21020)
@@ -19,8 +19,9 @@
org.gnunet.util.GnunetMessage$Body|76=org.gnunet.core.Core$SendMessage
org.gnunet.util.GnunetMessage$Body|74=org.gnunet.core.Core$SendMessageRequest
org.gnunet.util.GnunetMessage$Body|75=org.gnunet.core.Core$SendMessageReady
+org.gnunet.util.GnunetMessage$Body|42001=org.gnunet.core.Core$MyMessage
org.gnunet.util.GnunetMessage$Body|323=org.gnunet.nse.NetworkSizeEstimation$UpdateMessage
org.gnunet.util.GnunetMessage$Body|321=org.gnunet.nse.NetworkSizeEstimation$StartMessage
org.gnunet.util.GnunetMessage$Body|144=org.gnunet.dht.DistributedHashTable$DHTClientGetStopMessage
org.gnunet.util.GnunetMessage$Body|145=org.gnunet.dht.DistributedHashTable$DHTClientResultMessage
-# generated 2012/04/11 15:38:56
+# generated 2012/04/18 23:26:44
Modified: gnunet-java/src/org/gnunet/construct/ReflectUtil.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/ReflectUtil.java 2012-04-19
08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/construct/ReflectUtil.java 2012-04-19
09:54:09 UTC (rev 21020)
@@ -168,7 +168,9 @@
return f.get(obj);
} catch (IllegalAccessException e) {
throw new InterfaceViolationException(
- String.format("Cannot access private field %s in class
%s", f, obj.getClass()));
+ String.format("Cannot access private field '%s' in class
%s", f, obj.getClass()));
+ } catch (IllegalArgumentException e) {
+ throw new AssertionError("Cannot access field '" + f.getName() +
"' in class " + obj.getClass());
}
}
Modified: gnunet-java/src/org/gnunet/construct/parsers/NestedParser.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/parsers/NestedParser.java
2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/construct/parsers/NestedParser.java
2012-04-19 09:54:09 UTC (rev 21020)
@@ -14,14 +14,16 @@
private final Parser nestedParser;
private List<Field> frameSizePath;
+ private boolean newFrame;
boolean optional;
- public NestedParser(final Parser p, List<Field> frameSizePath, boolean
optional, final Field f) {
+ public NestedParser(final Parser p, List<Field> frameSizePath, boolean
optional, final Field f, boolean newFrame) {
targetField = f;
this.optional = optional;
this.nestedParser = p;
this.frameSizePath = frameSizePath;
+ this.newFrame = newFrame;
}
@Override
@@ -37,6 +39,10 @@
@Override
public int parse(final ByteBuffer srcBuf, int frameOffset, Message
frameObj, final Message dstObj) {
+ if (newFrame) {
+ frameObj = dstObj;
+ frameOffset = 0;
+ }
if (optional) {
@@ -66,9 +72,13 @@
@Override
public void patch(Message m, int frameSize, Message frameObj) {
- // todo: nested/opaque frames
- nestedParser.patch(m, frameSize, frameObj);
+ Message nestedMessage = (Message) ReflectUtil.justGet(m, targetField);
+ if (newFrame) {
+ nestedParser.patch(nestedMessage,
nestedParser.getSize(nestedMessage), m);
+ } else {
+ nestedParser.patch(nestedMessage, frameSize, frameObj);
+ }
}
}
Modified: gnunet-java/src/org/gnunet/core/Core.java
===================================================================
--- gnunet-java/src/org/gnunet/core/Core.java 2012-04-19 08:53:46 UTC (rev
21019)
+++ gnunet-java/src/org/gnunet/core/Core.java 2012-04-19 09:54:09 UTC (rev
21020)
@@ -2,14 +2,56 @@
import org.gnunet.construct.*;
import org.gnunet.util.*;
+import org.grothoff.Runabout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
public class Core {
private static final Logger logger = LoggerFactory
.getLogger(Core.class);
+ private final Client client;
+ /**
+ * set to null once connected for the first time
+ */
+ private InitCallback init;
+ private boolean currentlyDown = true;
+
+ private HeaderNotify notifyOutboundHeaders;
+ private HeaderNotify notifyInboundHeaders;
+
+ private MessageNotify notifyOutboundMessages;
+ private MessageNotify notifyInboundMessages;
+
+ private int initOptions;
+
+ private ConnectHandler connectHandler;
+ private DisconnectHandler disconnectHandler;
+
+ private PeerIdentity myIdentity;
+
+ private final CoreReceiver coreReceiver = new CoreReceiver();
+
+ // per default we are interested in all messages => specific interest set
is empty
+ private int[] interested = new int[0];
+ private Runabout messageHandler;
+
+ HashMap<PeerIdentity, ConnectedPeerInfo> connectedPeers = new
HashMap<PeerIdentity, ConnectedPeerInfo>(10);
+
+ LinkedList<TransmitHandle> pending = new LinkedList<TransmitHandle>();
+
+ LinkedList<SendMessage> approvedSendMessages = new
LinkedList<SendMessage>();
+
+
+ private Client.TransmitHandle currentClientTransmitHandle;
+
enum CoreOption {
NOTHING(0),
SEND_FULL_INBOUND(8),
@@ -60,12 +102,17 @@
@NestedMessage
public PeerIdentity peer;
- /**
- * First of the ATS information blocks (we must have at least
- * one due to the 0-termination requirement).
- */
@NestedMessage
- public ATSInformation atsInformation;
+ public ATSInformation atsFirst;
+
+ @VariableSizeArray(lengthField = "ats_count")
+ public ATSInformation[] atsRest;
+
+ @NestedMessage(newFrame = true)
+ public GnunetMessage.Header payloadHeader;
+
+ @Union(tag = "payloadHeader.messageType", optional = true)
+ public GnunetMessage.Body payloadBody;
}
@@ -84,14 +131,17 @@
@NestedMessage
public PeerIdentity peer;
- /**
- * First of the ATS information blocks (we must have at least
- * one due to the 0-termination requirement).
- */
- //@NestedMessage
- //public ATSInformation atsInformation;
- @ByteFill
- public byte[] ats;
+ @NestedMessage
+ public ATSInformation atsFirst;
+
+ @VariableSizeArray(lengthField = "ats_count")
+ public ATSInformation[] atsRest;
+
+ @NestedMessage(newFrame = true)
+ public GnunetMessage.Header payloadHeader;
+
+ @Union(tag = "payloadHeader.messageType", optional = true)
+ public GnunetMessage.Body payloadBody;
}
@@ -205,10 +255,10 @@
public int size;
/**
- * smr_id from the request.
+ * smrId from the request.
*/
@UInt16
- public int smr_id;
+ public int smrId;
/**
* Identity of the intended target.
@@ -234,7 +284,7 @@
* message transmitted?
*/
@NestedMessage
- public AbsoluteTime deadline;
+ public AbsoluteTimeMessage deadline;
/**
* Identity of the intended receiver.
@@ -254,51 +304,32 @@
@UInt64
public int reserved;
- }
+ @NestedMessage(newFrame = true)
+ public GnunetMessage payloadMessage;
- public interface MessageHandler {
-
}
- public interface Continuation {
-
- }
-
+ /**
+ * Called once the handshake with core was successful.
+ */
public interface InitCallback {
void onInit(PeerIdentity myIdentity);
}
+ /**
+ * Called when a new peer (with a compatible set of messages) connects to
core
+ */
public interface ConnectHandler {
void onConnect(PeerIdentity peerIdentity);
}
+ /**
+ * Called when a peer disconnects from the core.
+ */
public interface DisconnectHandler {
void onDisconnect(PeerIdentity peerIdentity);
-
}
- private final Client client;
- /**
- * set to null once connected for the first time
- */
- private InitCallback init;
-
- private HeaderNotify notifyOutboundHeaders;
- private HeaderNotify notifyInboundHeaders;
-
- private MessageNotify notifyOutboundMessages;
- private MessageNotify notifyInboundMessages;
-
- private int initOptions;
-
- private ConnectHandler connectHandler;
- private DisconnectHandler disconnectHandler;
-
- private PeerIdentity myIdentity;
-
- private final CoreReceiver coreReceiver = new CoreReceiver();
-
-
public interface HeaderNotify {
void notify(GnunetMessage.Header header);
}
@@ -307,131 +338,127 @@
void notify(GnunetMessage messageBody);
}
- public static class Builder {
- /**
- * Configuration for a core connection.
- *
- * @param cfg Mandatory configuration
- */
- final private Configuration cfg;
- private InitCallback init;
- private HeaderNotify notifyOutboundHeaders;
- private HeaderNotify notifyInboundHeaders;
- private MessageNotify notifyOutboundMessages;
- private MessageNotify notifyInboundMessages;
- private ConnectHandler connectHandler;
- private DisconnectHandler disconnectHandler;
- private PeerIdentity myIdentity;
+ /**
+ * Represents a request for transmission.
+ */
+ public class TransmitHandle {
+ private PeerIdentity peerIdentity;
+ private MessageTransmitter transmitter;
+ private int smrId;
+ private Cancelable timeoutHandle;
+ private int size;
+ public AbsoluteTime deadline;
+ }
- public Builder(Configuration cfg) {
- this.cfg = cfg;
- }
+ private class ConnectedPeerInfo {
+ public Map<Integer, TransmitHandle> requestsToPeer = new
HashMap<Integer, TransmitHandle>(1);
+ public int nextSmrId;
+ }
- public Builder withInit(InitCallback init) {
- this.init = init;
- return this;
- }
-
- public Builder addHandler(MessageHandler handler) {
- return this;
- }
-
- public Builder withNotifyOutboundFull(MessageNotify h) {
- return this;
- }
-
- public Builder withNotifyOutboundHeaders(HeaderNotify h) {
- notifyOutboundHeaders = h;
- return this;
- }
-
- public Builder withNotifyConnect(ConnectHandler h) {
- connectHandler = h;
- return this;
- }
-
- public Builder withNotifyDisconnect(DisconnectHandler h) {
- disconnectHandler = h;
- return this;
- }
-
- public Core build() {
- return new Core(this);
- }
-
+ private Core(Configuration cfg) {
+ client = new Client("core", cfg);
}
- private Core(Builder b) {
- client = new Client("core", b.cfg);
- this.notifyOutboundHeaders = b.notifyOutboundHeaders;
-
- initOptions = 0;
-
- if (notifyOutboundHeaders != null) {
- //initOptions |= CoreOption.SEND_HDR_OUTBOUND.val;
- }
- this.init = b.init;
-
-
- // XXX: only temporary for debugging
- initOptions |= CoreOption.SEND_FULL_INBOUND.val;
- initOptions |= CoreOption.SEND_HDR_OUTBOUND.val;
-
- this.connectHandler = b.connectHandler;
- this.disconnectHandler = b.disconnectHandler;
-
+ public void init(InitCallback initCallback) {
+ this.init = initCallback;
reconnect();
}
void reconnect() {
InitMessage initMessage = new InitMessage();
- initMessage.interested = new int[0];
- initMessage.options = initOptions;
+ initMessage.interested = interested;
+ initMessage.options = 0;
+ for (int i : interested) {
+ logger.info("we are interested in " + i);
+ }
+
client.transmitAndGetResponse(initMessage, RelativeTime.FOREVER, true,
new CoreReceiver());
}
public class CoreReceiver extends RunaboutMessageReceiver {
- public void visit(ConnectNotifyMessage m) {
- logger.info("got connectnotify");
- if (connectHandler != null) {
- connectHandler.onConnect(m.peer);
- }
- client.receive(RelativeTime.FOREVER, this);
- }
-
public void visit(InitReplyMessage m) {
- logger.info("got initreply");
+ myIdentity = m.myIdentity;
+ connectedPeers.put(myIdentity, new ConnectedPeerInfo());
- myIdentity = m.myIdentity;
if (init != null) {
init.onInit(m.myIdentity);
init = null;
}
+
client.receive(RelativeTime.FOREVER, this);
}
+
+ public void visit(ConnectNotifyMessage m) {
+ if (connectHandler != null) {
+ connectHandler.onConnect(m.peer);
+ }
+
+ client.receive(RelativeTime.FOREVER, this);
+ }
+
public void visit(NotifyInboundTrafficMessage m) {
- logger.info("got NotifyInbount");
+ boolean found = false;
+ for (int i : interested) {
+ if (i == m.payloadHeader.messageType) {
+ found = true;
+ break;
+ }
+ }
+ if (found) {
+ messageHandler.visitAppropriate(m.payloadBody);
+ } else {
+ // do stuff
+ }
client.receive(RelativeTime.FOREVER, this);
}
public void visit(NotifyOutboundTrafficMessage m) {
- logger.info("got NotifyOutbound");
client.receive(RelativeTime.FOREVER, this);
}
public void visit(SendMessageReady m) {
- logger.info("got SendMessageReady");
+ ConnectedPeerInfo targetPeerInfo = connectedPeers.get(m.peer);
- // todo: ...
+ if (targetPeerInfo == null) {
+ logger.error("SendMessage ready received for unconnected peer
" + m.peer);
+ client.receive(RelativeTime.FOREVER, this);
+ return;
+ }
+
+ TransmitHandle transmitHandle =
targetPeerInfo.requestsToPeer.get(m.smrId);
+
+ final SendMessage sendMessage = new SendMessage();
+
+
+ sendMessage.deadline = transmitHandle.deadline.asMessage();
+ sendMessage.cork = 0;
+ sendMessage.peer = transmitHandle.peerIdentity;
+ sendMessage.priority = 0;
+
+ transmitHandle.transmitter.transmit(new Client.MessageSink() {
+ @Override
+ public void send(GnunetMessage.Body m) {
+ if (sendMessage.payloadMessage != null) {
+ throw new AssertionError("can only send one payload
message");
+ }
+ logger.info("sink called");
+ sendMessage.payloadMessage = GnunetMessage.fromBody(m);
+ }
+ });
+
+ approvedSendMessages.add(sendMessage);
+
+ transmitPending();
+
client.receive(RelativeTime.FOREVER, this);
}
@@ -443,44 +470,157 @@
@Override
public void handleError() {
+ throw new AssertionError("unexpected");
}
}
+ /**
+ * Ask the core to call "notify" once it is ready to transmit the
+ * given number of bytes to the specified "target". Must only be
+ * called after a connection to the respective peer has been
+ * established (and the client has been informed about this).
+ *
+ * @param priority how important is the message?
+ * @param maxdelay maximum time until transmitter gets called
+ * @param target the identity of the receiver
+ * @param size the size of the message we want to transmit
+ * @param transmitter called once the core service is ready to send message
+ * @return
+ */
+ public Cancelable notifyTransmitReady(long priority, RelativeTime maxdelay,
+ PeerIdentity targetIdentity, int
size, final MessageTransmitter transmitter) {
+ if (!connectedPeers.containsKey(targetIdentity)) {
+ throw new AssertionError("trying to send message to unconnected
peer");
+ }
- public Cancelable notifyTransmitReady(boolean cork, long priority,
RelativeTime maxdelay,
- PeerIdentity target, int size,
MessageTransmitter transmitter) {
- final SendMessageRequest smr = new SendMessageRequest();
- smr.deadline = maxdelay.toAbsolute().asMessage();
- if (target == null) {
- smr.peer = myIdentity;
- } else {
- smr.peer = target;
+ ConnectedPeerInfo cpi = connectedPeers.get(targetIdentity);
+
+ final TransmitHandle transmitHandle = new TransmitHandle();
+
+ transmitHandle.peerIdentity = targetIdentity;
+ transmitHandle.size = size;
+ transmitHandle.smrId = cpi.nextSmrId;
+ transmitHandle.transmitter = transmitter;
+ cpi.nextSmrId += 1;
+ transmitHandle.deadline = maxdelay.toAbsolute();
+
+ cpi.requestsToPeer.put(transmitHandle.smrId, transmitHandle);
+
+ if (!maxdelay.isForever()) {
+ transmitHandle.timeoutHandle = Scheduler.add(maxdelay, new
Scheduler.Task() {
+ @Override
+ public void run(Scheduler.RunContext ctx) {
+ transmitter.handleError();
+ }
+ });
}
- smr.queueSize = 10;
- smr.smrId = 1;
- smr.size = size;
- client.notifyTransmitReady(maxdelay, false, new MessageTransmitter() {
+ // todo: add so that 'pending' is ordered by maxdelay
+ pending.add(transmitHandle);
+
+ transmitPending();
+
+ return new Cancelable() {
@Override
- public void transmit(Client.MessageSink sink) {
- sink.send(smr);
+ public void cancel() {
+ pending.remove(transmitHandle);
+ if (transmitHandle.timeoutHandle != null) {
+ transmitHandle.timeoutHandle.cancel();
+ }
}
+ };
- @Override
- public void handleError(Client.TransmitError error) {
- throw new RuntimeException("client error");
- }
- });
- return null;
+
}
+ /**
+ * Send the next message to core.
+ */
+ private void transmitPending() {
+ logger.info("transmit pending called 1");
+ // todo: check if not connected, connect then first!
+ if (currentClientTransmitHandle != null) {
+ logger.info("returning");
+ return;
+ }
- public Cancelable transmitMessage(GnunetMessage.Body messageBody,
RelativeTime timeout,
- PeerIdentity target, Continuation cont) {
- return null;
+ logger.info("transmit pending called 2");
+ final TransmitHandle transmitHandle = pending.poll();
+ if (transmitHandle != null) {
+ logger.info("calling NTR for transmitHandle");
+ currentClientTransmitHandle =
client.notifyTransmitReady(RelativeTime.FOREVER, false,
+ new MessageTransmitter() {
+ @Override
+ public void transmit(Client.MessageSink sink) {
+ logger.info("transmitHandle NTR done");
+ currentClientTransmitHandle = null;
+
+ SendMessageRequest request = new
SendMessageRequest();
+ request.peer = transmitHandle.peerIdentity;
+ request.priority = 0;
+ request.queueSize = 5;
+ request.smrId = transmitHandle.smrId;
+ request.size = transmitHandle.size;
+
+ request.deadline =
transmitHandle.deadline.asMessage();
+
+ sink.send(request);
+
+ transmitPending();
+ }
+
+ @Override
+ public void handleError() {
+ logger.error("error in SendMessageTransmitter");
+ transmitPending();
+ }
+ });
+
+ logger.info("left at transmithandle" +
(currentClientTransmitHandle == null));
+ return;
+ }
+
+ final SendMessage sm = approvedSendMessages.poll();
+ if (sm != null) {
+ logger.info("calling NTR for apprvedSendMessages");
+ currentClientTransmitHandle =
client.notifyTransmitReady(RelativeTime.FOREVER, false,
+ new MessageTransmitter() {
+ @Override
+ public void transmit(Client.MessageSink sink) {
+ currentClientTransmitHandle = null;
+ sink.send(sm);
+ logger.info("transmitted SendMessage!");
+ transmitPending();
+ }
+
+ @Override
+ public void handleError() {
+ throw new AssertionError("unexpected");
+ }
+ });
+ return;
+ }
+ logger.info("transmitPending did nothing");
}
+
+ public void observeOutboundHeaders(HeaderNotify h) {
+ }
+
+ public void observeConnect(ConnectHandler connectHandler) {
+ this.connectHandler = connectHandler;
+ }
+
+
+ public void handleMessages(Runabout runabout) {
+ if (messageHandler != null) {
+ throw new AssertionError("Core can have only on message handler");
+ }
+ messageHandler = runabout;
+ interested = getRunaboutMessageTypes(runabout);
+ }
+
/**
* Disconnect from the core service. This function can only
* be called *after* all pending notifyTransmitReady
@@ -490,48 +630,93 @@
}
+ private static ArrayList<Class> getRunaboutVisitees(Runabout r) {
+ Class rc = r.getClass();
+ ArrayList<Class> ret = new ArrayList<Class>(5);
+ for (Method m : rc.getMethods()) {
+ if (!(m.getName().equals("visit") && m.getParameterTypes().length
== 1)) {
+ continue;
+ }
+ ret.add(m.getParameterTypes()[0]);
+ }
+ return ret;
+ }
+
+ private static int[] getRunaboutMessageTypes(Runabout r) {
+ ArrayList<Class> visitees = getRunaboutVisitees(r);
+ int[] msgtypes = new int[visitees.size()];
+ for (int i = 0; i < visitees.size(); ++i) {
+ msgtypes[i] = MessageLoader.getUnionTag(GnunetMessage.Body.class,
visitees.get(i));
+ }
+ return msgtypes;
+ }
+
+
+ public static class BlaRunabout extends Runabout {
+ public void visit(SendMessage x) {
+
+ }
+
+ public void visit(SendMessageReady x) {
+
+ }
+ }
+
+ @UnionCase(42001)
+ public static class MyMessage implements GnunetMessage.Body {
+ @UInt32
+ public int number;
+ @ZeroTerminatedString
+ public String message;
+ }
+
+ public static class MyMessageHandler extends Runabout {
+ public void visit(MyMessage msg) {
+ System.out.println("got my message!!");
+ System.out.println(msg.message);
+ System.out.println(msg.number);
+ }
+ }
+
public static void main(String[] args) {
new Program(args) {
- Core core;
-
public void run() {
- Core.Builder b = new Core.Builder(cfg);
+ final Core core = new Core(this.getConfiguration());
- b.withInit(new InitCallback() {
+ core.observeConnect(new ConnectHandler() {
@Override
- public void onInit(PeerIdentity myIdentity) {
- System.out.print("Hello, I'm ");
-
System.out.println(Strings.dataToString(myIdentity.data));
-
- core.notifyTransmitReady(true, 0, RelativeTime.SECOND,
null, 4, new MessageTransmitter() {
- @Override
- public void transmit(Client.MessageSink sink) {
- System.out.println("ready to transmit");
- }
-
- @Override
- public void handleError(Client.TransmitError
error) {
- }
- });
+ public void onConnect(PeerIdentity peerIdentity) {
+
System.out.println(Strings.dataToString(peerIdentity.data));
}
});
- b.withNotifyOutboundHeaders(new HeaderNotify() {
+
+ core.handleMessages(new MyMessageHandler());
+
+
+ final MyMessage myMessage = new MyMessage();
+ myMessage.message = "Hello, gnunet";
+ myMessage.number = 42;
+
+ core.init(new InitCallback() {
@Override
- public void notify(GnunetMessage.Header header) {
- System.out.println("my peer sent message of type " +
header.messageType);
- }
- });
+ public void onInit(PeerIdentity myIdentity) {
+ System.out.print("Hello, I'm ");
+
System.out.println(Strings.dataToString(myIdentity.data));
+ core.notifyTransmitReady(0, RelativeTime.SECOND,
myIdentity, Construct.getSize(myMessage),
+ new MessageTransmitter() {
+ @Override
+ public void transmit(Client.MessageSink
sink) {
+ sink.send(myMessage);
+ }
- b.withNotifyConnect(new ConnectHandler() {
- @Override
- public void onConnect(PeerIdentity peerIdentity) {
- System.out.println("connected to " +
Strings.dataToString(peerIdentity.data));
+ @Override
+ public void handleError() {
+ }
+ });
}
});
-
- core = b.build();
}
}.start();
}
Modified: gnunet-java/src/org/gnunet/dht/DistributedHashTable.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/DistributedHashTable.java 2012-04-19
08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/dht/DistributedHashTable.java 2012-04-19
09:54:09 UTC (rev 21020)
@@ -239,7 +239,7 @@
}
@Override
- public void handleError(Client.TransmitError error) {
+ public void handleError() {
cont.call(PutResult.TIMEOUT);
}
});
@@ -280,7 +280,7 @@
}
@Override
- public void handleError(Client.TransmitError error) {
+ public void handleError() {
}
});
}
@@ -365,7 +365,7 @@
}
@Override
- public void handleError(Client.TransmitError error) {
+ public void handleError() {
if (!request.canceled) {
request.cancel();
}
@@ -375,7 +375,7 @@
}
});
- Scheduler.add(new Scheduler.Task() {
+ Scheduler.add(timeout, new Scheduler.Task() {
@Override
public void run(Scheduler.RunContext ctx) {
logger.debug("timeout task has been run");
@@ -384,16 +384,19 @@
request.cancel();
}
}
- }, timeout);
+ });
return request;
}
+ public Cancelable startMonitor() {
+ return null;
+ }
+
public void destroy() {
client.disconnect();
}
-
public static void main(String[] args) {
new Program(args) {
@Option(action = OptionAction.SET,
Modified: gnunet-java/src/org/gnunet/nse/NetworkSizeEstimation.java
===================================================================
--- gnunet-java/src/org/gnunet/nse/NetworkSizeEstimation.java 2012-04-19
08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/nse/NetworkSizeEstimation.java 2012-04-19
09:54:09 UTC (rev 21020)
@@ -86,7 +86,7 @@
}
@Override
- public void handleError(Client.TransmitError error) {
+ public void handleError() {
throw new RuntimeException("unexpected transmitt error");
}
}
Modified: gnunet-java/src/org/gnunet/statistics/Statistics.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/Statistics.java 2012-04-19
08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/statistics/Statistics.java 2012-04-19
09:54:09 UTC (rev 21020)
@@ -113,7 +113,7 @@
}
@Override
- public void handleError(Client.TransmitError error) {
+ public void handleError() {
throw new RuntimeException("unexpected transmit error");
}
}
@@ -154,7 +154,7 @@
}
@Override
- public void handleError(Client.TransmitError error) {
+ public void handleError() {
throw new RuntimeException("unexpected transmit error");
}
});
Modified: gnunet-java/src/org/gnunet/util/Client.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Client.java 2012-04-19 08:53:46 UTC (rev
21019)
+++ gnunet-java/src/org/gnunet/util/Client.java 2012-04-19 09:54:09 UTC (rev
21020)
@@ -264,7 +264,7 @@
b.withTask(new Task() {
@Override
public void run(Scheduler.RunContext ctx) {
- transmitter.handleError(TransmitError.TIMEOUT);
+ transmitter.handleError();
}
});
b.withTimeout(notifyTimeout);
@@ -562,7 +562,7 @@
* if the caller does not care about temporary
connection errors,
* for example because the protocol is stateless
* @param transmitter the MessageTransmitter object to call once the
client is ready to transmit or
- * when the timeout is over.
+ * when the timeout is over. Guaranteed to be called
*after* notifyTransmitReady has returned.
* @return a handle that can be used to cancel the transmit request
*/
public TransmitHandle notifyTransmitReady(RelativeTime timeout,
@@ -574,7 +574,7 @@
// negative timeout
if (timeout.getMilliseconds() <= 0) {
- transmitter.handleError(TransmitError.TIMEOUT);
+ transmitter.handleError();
return new TransmitHandle() {
@Override
public void cancel() {
@@ -591,7 +591,12 @@
if (currentTransmitHelper == null) {
currentTransmitHelper = transmit;
if (chan.isConnected()) {
- currentTransmitHelper.start();
+ Scheduler.add(new Task() {
+ @Override
+ public void run(Scheduler.RunContext ctx) {
+ currentTransmitHelper.start();
+ }
+ });
}
} else {
nextTransmitHelper = transmit;
@@ -620,7 +625,7 @@
}
@Override
- public void handleError(TransmitError error) {
+ public void handleError() {
receiver.handleError();
}
});
Modified: gnunet-java/src/org/gnunet/util/GnunetMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/util/GnunetMessage.java 2012-04-19 08:53:46 UTC
(rev 21019)
+++ gnunet-java/src/org/gnunet/util/GnunetMessage.java 2012-04-19 09:54:09 UTC
(rev 21020)
@@ -24,7 +24,8 @@
GnunetMessage msg = new GnunetMessage();
msg.header = new Header();
msg.header.messageSize = Header.SIZE + Construct.getSize(b);
-
+ msg.header.messageType =
MessageLoader.getUnionTag(GnunetMessage.Body.class, b.getClass());
+ msg.body = b;
return msg;
}
Deleted: gnunet-java/src/org/gnunet/util/IOContinuation.java
===================================================================
--- gnunet-java/src/org/gnunet/util/IOContinuation.java 2012-04-19 08:53:46 UTC
(rev 21019)
+++ gnunet-java/src/org/gnunet/util/IOContinuation.java 2012-04-19 09:54:09 UTC
(rev 21020)
@@ -1,29 +0,0 @@
-/*
- *
- * This file is part of GNUnet.
- * (C) 2011 Christian Grothoff (and other contributing authors)
- *
- * GNUnet is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published
- * by the Free Software Foundation; either version 2, or (at your
- * option) any later version.
- *
- * GNUnet is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with GNUnet; see the file COPYING. If not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- *
- */
-
-package org.gnunet.util;
-
-public interface IOContinuation {
- void handleSuccess();
- void handleTimeout();
-
-}
Modified: gnunet-java/src/org/gnunet/util/MessageTransmitter.java
===================================================================
--- gnunet-java/src/org/gnunet/util/MessageTransmitter.java 2012-04-19
08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/util/MessageTransmitter.java 2012-04-19
09:54:09 UTC (rev 21020)
@@ -9,5 +9,5 @@
*/
public void transmit(Client.MessageSink sink);
- void handleError(Client.TransmitError error);
+ void handleError();
}
Modified: gnunet-java/src/org/gnunet/util/PeerIdentity.java
===================================================================
--- gnunet-java/src/org/gnunet/util/PeerIdentity.java 2012-04-19 08:53:46 UTC
(rev 21019)
+++ gnunet-java/src/org/gnunet/util/PeerIdentity.java 2012-04-19 09:54:09 UTC
(rev 21020)
@@ -4,6 +4,8 @@
import org.gnunet.construct.FixedSizeByteArray;
import org.gnunet.construct.Message;
+import java.util.Arrays;
+
public class PeerIdentity implements Message {
@FixedSizeByteArray(length = 64)
@@ -19,4 +21,18 @@
}
return hex.toString();
}
+
+ public String toString() {
+ return Strings.dataToString(data);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return Arrays.equals(((PeerIdentity) obj).data, this.data);
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(data);
+ }
}
Modified: gnunet-java/src/org/gnunet/util/Program.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Program.java 2012-04-19 08:53:46 UTC
(rev 21019)
+++ gnunet-java/src/org/gnunet/util/Program.java 2012-04-19 09:54:09 UTC
(rev 21020)
@@ -150,5 +150,9 @@
* Override to implement the behavior of the Program.
*/
public abstract void run();
+
+ public final Configuration getConfiguration() {
+ return cfg;
+ }
}
\ No newline at end of file
Modified: gnunet-java/src/org/gnunet/util/Resolver.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Resolver.java 2012-04-19 08:53:46 UTC
(rev 21019)
+++ gnunet-java/src/org/gnunet/util/Resolver.java 2012-04-19 09:54:09 UTC
(rev 21020)
@@ -362,7 +362,7 @@
}
@Override
- public void handleError(Client.TransmitError error) {
+ public void handleError() {
throw new RuntimeException("unexpected transmit
error");
}
});
Modified: gnunet-java/src/org/gnunet/util/Scheduler.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Scheduler.java 2012-04-19 08:53:46 UTC
(rev 21019)
+++ gnunet-java/src/org/gnunet/util/Scheduler.java 2012-04-19 09:54:09 UTC
(rev 21020)
@@ -46,21 +46,19 @@
}
public enum Reason {
- STARTUP, SHUTDOWN, TIMEOUT, READ_READY, WRITE_READY, CONNECT_READY,
PREREQ_DONE
+ STARTUP, SHUTDOWN, TIMEOUT, READ_READY, WRITE_READY, ACCEPT_READY,
CONNECT_READY
}
-
/**
* The context of a task that is ready to run.
*/
public static class RunContext {
Set<Reason> reasons = EnumSet.noneOf(Reason.class);
- Set<Channel> readableSet = null;
- Set<Channel> writeableSet = null;
- Set<Channel> connectedSet = null;
+ Channel eventChannel;
+
public RunContext() {
+ }
- }
public RunContext(Set<Reason> reasons) {
this.reasons = reasons;
}
@@ -80,6 +78,7 @@
Set<TaskIdentifier> read = new TreeSet<TaskIdentifier>();
Set<TaskIdentifier> write = new TreeSet<TaskIdentifier>();
Set<TaskIdentifier> connect = new TreeSet<TaskIdentifier>();
+ Set<TaskIdentifier> accept = new TreeSet<TaskIdentifier>();
}
/**
@@ -87,21 +86,19 @@
*/
public static class TaskIdentifier implements Comparable<TaskIdentifier>,
Cancelable {
private final Task task;
- private TaskIdentifier prereq;
private RunContext ctx = new RunContext();
private boolean lifeness;
private final Priority priority;
private final AbsoluteTime deadline;
- private Set<SelectableChannel> rs=null, ws=null, cs=null;
+ private Set<SelectableChannel> rs = null, ws = null, cs = null, as =
null;
TaskIdentifier(Task t, Priority priority,
- boolean liveness, TaskIdentifier prereq, RelativeTime
timeout,
+ boolean liveness, RelativeTime timeout,
Set<SelectableChannel> rs, Set<SelectableChannel> ws,
Set<SelectableChannel> cs) {
this.task = t;
this.priority = (activeTask == null) ? Priority.DEFAULT :
activeTask.priority;
this.lifeness = liveness;
- this.prereq = prereq;
if (timeout.getMilliseconds() < 0) {
throw new InterfaceViolationException("timeout must be (>=0)");
@@ -128,9 +125,16 @@
registerSelect(sc, SelectionKey.OP_CONNECT);
}
}
+ if (as != null) {
+ for (SelectableChannel sc : as) {
+ logger.debug("registering for OP_CONNECT");
+ registerSelect(sc, SelectionKey.OP_ACCEPT);
+ }
+ }
this.ws = new TreeSet<SelectableChannel>(ws == null ?
Collections.EMPTY_SET : ws);
this.rs = new TreeSet<SelectableChannel>(rs == null ?
Collections.EMPTY_SET : rs);
this.cs = new TreeSet<SelectableChannel>(cs == null ?
Collections.EMPTY_SET : cs);
+ this.as = new TreeSet<SelectableChannel>(as == null ?
Collections.EMPTY_SET : as);
boolean selectEmpty = (ws == null || ws.isEmpty()) && (rs == null
|| rs.isEmpty())
&& (cs == null || cs.isEmpty());
@@ -209,6 +213,9 @@
if ((op & SelectionKey.OP_CONNECT) != 0) {
subscribers.connect.add(this);
}
+ if ((op & SelectionKey.OP_ACCEPT) != 0) {
+ subscribers.accept.add(this);
+ }
}
private void deregisterOne(SelectableChannel sc, int op) {
@@ -224,8 +231,11 @@
subscribers.write.remove(this);
}
if ((op & SelectionKey.OP_CONNECT) != 0) {
- subscribers.write.remove(this);
+ subscribers.connect.remove(this);
}
+ if ((op & SelectionKey.OP_ACCEPT) != 0) {
+ subscribers.accept.remove(this);
+ }
if (subscribers.write.isEmpty()) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
@@ -235,6 +245,9 @@
if (subscribers.connect.isEmpty()) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
}
+ if (subscribers.accept.isEmpty()) {
+ key.interestOps(key.interestOps() & ~SelectionKey.OP_ACCEPT);
+ }
}
@@ -254,28 +267,27 @@
deregisterOne(sc, SelectionKey.OP_CONNECT);
}
}
+ if (this.as != null) {
+ for (SelectableChannel sc : this.as) {
+ deregisterOne(sc, SelectionKey.OP_ACCEPT);
+ }
+ }
}
}
public static class TaskBuilder {
private Task task = null;
- private TaskIdentifier prereq = null;
private boolean lifeness = true;
private Priority prio = null;
private RelativeTime timeout = RelativeTime.ZERO;
- private Set<SelectableChannel> rs=null, ws=null, cs=null;
+ private Set<SelectableChannel> rs = null, ws = null, cs = null, as =
null;
public TaskBuilder withLifeness(boolean lifeness) {
this.lifeness = lifeness;
return this;
}
- public TaskBuilder withPrereq(TaskIdentifier prereq) {
- this.prereq = prereq;
- return this;
- }
-
public TaskBuilder withPriority(Priority prio) {
this.prio = prio;
return this;
@@ -317,13 +329,18 @@
return this;
}
+ public TaskBuilder withSelectAccept(SelectableChannel c) {
+ this.as = Collections.singleton(c);
+ return this;
+ }
+
public TaskBuilder withSelectConnectSet(Set<SelectableChannel> cs) {
this.cs = cs;
return this;
}
private TaskIdentifier build() {
- return new TaskIdentifier(task, prio, lifeness, prereq, timeout,
rs, ws, cs);
+ return new TaskIdentifier(task, prio, lifeness, timeout, rs, ws,
cs);
}
}
@@ -338,7 +355,8 @@
private static int readyCount = 0;
// for every priority, there is a list of tasks that is definitely ready
to run
- final private static ArrayList<LinkedList<TaskIdentifier>> ready = new
ArrayList<LinkedList<TaskIdentifier>>(Priority.size);
+ final private static ArrayList<LinkedList<TaskIdentifier>> ready = new
ArrayList<LinkedList<TaskIdentifier>>
+ (Priority.size);
static {
for (int i = 0; i < Priority.size; ++i) {
@@ -363,12 +381,6 @@
return (activeTask == null) || activeTask.lifeness;
}
- public static TaskIdentifier addAfter(final TaskIdentifier prereq,
- final Task t) {
- return addSelect(Priority.KEEP, prereq, RelativeTime.ZERO, null, null,
- t);
- }
-
/**
* Run the task regardless of any prerequisites, before any other task of
* the same priority.
@@ -395,13 +407,13 @@
* started!
*/
public static TaskIdentifier add(Task task) {
- return addSelect(Priority.KEEP, null, RelativeTime.ZERO, null, null,
+ return addSelect(Priority.KEEP, RelativeTime.ZERO, null, null,
task);
}
- public static TaskIdentifier add(Task task, RelativeTime delay) {
- return addSelect(Priority.KEEP, null, delay, null, null, task);
+ public static TaskIdentifier add(RelativeTime delay, Task task) {
+ return addSelect(Priority.KEEP, delay, null, null, task);
}
@@ -437,30 +449,29 @@
* @return unique task identifier for the job
* only valid until "task" is started!
*/
- public static TaskIdentifier addSelect(Priority p,
- TaskIdentifier prereq, RelativeTime
delay,
+ public static TaskIdentifier addSelect(Priority p, RelativeTime delay,
Set<SelectableChannel> rs,
Set<SelectableChannel> ws,
Task t) {
- return new TaskIdentifier(t, p, getCurrentLifeness(), prereq, delay,
rs, ws, null);
+ return new TaskIdentifier(t, p, getCurrentLifeness(), delay, rs, ws,
null);
}
public static TaskIdentifier addRead(RelativeTime timeout,
SelectableChannel chan, Task t) {
- return Scheduler.addSelect(Priority.KEEP, null, timeout,
+ return Scheduler.addSelect(Priority.KEEP, timeout,
Collections.singleton(chan), null, t);
}
public static TaskIdentifier addWrite(RelativeTime timeout,
SelectableChannel chan, Task t) {
logger.debug("scheduling write");
- return Scheduler.addSelect(Priority.KEEP, null, timeout, null,
+ return Scheduler.addSelect(Priority.KEEP, timeout, null,
Collections.singleton(chan), t);
}
public static TaskIdentifier addWithPriority(Priority prio,
Task t) {
- return addSelect(prio, null, RelativeTime.ZERO, null, null, t);
+ return addSelect(prio, RelativeTime.ZERO, null, null, t);
}
@@ -534,7 +545,7 @@
private static void handleSelect(RelativeTime timeout) {
try {
- // selector.select(0) would block indefinitely
+ // selector.select(0) would block indefinitely (conterintuitive,
java's fault)
if (timeout.getMilliseconds() == 0) {
selector.selectNow();
} else if (timeout.isForever()) {
@@ -553,7 +564,7 @@
Object obj = sk.attachment();
assert (obj instanceof SubscriberSet);
SubscriberSet ss = (SubscriberSet) obj;
-
+
Channel c = sk.channel();
if (sk.isReadable()) {
@@ -590,6 +601,17 @@
}
}
}
+ if (sk.isAcceptable()) {
+ logger.debug("adding isAcceptable() task");
+ for (TaskIdentifier tt : ss.accept) {
+ executableTasks.add(tt);
+ if (tt.ctx.reasons == null) {
+ tt.ctx.reasons = EnumSet.of(Reason.ACCEPT_READY);
+ } else {
+ tt.ctx.reasons.add(Reason.ACCEPT_READY);
+ }
+ }
+ }
}
for (TaskIdentifier tt : executableTasks) {
// tasks must do this themselve to cancel subscriptions to other
channels
Modified: gnunet-java/src/org/gnunet/util/Server.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Server.java 2012-04-19 08:53:46 UTC (rev
21019)
+++ gnunet-java/src/org/gnunet/util/Server.java 2012-04-19 09:54:09 UTC (rev
21020)
@@ -6,20 +6,57 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
public class Server {
+
+
+ private RelativeTime idleTimeout;
+ private boolean requireFound;
+ List<ServerSocketChannel> listenSockets;
+ List<ClientHandle> clients;
+
+
public class ClientHandle {
- public void notifyTransmitReady() {
+ private SocketChannel sock;
+
+ private int referenceCount = 0;
+
+ public ClientHandle(SocketChannel accept) {
+
}
- public void persist() {
+ /**
+ * Notify us when the server has enough space to transmit
+ * a message of the given size to the given client.
+ *
+ * @param client client to transmit message to
+ * @param size requested amount of buffer space
+ * @param timeout after how long should we give up (and call
+ * notify with buf NULL and size 0)?
+ * @param callback function to call when space is available
+ * @param callback_cls closure for callback
+ * @return non-NULL if the notify callback was queued; can be used
+ * to cancel the request using
+ * GNUNET_CONNECTION_notify_transmit_ready_cancel.
+ * NULL if we are already going to notify someone else (busy)
+ */
+ public void notifyTransmitReady(int size, RelativeTime timeout,
MessageTransmitter transmitter) {
+
}
- public void receiveDone() {
+ /**
+ * Resume receiving from this client, we are done processing the
+ * current request. This function must be called from within each
+ * GNUNET_SERVER_MessageCallback (or its respective continuations).
+ *
+ * @param keepClient false if connection to the client should be closed
+ */
+ public void receiveDone(boolean keepClient) {
}
@@ -31,6 +68,10 @@
}
+ public void disconnect() {
+
+ }
+
public void disableReceiveDoneWarning() {
}
@@ -38,51 +79,80 @@
public void injectMessage(/*msg*/) {
}
-
}
- abstract class MessageHandler {
-
- }
-
abstract class MessageRunabout extends Runabout {
+ private ClientHandle currentSender;
+ /**
+ * Allows implementors of MessageRunabout to get the Client that sent
the message
+ * currently visited.
+ *
+ * @return handle of the client whose message is currently being
visited
+ */
public ClientHandle getSender() {
return null;
}
- private void setSender() {
-
+ private void setSender(ClientHandle clientHandle) {
+ currentSender = clientHandle;
}
+ }
+ /**
+ * @param srv
+ */
+ private void doAccept(final ServerSocketChannel srv) {
+ Scheduler.TaskBuilder b = new Scheduler.TaskBuilder();
+ b.withTask(new Scheduler.Task() {
+ @Override
+ public void run(Scheduler.RunContext ctx) {
+ try {
+ ClientHandle clientHandle = new ClientHandle(srv.accept());
+ clients.add(clientHandle);
+
+ } catch (IOException e) {
+ throw new RuntimeException("accept failed", e);
+ }
+ doAccept(srv);
+ }
+ });
+ b.withSelectAccept(srv);
+ Scheduler.add(b);
}
- List<ServerSocketChannel> sockets = new ArrayList<ServerSocketChannel>(2);
- List<ClientHandle> clients;
-
+ /**
+ * Create a server listening on all specified addresses.
+ *
+ * @param addresses addresses to bind on
+ * @param idleTimeout time after a client will be disconnected if idle
+ * @param requireFound allow unknown messages to be received without
disconnecting the client in response
+ */
public Server(SocketAddress[] addresses, RelativeTime idleTimeout, boolean
requireFound) {
+ this.idleTimeout = idleTimeout;
+ this.requireFound = requireFound;
+ listenSockets = new ArrayList<ServerSocketChannel>(addresses.length);
try {
for (SocketAddress addr : addresses) {
ServerSocketChannel socket = ServerSocketChannel.open();
+ socket.socket().bind(addr);
+ listenSockets.add(socket);
+ doAccept(socket);
}
-
} catch (IOException e) {
- e.printStackTrace();
+ throw new RuntimeException("could not bind");
}
-
}
+ public void addHandler(MessageRunabout cb) {
- // todo: do overloads / parameters wrt runabout
- public void addHandler(MessageHandler cb) {
-
}
- public void addHandler(MessageRunabout cb) {
-
+ public Cancelable notifyDisconnect(/* disco handler */) {
+ return null;
}
Modified: gnunet-java/src/org/gnunet/util/Strings.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Strings.java 2012-04-19 08:53:46 UTC
(rev 21019)
+++ gnunet-java/src/org/gnunet/util/Strings.java 2012-04-19 09:54:09 UTC
(rev 21020)
@@ -1,17 +1,16 @@
package org.gnunet.util;
public class Strings {
+ private static final String encTable = "0123456789ABCDEFGHIJKLMNOPQRSTUV";
+
public static String dataToString(byte[] data) {
StringBuilder sb = new StringBuilder();
- final String encTable = "0123456789ABCDEFGHIJKLMNOPQRSTUV";
- long rpos;
- long bits;
- long vbit;
+
+ long rpos = 0;
+ long bits = 0;
+ long vbit = 0;
long size = data.length;
- vbit = 0;
- rpos = 0;
- bits = 0;
while ((rpos < size) || (vbit > 0)) {
if ((rpos < size) && (vbit < 5)) {
byte b = data[(int) rpos++];
@@ -21,7 +20,6 @@
}
if (vbit < 5) {
bits <<= (5 - vbit); /* zero-padding */
- assert (vbit == ((size * 8) % 5));
vbit = 5;
}
sb.append(encTable.charAt((int) (bits >>> (vbit - 5)) & 31));
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r21020 - in gnunet-java: . src/org/gnunet/construct src/org/gnunet/construct/parsers src/org/gnunet/core src/org/gnunet/dht src/org/gnunet/nse src/org/gnunet/statistics src/org/gnunet/util,
gnunet <=