From: gnunet
Subject: [GNUnet-SVN] r21020 - in gnunet-java: . src/org/gnunet/construct src/org/gnunet/construct/parsers src/org/gnunet/core src/org/gnunet/dht src/org/gnunet/nse src/org/gnunet/statistics src/org/gnunet/util
Date: Thu, 19 Apr 2012 11:54:09 +0200

Author: dold
Date: 2012-04-19 11:54:09 +0200 (Thu, 19 Apr 2012)
New Revision: 21020

Removed:
   gnunet-java/src/org/gnunet/construct/Integer.java
   gnunet-java/src/org/gnunet/util/IOContinuation.java
Modified:
   gnunet-java/.classpath
   gnunet-java/ISSUES
   gnunet-java/src/org/gnunet/construct/Construct.java
   gnunet-java/src/org/gnunet/construct/FrameSize.java
   gnunet-java/src/org/gnunet/construct/MessageLoader.java
   gnunet-java/src/org/gnunet/construct/MsgMap.txt
   gnunet-java/src/org/gnunet/construct/ReflectUtil.java
   gnunet-java/src/org/gnunet/construct/parsers/NestedParser.java
   gnunet-java/src/org/gnunet/core/Core.java
   gnunet-java/src/org/gnunet/dht/DistributedHashTable.java
   gnunet-java/src/org/gnunet/nse/NetworkSizeEstimation.java
   gnunet-java/src/org/gnunet/statistics/Statistics.java
   gnunet-java/src/org/gnunet/util/Client.java
   gnunet-java/src/org/gnunet/util/GnunetMessage.java
   gnunet-java/src/org/gnunet/util/MessageTransmitter.java
   gnunet-java/src/org/gnunet/util/PeerIdentity.java
   gnunet-java/src/org/gnunet/util/Program.java
   gnunet-java/src/org/gnunet/util/Resolver.java
   gnunet-java/src/org/gnunet/util/Scheduler.java
   gnunet-java/src/org/gnunet/util/Server.java
   gnunet-java/src/org/gnunet/util/Strings.java
Log:
core now working, several fixes in construct, started implementing the server/service

Modified: gnunet-java/.classpath
===================================================================
--- gnunet-java/.classpath      2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/.classpath      2012-04-19 09:54:09 UTC (rev 21020)
@@ -11,7 +11,7 @@
        <classpathentry kind="con" path="org.eclipse.jdt.USER_LIBRARY/commons-io-2.1"/>
        <classpathentry kind="lib" path="lib/log4j-1.2.16.jar"/>
        <classpathentry kind="lib" path="lib/slf4j-log4j12-1.6.4.jar"/>
-       <classpathentry kind="lib" path="/home/dold/gnunet-java/lib/commons-io-2.1.jar"/>
+       <classpathentry kind="lib" path="lib/commons-io-2.1.jar"/>
        <classpathentry kind="con" path="org.eclipse.jdt.USER_LIBRARY/commons-io-2.2"/>
        <classpathentry kind="output" path="build"/>
 </classpath>

Modified: gnunet-java/ISSUES
===================================================================
--- gnunet-java/ISSUES  2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/ISSUES  2012-04-19 09:54:09 UTC (rev 21020)
@@ -36,8 +36,6 @@
 
 
 
-
-
 * why do we even need transmit_ready_notify? can't we just queue the message?
 * do we need to be able to send stuff that is not a message?
 
@@ -452,3 +450,64 @@
  * when handling signals through the pipe, how is the service / server notified?
 
 
+---------------------------------------------------------------------------------------
+* questions on https://gnunet.org/bugs/view.php?id=2277
+
+* if bufferbloat is bad, why do we keep reimplementing action queues / buffers in almost every
+  service api, just not on the lowest level (client)? (cf. the ControlMessage queue)
+
+* Construct: due to some changes in construct (esp. the addition of unions and optional nested messages)
+  parser caching does not work correctly anymore, since parsers now have a context, and not just a
+  target class.
+
+
+* DHT:
+ * monitor: is my observation correct that MonitorMessage is used to report a change back to the
+   client *and* to initiate a request (with some fields left at 0)? what are the different types of monitor requests?
+ * monitor_stop is wrong / doesn't do anything to cancel the monitor request
+  * it's also used nowhere in the entire code
+
+* Scheduler (had to be touched to implement the server)
+ * where do we need prerequisite tasks?
+  * according to the source, nowhere (except two times for memory deallocation)
+ * is there ever a case where a task should depend on events on a set of FDs?
+  * this can be emulated by multiple tasks
+ * do two tasks ever wait for the same event on the same FD?
+  * wouldn't this indicate a bug most of the time?
+ * currently the above two things are implemented, but complicate/slow down the implementation
+
+* slight design problem with PeerIdentity: data should be private final, but is public for Construct
+ * gnunet-java doesn't really care about access modifiers anyway, see runabout ;-)
+
+* I'm still not entirely convinced that core connect needs no timeout
+ * what about command line tools?
+ * they'd need an extra timeout task
+
+
+* core
+ * why is priority/timeout in SendMessageRequest and SendMessage?
+ * there is no comment indicating that SendMessage has the message to send as trailing data
+ * is 0 the highest or lowest priority? in what range are its values typically?
+ * when can we cancel a transmit?
+ * testing/test_testing_large_topology.c calls CORE_notify_transmit_ready with wrong number of arguments
+  (found by grepping)
+ * PeerStatusNotify message unused
+
+
+* java reflection APIs have the worst error messages (and checked exceptions),
+  Construct.ReflectUtil now provides better error messages / exceptions
+
+* is it legal to write multiple messages in the callback to Core.notifyTransmitReady?
+ * currently this is illegal in gnunet-java
+
+* --with-sqlite=PFX       base of SQLite installation
+ * misleading/confusing
+
+
+* server:
+ * is there an elegant way to reuse code from Client.java in Server.ClientHandle?
+ * GNUNET_SERVER_receive_done (struct GNUNET_SERVER_Client *client, int success)
+  * comment about success wrong
+ * why is there the client_disconnect function, and receive_done(client, false)?
+  * convenience/performance
+ * comment on client_persist
\ No newline at end of file
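
Editorial sketch for the parser-caching note in ISSUES above: one possible way to keep a cache once parsers depend on context is to key it on the target class together with a representation of that context. This is only an illustration under assumptions. `ParserCacheSketch`, `ContextKey`, and the use of a list of enclosing field names as the "context" are hypothetical and do not appear in gnunet-java; the cached value is a plain `Object` standing in for a parser.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: cache parsers by (target class, context) instead of by class alone.
final class ParserCacheSketch {

    /** Cache key: target class plus the path of enclosing field names (an assumed notion of context). */
    static final class ContextKey {
        private final Class<?> target;
        private final List<String> path;

        ContextKey(Class<?> target, List<String> path) {
            this.target = target;
            this.path = new ArrayList<String>(path); // defensive copy so the key stays stable
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ContextKey)) {
                return false;
            }
            ContextKey k = (ContextKey) o;
            return target.equals(k.target) && path.equals(k.path);
        }

        @Override
        public int hashCode() {
            return Objects.hash(target, path);
        }
    }

    private final Map<ContextKey, Object> cache = new HashMap<ContextKey, Object>();

    /** Number of parsers actually built, to make cache hits observable. */
    int builds = 0;

    /** Returns the cached parser for this class and context, building it on first use. */
    Object getParser(Class<?> target, List<String> path) {
        ContextKey key = new ContextKey(target, path);
        Object parser = cache.get(key);
        if (parser == null) {
            builds++;
            parser = new Object(); // stand-in for a real parser instance
            cache.put(key, parser);
        }
        return parser;
    }
}
```

With such a key, the same class requested at the root and requested as a nested member yields two distinct cache entries, while repeated requests in the same context share one.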

Modified: gnunet-java/src/org/gnunet/construct/Construct.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/Construct.java 2012-04-19 08:53:46 UTC 
(rev 21019)
+++ gnunet-java/src/org/gnunet/construct/Construct.java 2012-04-19 09:54:09 UTC 
(rev 21020)
@@ -22,7 +22,23 @@
 
     private static HashMap<Class<? extends Message>, Parser> parserCache = new 
HashMap<Class<? extends Message>, Parser>(100);
 
+
     /**
+     * Information about the root of the parser, if the target is nested in another message.
+     */
+    public static class ParserContext {
+
+    }
+
+    public static class RootedParser {
+        // null if target itself is root
+        ParserContext root;
+        Parser parser;
+    }
+
+
+
+    /**
      * Given a byte buffer with a message, parse it into an object of type c. 
The
      * fields of the class are expected to be annotated with annotations from
      * the construct package.
@@ -77,8 +93,6 @@
 
         SequenceParser parser = new SequenceParser();
 
-
-
         if (!Modifier.isPublic(c.getModifiers())) {
             throw new InterfaceViolationException(String.format("Construct 
Message %s not declared public", c));
         }
@@ -111,7 +125,6 @@
         return getParser(cls, pg);
     }
 
-
     // has to be public, accessed by Runabout (todo: can we do something about 
this scope issue?)
 
     @SuppressWarnings("UnusedDeclaration")
@@ -221,9 +234,6 @@
             parser = new IntegerParser(8, IntegerParser.SIGNED, field);
         }
 
-        public void visit(Integer i) {
-            parser = new IntegerParser(i.byteSize(), i.signed(), field);
-        }
 
         public void visit(ZeroTerminatedString zts) {
             parser = new StringParser(zts.charset(), zts.optional(), field);
@@ -234,26 +244,34 @@
         }
 
         public void visit(NestedMessage n) {
+            if (!Message.class.isAssignableFrom(field.getType())) {
+                throw new AssertionError("@NestedMessage only works on 
messages, " + field.getType()
+                                         + " is not a message (origin: " + c + 
")");
+            }
 
-            Class<? extends Message> ct;
-            //noinspection unchecked
-            ct = (Class<? extends Message>) field
-                    .getType();
+            Field nestedField = field;
 
-            Field old_f = field;
+            if (n.newFrame()) {
+                ParserGenerator pg = new ParserGenerator();
+                Parser p = getParser((Class<Message>) nestedField.getType(), 
pg);
 
-            List<Field> old_path = new ArrayList<Field>(path);
+                parser = new NestedParser(p, pg.frameSizePath, n.optional(), 
nestedField, true);
 
-            path.add(field);
+            } else {
+                Field old_f = field;
+                List<Field> old_path = new ArrayList<Field>(path);
+                Class old_c = c;
 
-            Class old_c = c;
+                path.add(field);
 
-            Parser p = getParser(ct, this);
+                Parser p = getParser((Class<Message>) nestedField.getType(), 
this);
 
-            path = old_path;
-            c = old_c;
-            LinkedList<Field> copy = frameSizePath == null ? null : new 
LinkedList<Field>(frameSizePath);
-            parser = new NestedParser(p, copy, n.optional(), old_f);
+                path = old_path;
+                c = old_c;
+                LinkedList<Field> copy = frameSizePath == null ? null : new 
LinkedList<Field>(frameSizePath);
+
+                parser = new NestedParser(p, copy, n.optional(), old_f, false);
+            }
         }
 
         public void visit(ByteFill bf) {
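
Editorial sketch for the new type check in `visit(NestedMessage n)` above: the guard relies on `Class.isAssignableFrom` to reject annotated fields whose type is not a message. The following stand-alone example shows that check in isolation. The `Message` interface, `Header`, and `Holder` here are hypothetical stand-ins, not the gnunet-java classes.

```java
import java.lang.reflect.Field;

// Hypothetical stand-ins; gnunet-java's real Message interface and message classes differ.
final class NestedFieldCheckSketch {
    interface Message {}

    static class Header implements Message {}

    static class Holder {
        public Header header; // acceptable target for a nested-message annotation
        public int counter;   // not a message, would be rejected
    }

    /** Returns true if the named public field of owner has a type that implements Message. */
    static boolean isMessageField(Class<?> owner, String fieldName) {
        try {
            Field f = owner.getField(fieldName);
            return Message.class.isAssignableFrom(f.getType());
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("no public field '" + fieldName + "' in " + owner, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isMessageField(Holder.class, "header"));
        System.out.println(isMessageField(Holder.class, "counter"));
    }
}
```

Doing this check up front means a misplaced annotation fails with a message naming the field type and origin class, rather than with a `ClassCastException` later in parsing.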

Modified: gnunet-java/src/org/gnunet/construct/FrameSize.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/FrameSize.java 2012-04-19 08:53:46 UTC 
(rev 21019)
+++ gnunet-java/src/org/gnunet/construct/FrameSize.java 2012-04-19 09:54:09 UTC 
(rev 21020)
@@ -1,9 +1,6 @@
 package org.gnunet.construct;
 
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
+import java.lang.annotation.*;
 
 /**
  * Marker for the field storing the size of the enclosing frame in bytes.
@@ -14,5 +11,4 @@
 @Retention(RetentionPolicy.RUNTIME)
 @Target(ElementType.FIELD)
 public @interface FrameSize {
-
 }

Deleted: gnunet-java/src/org/gnunet/construct/Integer.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/Integer.java   2012-04-19 08:53:46 UTC 
(rev 21019)
+++ gnunet-java/src/org/gnunet/construct/Integer.java   2012-04-19 09:54:09 UTC 
(rev 21020)
@@ -1,22 +0,0 @@
-package org.gnunet.construct;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * An integer with static size. May be signed or unsigned.
- * 
- * Allowed target fields: byte, short, int, long, BigInteger
- * 
- * @author Florian Dold
- * 
- */
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.FIELD)
-public @interface Integer {
-    int byteSize();
-
-    boolean signed();
-}

Modified: gnunet-java/src/org/gnunet/construct/MessageLoader.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/MessageLoader.java     2012-04-19 
08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/construct/MessageLoader.java     2012-04-19 
09:54:09 UTC (rev 21020)
@@ -41,7 +41,6 @@
     private static final Logger logger = LoggerFactory
             .getLogger(MessageLoader.class);
 
-
     /**
      * Maps a class and tag to the corresponding union case.
      * <p/>
@@ -58,7 +57,6 @@
 
 
     static {
-        // XXX: should MessageLoader be a singleton?
         loadMessageMap();
     }
 

Modified: gnunet-java/src/org/gnunet/construct/MsgMap.txt
===================================================================
--- gnunet-java/src/org/gnunet/construct/MsgMap.txt     2012-04-19 08:53:46 UTC 
(rev 21019)
+++ gnunet-java/src/org/gnunet/construct/MsgMap.txt     2012-04-19 09:54:09 UTC 
(rev 21020)
@@ -19,8 +19,9 @@
 org.gnunet.util.GnunetMessage$Body|76=org.gnunet.core.Core$SendMessage
 org.gnunet.util.GnunetMessage$Body|74=org.gnunet.core.Core$SendMessageRequest
 org.gnunet.util.GnunetMessage$Body|75=org.gnunet.core.Core$SendMessageReady
+org.gnunet.util.GnunetMessage$Body|42001=org.gnunet.core.Core$MyMessage
 
org.gnunet.util.GnunetMessage$Body|323=org.gnunet.nse.NetworkSizeEstimation$UpdateMessage
 
org.gnunet.util.GnunetMessage$Body|321=org.gnunet.nse.NetworkSizeEstimation$StartMessage
 
org.gnunet.util.GnunetMessage$Body|144=org.gnunet.dht.DistributedHashTable$DHTClientGetStopMessage
 
org.gnunet.util.GnunetMessage$Body|145=org.gnunet.dht.DistributedHashTable$DHTClientResultMessage
-# generated 2012/04/11 15:38:56
+# generated 2012/04/18 23:26:44

Modified: gnunet-java/src/org/gnunet/construct/ReflectUtil.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/ReflectUtil.java       2012-04-19 
08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/construct/ReflectUtil.java       2012-04-19 
09:54:09 UTC (rev 21020)
@@ -168,7 +168,9 @@
             return f.get(obj);
         } catch (IllegalAccessException e) {
             throw new InterfaceViolationException(
-                    String.format("Cannot access private field %s in class 
%s", f, obj.getClass()));
+                    String.format("Cannot access private field '%s' in class 
%s", f, obj.getClass()));
+        } catch (IllegalArgumentException e) {
+            throw new AssertionError("Cannot access field '" + f.getName() + 
"' in class " + obj.getClass());
         }
     }
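
Editorial sketch for the ReflectUtil change above: `Field.get` can fail with the checked `IllegalAccessException` (field not accessible) and with the unchecked `IllegalArgumentException` (object is not an instance of the field's declaring class). The example below mirrors the pattern of translating both into errors that name the field and the class. `ReflectGetSketch`, `Sample`, and `publicField` are hypothetical, and `IllegalStateException` is used as a stand-in for gnunet-java's `InterfaceViolationException`.

```java
import java.lang.reflect.Field;

// Hypothetical sketch of a reflective getter with descriptive failures.
final class ReflectGetSketch {
    public static class Sample {
        public int value = 7;
    }

    /** Looks up a public field, converting the checked exception into an unchecked one. */
    static Field publicField(Class<?> c, String name) {
        try {
            return c.getField(name);
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("no public field '" + name + "' in " + c, e);
        }
    }

    /** Reads field f from obj, reporting which field and class were involved on failure. */
    static Object justGet(Object obj, Field f) {
        try {
            return f.get(obj);
        } catch (IllegalAccessException e) {
            // stand-in for InterfaceViolationException in the original code
            throw new IllegalStateException(
                    String.format("Cannot access field '%s' in class %s", f, obj.getClass()), e);
        } catch (IllegalArgumentException e) {
            // obj is not an instance of the class that declares f
            throw new AssertionError("Cannot access field '" + f.getName() + "' in class " + obj.getClass());
        }
    }

    public static void main(String[] args) {
        Field f = publicField(Sample.class, "value");
        System.out.println(justGet(new Sample(), f));
    }
}
```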
 

Modified: gnunet-java/src/org/gnunet/construct/parsers/NestedParser.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/parsers/NestedParser.java      
2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/construct/parsers/NestedParser.java      
2012-04-19 09:54:09 UTC (rev 21020)
@@ -14,14 +14,16 @@
 
     private final Parser nestedParser;
     private List<Field> frameSizePath;
+    private boolean newFrame;
 
     boolean optional;
 
-    public NestedParser(final Parser p, List<Field> frameSizePath, boolean 
optional, final Field f) {
+    public NestedParser(final Parser p, List<Field> frameSizePath, boolean 
optional, final Field f, boolean newFrame) {
         targetField = f;
         this.optional = optional;
         this.nestedParser = p;
         this.frameSizePath = frameSizePath;
+        this.newFrame = newFrame;
     }
 
     @Override
@@ -37,6 +39,10 @@
 
     @Override
     public int parse(final ByteBuffer srcBuf, int frameOffset, Message 
frameObj, final Message dstObj) {
+        if (newFrame) {
+            frameObj = dstObj;
+            frameOffset = 0;
+        }
 
 
         if (optional) {
@@ -66,9 +72,13 @@
 
     @Override
     public void patch(Message m, int frameSize, Message frameObj) {
-        // todo: nested/opaque frames
-        nestedParser.patch(m, frameSize, frameObj);
+        Message nestedMessage = (Message) ReflectUtil.justGet(m, targetField);
 
+        if (newFrame) {
+            nestedParser.patch(nestedMessage, 
nestedParser.getSize(nestedMessage), m);
+        } else {
+            nestedParser.patch(nestedMessage, frameSize, frameObj);
+        }
     }
 
 }
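
Editorial sketch for the `newFrame` handling in NestedParser above: when a nested message opens a new frame, its size field is patched with the size of the nested message itself; otherwise it is patched relative to the enclosing frame. The model below is deliberately simplified and rests on assumptions: `Inner`, `Outer`, the 2-byte size field, and the `patch` helper are hypothetical and do not reproduce the Construct parser classes.

```java
// Hypothetical, simplified model of frame-size patching for nested messages.
final class FramePatchSketch {
    static class Inner {
        int frameSize;                 // size field to be patched
        byte[] payload = new byte[0];

        int size() {
            return 2 + payload.length; // assumed 2-byte size field plus payload
        }
    }

    static class Outer {
        int frameSize;                 // size field of the enclosing message
        Inner inner = new Inner();

        int size() {
            return 2 + inner.size();   // assumed 2-byte size field plus nested message
        }
    }

    /**
     * Patches the size fields. With newFrame the nested message is its own frame,
     * so it receives its own size; otherwise it receives the enclosing frame's size.
     */
    static void patch(Outer outer, boolean newFrame) {
        int enclosing = outer.size();
        outer.frameSize = enclosing;
        outer.inner.frameSize = newFrame ? outer.inner.size() : enclosing;
    }

    public static void main(String[] args) {
        Outer o = new Outer();
        o.inner.payload = new byte[4];
        patch(o, true);
        System.out.println(o.frameSize + " " + o.inner.frameSize);
    }
}
```

This matches the intent of the `parse` change as well: a new frame resets the frame object and offset so that sizes inside the nested message are interpreted relative to that message, not its container.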

Modified: gnunet-java/src/org/gnunet/core/Core.java
===================================================================
--- gnunet-java/src/org/gnunet/core/Core.java   2012-04-19 08:53:46 UTC (rev 
21019)
+++ gnunet-java/src/org/gnunet/core/Core.java   2012-04-19 09:54:09 UTC (rev 
21020)
@@ -2,14 +2,56 @@
 
 import org.gnunet.construct.*;
 import org.gnunet.util.*;
+import org.grothoff.Runabout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
 public class Core {
     private static final Logger logger = LoggerFactory
             .getLogger(Core.class);
 
+    private final Client client;
+    /**
+     * set to null once connected for the first time
+     */
+    private InitCallback init;
 
+    private boolean currentlyDown = true;
+
+    private HeaderNotify notifyOutboundHeaders;
+    private HeaderNotify notifyInboundHeaders;
+
+    private MessageNotify notifyOutboundMessages;
+    private MessageNotify notifyInboundMessages;
+
+    private int initOptions;
+
+    private ConnectHandler connectHandler;
+    private DisconnectHandler disconnectHandler;
+
+    private PeerIdentity myIdentity;
+
+    private final CoreReceiver coreReceiver = new CoreReceiver();
+
+    // per default we are interested in all messages => specific interest set 
is empty
+    private int[] interested = new int[0];
+    private Runabout messageHandler;
+
+    HashMap<PeerIdentity, ConnectedPeerInfo> connectedPeers = new 
HashMap<PeerIdentity, ConnectedPeerInfo>(10);
+
+    LinkedList<TransmitHandle> pending = new LinkedList<TransmitHandle>();
+
+    LinkedList<SendMessage> approvedSendMessages = new 
LinkedList<SendMessage>();
+
+
+    private Client.TransmitHandle currentClientTransmitHandle;
+
     enum CoreOption {
         NOTHING(0),
         SEND_FULL_INBOUND(8),
@@ -60,12 +102,17 @@
         @NestedMessage
         public PeerIdentity peer;
 
-        /**
-         * First of the ATS information blocks (we must have at least
-         * one due to the 0-termination requirement).
-         */
         @NestedMessage
-        public ATSInformation atsInformation;
+        public ATSInformation atsFirst;
+
+        @VariableSizeArray(lengthField = "ats_count")
+        public ATSInformation[] atsRest;
+
+        @NestedMessage(newFrame = true)
+        public GnunetMessage.Header payloadHeader;
+
+        @Union(tag = "payloadHeader.messageType", optional = true)
+        public GnunetMessage.Body payloadBody;
     }
 
 
@@ -84,14 +131,17 @@
         @NestedMessage
         public PeerIdentity peer;
 
-        /**
-         * First of the ATS information blocks (we must have at least
-         * one due to the 0-termination requirement).
-         */
-        //@NestedMessage
-        //public ATSInformation atsInformation;
-        @ByteFill
-        public byte[] ats;
+        @NestedMessage
+        public ATSInformation atsFirst;
+
+        @VariableSizeArray(lengthField = "ats_count")
+        public ATSInformation[] atsRest;
+
+        @NestedMessage(newFrame = true)
+        public GnunetMessage.Header payloadHeader;
+
+        @Union(tag = "payloadHeader.messageType", optional = true)
+        public GnunetMessage.Body payloadBody;
     }
 
 
@@ -205,10 +255,10 @@
         public int size;
 
         /**
-         * smr_id from the request.
+         * smrId from the request.
          */
         @UInt16
-        public int smr_id;
+        public int smrId;
 
         /**
          * Identity of the intended target.
@@ -234,7 +284,7 @@
          * message transmitted?
          */
         @NestedMessage
-        public AbsoluteTime deadline;
+        public AbsoluteTimeMessage deadline;
 
         /**
          * Identity of the intended receiver.
@@ -254,51 +304,32 @@
         @UInt64
         public int reserved;
 
-    }
+        @NestedMessage(newFrame = true)
+        public GnunetMessage payloadMessage;
 
-    public interface MessageHandler {
-
     }
 
-    public interface Continuation {
-
-    }
-
+    /**
+     * Called once the handshake with core was successful.
+     */
     public interface InitCallback {
         void onInit(PeerIdentity myIdentity);
     }
 
+    /**
+     * Called when a new peer (with a compatible set of messages) connects to core
+     */
     public interface ConnectHandler {
         void onConnect(PeerIdentity peerIdentity);
     }
 
+    /**
+     * Called when a peer disconnects from the core.
+     */
     public interface DisconnectHandler {
         void onDisconnect(PeerIdentity peerIdentity);
-
     }
 
-    private final Client client;
-    /**
-     * set to null once connected for the first time
-     */
-    private InitCallback init;
-
-    private HeaderNotify notifyOutboundHeaders;
-    private HeaderNotify notifyInboundHeaders;
-
-    private MessageNotify notifyOutboundMessages;
-    private MessageNotify notifyInboundMessages;
-
-    private int initOptions;
-
-    private ConnectHandler connectHandler;
-    private DisconnectHandler disconnectHandler;
-
-    private PeerIdentity myIdentity;
-
-    private final CoreReceiver coreReceiver = new CoreReceiver();
-
-
     public interface HeaderNotify {
         void notify(GnunetMessage.Header header);
     }
@@ -307,131 +338,127 @@
         void notify(GnunetMessage messageBody);
     }
 
-    public static class Builder {
-        /**
-         * Configuration for a core connection.
-         *
-         * @param cfg Mandatory configuration
-         */
-        final private Configuration cfg;
-        private InitCallback init;
-        private HeaderNotify notifyOutboundHeaders;
-        private HeaderNotify notifyInboundHeaders;
 
-        private MessageNotify notifyOutboundMessages;
-        private MessageNotify notifyInboundMessages;
-        private ConnectHandler connectHandler;
-        private DisconnectHandler disconnectHandler;
-        private PeerIdentity myIdentity;
+    /**
+     * Represents a request for transmission.
+     */
+    public class TransmitHandle {
+        private PeerIdentity peerIdentity;
+        private MessageTransmitter transmitter;
+        private int smrId;
+        private Cancelable timeoutHandle;
+        private int size;
+        public AbsoluteTime deadline;
+    }
 
-        public Builder(Configuration cfg) {
-            this.cfg = cfg;
-        }
+    private class ConnectedPeerInfo {
+        public Map<Integer, TransmitHandle> requestsToPeer = new 
HashMap<Integer, TransmitHandle>(1);
+        public int nextSmrId;
+    }
 
-        public Builder withInit(InitCallback init) {
-            this.init = init;
-            return this;
-        }
-
-        public Builder addHandler(MessageHandler handler) {
-            return this;
-        }
-
-        public Builder withNotifyOutboundFull(MessageNotify h) {
-            return this;
-        }
-
-        public Builder withNotifyOutboundHeaders(HeaderNotify h) {
-            notifyOutboundHeaders = h;
-            return this;
-        }
-
-        public Builder withNotifyConnect(ConnectHandler h) {
-            connectHandler = h;
-            return this;
-        }
-
-        public Builder withNotifyDisconnect(DisconnectHandler h) {
-            disconnectHandler = h;
-            return this;
-        }
-
-        public Core build() {
-            return new Core(this);
-        }
-
+    private Core(Configuration cfg) {
+        client = new Client("core", cfg);
     }
 
-    private Core(Builder b) {
-        client = new Client("core", b.cfg);
-        this.notifyOutboundHeaders = b.notifyOutboundHeaders;
-
-        initOptions = 0;
-
-        if (notifyOutboundHeaders != null) {
-            //initOptions |= CoreOption.SEND_HDR_OUTBOUND.val;
-        }
-        this.init = b.init;
-
-
-        // XXX: only temporary for debugging
-        initOptions |= CoreOption.SEND_FULL_INBOUND.val;
-        initOptions |= CoreOption.SEND_HDR_OUTBOUND.val;
-
-        this.connectHandler = b.connectHandler;
-        this.disconnectHandler = b.disconnectHandler;
-
+    public void init(InitCallback initCallback) {
+        this.init = initCallback;
         reconnect();
     }
 
     void reconnect() {
         InitMessage initMessage = new InitMessage();
 
-        initMessage.interested = new int[0];
-        initMessage.options = initOptions;
+        initMessage.interested = interested;
+        initMessage.options = 0;
 
+        for (int i : interested) {
+            logger.info("we are interested in " + i);
+        }
+
         client.transmitAndGetResponse(initMessage, RelativeTime.FOREVER, true, 
new CoreReceiver());
     }
 
     public class CoreReceiver extends RunaboutMessageReceiver {
 
-        public void visit(ConnectNotifyMessage m) {
-            logger.info("got connectnotify");
 
-            if (connectHandler != null) {
-                connectHandler.onConnect(m.peer);
-            }
-            client.receive(RelativeTime.FOREVER, this);
-        }
-
         public void visit(InitReplyMessage m) {
-            logger.info("got initreply");
+            myIdentity = m.myIdentity;
+            connectedPeers.put(myIdentity, new ConnectedPeerInfo());
 
-            myIdentity = m.myIdentity;
             if (init != null) {
                 init.onInit(m.myIdentity);
                 init = null;
             }
+
             client.receive(RelativeTime.FOREVER, this);
         }
 
+
+        public void visit(ConnectNotifyMessage m) {
+            if (connectHandler != null) {
+                connectHandler.onConnect(m.peer);
+            }
+
+            client.receive(RelativeTime.FOREVER, this);
+        }
+
         public void visit(NotifyInboundTrafficMessage m) {
-            logger.info("got NotifyInbount");
+            boolean found = false;
+            for (int i : interested) {
+                if (i == m.payloadHeader.messageType) {
+                    found = true;
+                    break;
+                }
+            }
+            if (found) {
+                messageHandler.visitAppropriate(m.payloadBody);
+            } else {
+                // do stuff
+            }
 
             client.receive(RelativeTime.FOREVER, this);
         }
 
         public void visit(NotifyOutboundTrafficMessage m) {
-            logger.info("got NotifyOutbound");
 
             client.receive(RelativeTime.FOREVER, this);
         }
 
         public void visit(SendMessageReady m) {
-            logger.info("got SendMessageReady");
+            ConnectedPeerInfo targetPeerInfo = connectedPeers.get(m.peer);
 
-            // todo: ...
+            if (targetPeerInfo == null) {
+                logger.error("SendMessageReady received for unconnected peer " + m.peer);
 
+                client.receive(RelativeTime.FOREVER, this);
+                return;
+            }
+
+            TransmitHandle transmitHandle = 
targetPeerInfo.requestsToPeer.get(m.smrId);
+
+            final SendMessage sendMessage = new SendMessage();
+
+
+            sendMessage.deadline = transmitHandle.deadline.asMessage();
+            sendMessage.cork = 0;
+            sendMessage.peer = transmitHandle.peerIdentity;
+            sendMessage.priority = 0;
+
+            transmitHandle.transmitter.transmit(new Client.MessageSink() {
+                @Override
+                public void send(GnunetMessage.Body m) {
+                    if (sendMessage.payloadMessage != null) {
+                        throw new AssertionError("can only send one payload 
message");
+                    }
+                    logger.info("sink called");
+                    sendMessage.payloadMessage = GnunetMessage.fromBody(m);
+                }
+            });
+
+            approvedSendMessages.add(sendMessage);
+
+            transmitPending();
+
             client.receive(RelativeTime.FOREVER, this);
         }
 
@@ -443,44 +470,157 @@
 
         @Override
         public void handleError() {
+            throw new AssertionError("unexpected");
         }
     }
 
+    /**
+     * Ask the core to call "notify" once it is ready to transmit the
+     * given number of bytes to the specified "target".    Must only be
+     * called after a connection to the respective peer has been
+     * established (and the client has been informed about this).
+     *
+     * @param priority    how important is the message?
+     * @param maxdelay    maximum time until transmitter gets called
+     * @param targetIdentity the identity of the receiver
+     * @param size        the size of the message we want to transmit
+     * @param transmitter called once the core service is ready to send message
+     * @return handle that can be used to cancel the transmit request
+     */
+    public Cancelable notifyTransmitReady(long priority, RelativeTime maxdelay,
+                                          PeerIdentity targetIdentity, int size, final MessageTransmitter transmitter) {
+        if (!connectedPeers.containsKey(targetIdentity)) {
+            throw new AssertionError("trying to send message to unconnected 
peer");
+        }
 
-    public Cancelable notifyTransmitReady(boolean cork, long priority, 
RelativeTime maxdelay,
-                                          PeerIdentity target, int size, 
MessageTransmitter transmitter) {
-        final SendMessageRequest smr = new SendMessageRequest();
-        smr.deadline = maxdelay.toAbsolute().asMessage();
-        if (target == null) {
-            smr.peer = myIdentity;
-        } else {
-            smr.peer = target;
+        ConnectedPeerInfo cpi = connectedPeers.get(targetIdentity);
+
+        final TransmitHandle transmitHandle = new TransmitHandle();
+
+        transmitHandle.peerIdentity = targetIdentity;
+        transmitHandle.size = size;
+        transmitHandle.smrId = cpi.nextSmrId;
+        transmitHandle.transmitter = transmitter;
+        cpi.nextSmrId += 1;
+        transmitHandle.deadline = maxdelay.toAbsolute();
+
+        cpi.requestsToPeer.put(transmitHandle.smrId, transmitHandle);
+
+        if (!maxdelay.isForever()) {
+            transmitHandle.timeoutHandle = Scheduler.add(maxdelay, new 
Scheduler.Task() {
+                @Override
+                public void run(Scheduler.RunContext ctx) {
+                    transmitter.handleError();
+                }
+            });
         }
-        smr.queueSize = 10;
-        smr.smrId = 1;
-        smr.size = size;
 
-        client.notifyTransmitReady(maxdelay, false, new MessageTransmitter() {
+        // todo: add so that 'pending' is ordered by maxdelay
+        pending.add(transmitHandle);
+
+        transmitPending();
+
+        return new Cancelable() {
             @Override
-            public void transmit(Client.MessageSink sink) {
-                sink.send(smr);
+            public void cancel() {
+                pending.remove(transmitHandle);
+                if (transmitHandle.timeoutHandle != null) {
+                    transmitHandle.timeoutHandle.cancel();
+                }
             }
+        };
 
-            @Override
-            public void handleError(Client.TransmitError error) {
-                throw new RuntimeException("client error");
-            }
-        });
-        return null;
+
     }
 
+    /**
+     * Send the next message to core.
+     */
+    private void transmitPending() {
+        logger.info("transmit pending called 1");
+        // todo: check if not connected, connect then first!
+        if (currentClientTransmitHandle != null) {
+            logger.info("returning");
+            return;
+        }
 
-    public Cancelable transmitMessage(GnunetMessage.Body messageBody, 
RelativeTime timeout,
-                                      PeerIdentity target, Continuation cont) {
-        return null;
+        logger.info("transmit pending called 2");
 
+        final TransmitHandle transmitHandle = pending.poll();
+        if (transmitHandle != null) {
+            logger.info("calling NTR for transmitHandle");
+            currentClientTransmitHandle = 
client.notifyTransmitReady(RelativeTime.FOREVER, false,
+                    new MessageTransmitter() {
+                        @Override
+                        public void transmit(Client.MessageSink sink) {
+                            logger.info("transmitHandle NTR done");
+                            currentClientTransmitHandle = null;
+
+                            SendMessageRequest request = new SendMessageRequest();
+                            request.peer = transmitHandle.peerIdentity;
+                            request.priority = 0;
+                            request.queueSize = 5;
+                            request.smrId = transmitHandle.smrId;
+                            request.size = transmitHandle.size;
+
+                            request.deadline = transmitHandle.deadline.asMessage();
+
+                            sink.send(request);
+
+                            transmitPending();
+                        }
+
+                        @Override
+                        public void handleError() {
+                            logger.error("error in SendMessageTransmitter");
+                            transmitPending();
+                        }
+                    });
+
+            logger.info("left at transmithandle" + (currentClientTransmitHandle == null));
+            return;
+        }
+
+        final SendMessage sm = approvedSendMessages.poll();
+        if (sm != null) {
+            logger.info("calling NTR for approvedSendMessages");
+            currentClientTransmitHandle = client.notifyTransmitReady(RelativeTime.FOREVER, false,
+                    new MessageTransmitter() {
+                        @Override
+                        public void transmit(Client.MessageSink sink) {
+                            currentClientTransmitHandle = null;
+                            sink.send(sm);
+                            logger.info("transmitted SendMessage!");
+                            transmitPending();
+                        }
+
+                        @Override
+                        public void handleError() {
+                            throw new AssertionError("unexpected");
+                        }
+                    });
+            return;
+        }
+        logger.info("transmitPending did nothing");
     }
 
+
+    public void observeOutboundHeaders(HeaderNotify h) {
+    }
+
+    public void observeConnect(ConnectHandler connectHandler) {
+        this.connectHandler = connectHandler;
+    }
+
+
+    public void handleMessages(Runabout runabout) {
+        if (messageHandler != null) {
+            throw new AssertionError("Core can have only one message handler");
+        }
+        messageHandler = runabout;
+        interested = getRunaboutMessageTypes(runabout);
+    }
+
     /**
      * Disconnect from the core service.    This function can only
      * be called *after* all pending notifyTransmitReady
@@ -490,48 +630,93 @@
 
     }
 
+    private static ArrayList<Class> getRunaboutVisitees(Runabout r) {
+        Class rc = r.getClass();
+        ArrayList<Class> ret = new ArrayList<Class>(5);
+        for (Method m : rc.getMethods()) {
+            if (!(m.getName().equals("visit") && m.getParameterTypes().length == 1)) {
+                continue;
+            }
+            ret.add(m.getParameterTypes()[0]);
+        }
+        return ret;
+    }
+
+    private static int[] getRunaboutMessageTypes(Runabout r) {
+        ArrayList<Class> visitees = getRunaboutVisitees(r);
+        int[] msgtypes = new int[visitees.size()];
+        for (int i = 0; i < visitees.size(); ++i) {
+            msgtypes[i] = MessageLoader.getUnionTag(GnunetMessage.Body.class, visitees.get(i));
+        }
+        return msgtypes;
+    }
+
+
+    public static class BlaRunabout extends Runabout {
+        public void visit(SendMessage x) {
+
+        }
+
+        public void visit(SendMessageReady x) {
+
+        }
+    }
+
+    @UnionCase(42001)
+    public static class MyMessage implements GnunetMessage.Body {
+        @UInt32
+        public int number;
+        @ZeroTerminatedString
+        public String message;
+    }
+
+    public static class MyMessageHandler extends Runabout {
+        public void visit(MyMessage msg) {
+            System.out.println("got my message!!");
+            System.out.println(msg.message);
+            System.out.println(msg.number);
+        }
+    }
+
     public static void main(String[] args) {
         new Program(args) {
-            Core core;
-
             public void run() {
-                Core.Builder b = new Core.Builder(cfg);
+                final Core core = new Core(this.getConfiguration());
 
-                b.withInit(new InitCallback() {
+                core.observeConnect(new ConnectHandler() {
                     @Override
-                    public void onInit(PeerIdentity myIdentity) {
-                        System.out.print("Hello, I'm ");
-                        System.out.println(Strings.dataToString(myIdentity.data));
-
-                        core.notifyTransmitReady(true, 0, RelativeTime.SECOND, null, 4, new MessageTransmitter() {
-                            @Override
-                            public void transmit(Client.MessageSink sink) {
-                                System.out.println("ready to transmit");
-                            }
-
-                            @Override
-                            public void handleError(Client.TransmitError error) {
-                            }
-                        });
+                    public void onConnect(PeerIdentity peerIdentity) {
+                        System.out.println(Strings.dataToString(peerIdentity.data));
                     }
                 });
 
-                b.withNotifyOutboundHeaders(new HeaderNotify() {
+
+                core.handleMessages(new MyMessageHandler());
+
+
+                final MyMessage myMessage = new MyMessage();
+                myMessage.message = "Hello, gnunet";
+                myMessage.number = 42;
+
+                core.init(new InitCallback() {
                     @Override
-                    public void notify(GnunetMessage.Header header) {
-                        System.out.println("my peer sent message of type " + header.messageType);
-                    }
-                });
+                    public void onInit(PeerIdentity myIdentity) {
+                        System.out.print("Hello, I'm ");
+                        System.out.println(Strings.dataToString(myIdentity.data));
 
+                        core.notifyTransmitReady(0, RelativeTime.SECOND, myIdentity, Construct.getSize(myMessage),
+                                new MessageTransmitter() {
+                                    @Override
+                                    public void transmit(Client.MessageSink sink) {
+                                        sink.send(myMessage);
+                                    }
 
-                b.withNotifyConnect(new ConnectHandler() {
-                    @Override
-                    public void onConnect(PeerIdentity peerIdentity) {
-                        System.out.println("connected to " + Strings.dataToString(peerIdentity.data));
+                                    @Override
+                                    public void handleError() {
+                                    }
+                                });
                     }
                 });
-
-                core = b.build();
             }
         }.start();
     }
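
Note on getRunaboutVisitees above: it collects the parameter type of every public one-argument method named "visit" via reflection. A minimal standalone sketch of that technique follows. The classes Ping, Pong and Handler are hypothetical stand-ins, not the real Runabout or GNUnet message classes, and a Set is used here because getMethods() returns methods in no particular order.

```java
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-ins; not the real Runabout or message classes.
class VisiteeSketch {
    static class Ping {}
    static class Pong {}

    static class Handler {
        public void visit(Ping p) {}
        public void visit(Pong p) {}
        public void visit(Ping a, Pong b) {} // ignored: two parameters
        public void other(Ping p) {}         // ignored: wrong method name
    }

    // Collect the parameter type of every public one-argument method named "visit".
    static Set<Class<?>> visitees(Object handler) {
        Set<Class<?>> result = new HashSet<Class<?>>();
        for (Method m : handler.getClass().getMethods()) {
            if (m.getName().equals("visit") && m.getParameterTypes().length == 1) {
                result.add(m.getParameterTypes()[0]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(visitees(new Handler()).size()); // prints 2
    }
}
```

In the commit, each collected class is then mapped to a message type number through MessageLoader.getUnionTag; that lookup is left out of the sketch.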

Modified: gnunet-java/src/org/gnunet/dht/DistributedHashTable.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/DistributedHashTable.java    2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/dht/DistributedHashTable.java    2012-04-19 09:54:09 UTC (rev 21020)
@@ -239,7 +239,7 @@
                     }
 
                     @Override
-                    public void handleError(Client.TransmitError error) {
+                    public void handleError() {
                         cont.call(PutResult.TIMEOUT);
                     }
                 });
@@ -280,7 +280,7 @@
                 }
 
                 @Override
-                public void handleError(Client.TransmitError error) {
+                public void handleError() {
                 }
             });
         }
@@ -365,7 +365,7 @@
             }
 
             @Override
-            public void handleError(Client.TransmitError error) {
+            public void handleError() {
                 if (!request.canceled) {
                     request.cancel();
                 }
@@ -375,7 +375,7 @@
             }
         });
         
-        Scheduler.add(new Scheduler.Task() {
+        Scheduler.add(timeout, new Scheduler.Task() {
             @Override
             public void run(Scheduler.RunContext ctx) {
                 logger.debug("timeout task has been run");
@@ -384,16 +384,19 @@
                     request.cancel();
                 }
             }
-        }, timeout);
+        });
 
         return request;
     }
 
+    public Cancelable startMonitor() {
+        return null;
+    }
+
     public void destroy() {
         client.disconnect();
     }
 
-
     public static void main(String[] args) {
         new Program(args) {
             @Option(action = OptionAction.SET,

Modified: gnunet-java/src/org/gnunet/nse/NetworkSizeEstimation.java
===================================================================
--- gnunet-java/src/org/gnunet/nse/NetworkSizeEstimation.java   2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/nse/NetworkSizeEstimation.java   2012-04-19 09:54:09 UTC (rev 21020)
@@ -86,7 +86,7 @@
         }
 
         @Override
-        public void handleError(Client.TransmitError error) {
+        public void handleError() {
             throw new RuntimeException("unexpected transmitt error");
         }
     }

Modified: gnunet-java/src/org/gnunet/statistics/Statistics.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/Statistics.java       2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/statistics/Statistics.java       2012-04-19 09:54:09 UTC (rev 21020)
@@ -113,7 +113,7 @@
             }
 
             @Override
-            public void handleError(Client.TransmitError error) {
+            public void handleError() {
                 throw new RuntimeException("unexpected transmit error");
             }
         }
@@ -154,7 +154,7 @@
             }
 
             @Override
-            public void handleError(Client.TransmitError error) {
+            public void handleError() {
                 throw new RuntimeException("unexpected transmit error");
             }
         });

Modified: gnunet-java/src/org/gnunet/util/Client.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Client.java 2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/util/Client.java 2012-04-19 09:54:09 UTC (rev 21020)
@@ -264,7 +264,7 @@
             b.withTask(new Task() {
                 @Override
                 public void run(Scheduler.RunContext ctx) {
-                    transmitter.handleError(TransmitError.TIMEOUT);
+                    transmitter.handleError();
                 }
             });
             b.withTimeout(notifyTimeout);
@@ -562,7 +562,7 @@
      *                    if the caller does not care about temporary connection errors,
      *                    for example because the protocol is stateless
      * @param transmitter the MessageTransmitter object to call once the client is ready to transmit or
-     *                    when the timeout is over.
+     *                    when the timeout is over. Guaranteed to be called *after* notifyTransmitReady has returned.
      * @return a handle that can be used to cancel the transmit request
      */
     public TransmitHandle notifyTransmitReady(RelativeTime timeout,
@@ -574,7 +574,7 @@
 
         // negative timeout
         if (timeout.getMilliseconds() <= 0) {
-            transmitter.handleError(TransmitError.TIMEOUT);
+            transmitter.handleError();
             return new TransmitHandle() {
                 @Override
                 public void cancel() {
@@ -591,7 +591,12 @@
         if (currentTransmitHelper == null) {
             currentTransmitHelper = transmit;
             if (chan.isConnected()) {
-                currentTransmitHelper.start();
+                Scheduler.add(new Task() {
+                    @Override
+                    public void run(Scheduler.RunContext ctx) {
+                        currentTransmitHelper.start();
+                    }
+                });
             }
         } else {
             nextTransmitHelper = transmit;
@@ -620,7 +625,7 @@
             }
 
             @Override
-            public void handleError(TransmitError error) {
+            public void handleError() {
                 receiver.handleError();
             }
         });
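
Note on the Client.java change above: start() is now handed to the scheduler instead of being called inline, which is what lets the javadoc promise that the transmitter runs only after notifyTransmitReady has returned. A minimal sketch of that deferral pattern, assuming a single-threaded task queue; DeferredCallbackSketch and its methods are hypothetical and stand in for the real Scheduler and Client.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded task queue; stands in for the real Scheduler.
class DeferredCallbackSketch {
    private final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
    final List<String> log = new ArrayList<String>();

    // Never invokes the callback directly: it is queued and runs later from runLoop().
    void notifyReady(Runnable callback) {
        tasks.add(callback);
        log.add("notifyReady returned");
    }

    // Drain the queue, running each task in submission order.
    void runLoop() {
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
        }
    }

    static List<String> demo() {
        final DeferredCallbackSketch s = new DeferredCallbackSketch();
        s.notifyReady(new Runnable() {
            @Override
            public void run() {
                s.log.add("callback ran");
            }
        });
        s.runLoop();
        return s.log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [notifyReady returned, callback ran]
    }
}
```

Deferring the callback means a caller can store the returned handle before its transmitter is invoked, which is what Core.transmitPending relies on when it assigns currentClientTransmitHandle.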

Modified: gnunet-java/src/org/gnunet/util/GnunetMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/util/GnunetMessage.java  2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/util/GnunetMessage.java  2012-04-19 09:54:09 UTC (rev 21020)
@@ -24,7 +24,8 @@
         GnunetMessage msg = new GnunetMessage();
         msg.header = new Header();
         msg.header.messageSize = Header.SIZE + Construct.getSize(b);
-
+        msg.header.messageType = MessageLoader.getUnionTag(GnunetMessage.Body.class, b.getClass());
+        msg.body = b;
         return msg;
     }
 

Deleted: gnunet-java/src/org/gnunet/util/IOContinuation.java
===================================================================
--- gnunet-java/src/org/gnunet/util/IOContinuation.java 2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/util/IOContinuation.java 2012-04-19 09:54:09 UTC (rev 21020)
@@ -1,29 +0,0 @@
-/*
- *
- * This file is part of GNUnet.
- * (C) 2011 Christian Grothoff (and other contributing authors)
- *
- * GNUnet is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published
- * by the Free Software Foundation; either version 2, or (at your
- * option) any later version.
- *
- * GNUnet is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with GNUnet; see the file COPYING.  If not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- *
- */
-
-package org.gnunet.util;
-
-public interface IOContinuation {
-    void handleSuccess();
-    void handleTimeout();
-
-}

Modified: gnunet-java/src/org/gnunet/util/MessageTransmitter.java
===================================================================
--- gnunet-java/src/org/gnunet/util/MessageTransmitter.java     2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/util/MessageTransmitter.java     2012-04-19 09:54:09 UTC (rev 21020)
@@ -9,5 +9,5 @@
      */
     public void transmit(Client.MessageSink sink);
 
-    void handleError(Client.TransmitError error);
+    void handleError();
 }

Modified: gnunet-java/src/org/gnunet/util/PeerIdentity.java
===================================================================
--- gnunet-java/src/org/gnunet/util/PeerIdentity.java   2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/util/PeerIdentity.java   2012-04-19 09:54:09 UTC (rev 21020)
@@ -4,6 +4,8 @@
 import org.gnunet.construct.FixedSizeByteArray;
 import org.gnunet.construct.Message;
 
+import java.util.Arrays;
+
 public class PeerIdentity implements Message {
 
     @FixedSizeByteArray(length = 64)
@@ -19,4 +21,18 @@
         }
         return hex.toString();
     }
+
+    public String toString() {
+        return Strings.dataToString(data);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof PeerIdentity && Arrays.equals(((PeerIdentity) obj).data, this.data);
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.hashCode(data);
+    }
 }
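
Note on the PeerIdentity change above: equals and hashCode are both derived from the contents of the byte array, so equal identities land in the same hash bucket. A minimal sketch of content-based equality for an array-backed identity, with a type guard so that comparing against an unrelated object returns false rather than throwing. IdSketch is a hypothetical class, not the real PeerIdentity.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical array-backed identity; not the real PeerIdentity.
class IdSketch {
    final byte[] data;

    IdSketch(byte[] data) {
        this.data = data.clone(); // defensive copy so later changes to the caller's array do not affect us
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof IdSketch)) {
            return false; // also covers obj == null
        }
        return Arrays.equals(data, ((IdSketch) obj).data);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(data); // consistent with equals: same contents, same hash
    }

    public static void main(String[] args) {
        Set<IdSketch> seen = new HashSet<IdSketch>();
        seen.add(new IdSketch(new byte[]{1, 2, 3}));
        System.out.println(seen.contains(new IdSketch(new byte[]{1, 2, 3}))); // prints true
        System.out.println(new IdSketch(new byte[]{1}).equals("x"));          // prints false
    }
}
```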

Modified: gnunet-java/src/org/gnunet/util/Program.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Program.java        2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/util/Program.java        2012-04-19 09:54:09 UTC (rev 21020)
@@ -150,5 +150,9 @@
      * Override to implement the behavior of the Program.
      */
     public abstract void run();
+
+    public final Configuration getConfiguration() {
+        return cfg;
+    }
 }
  
\ No newline at end of file

Modified: gnunet-java/src/org/gnunet/util/Resolver.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Resolver.java       2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/util/Resolver.java       2012-04-19 09:54:09 UTC (rev 21020)
@@ -362,7 +362,7 @@
                     }
 
                     @Override
-                    public void handleError(Client.TransmitError error) {
+                    public void handleError() {
                         throw new RuntimeException("unexpected transmit error");
                     }
                 });

Modified: gnunet-java/src/org/gnunet/util/Scheduler.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Scheduler.java      2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/util/Scheduler.java      2012-04-19 09:54:09 UTC (rev 21020)
@@ -46,21 +46,19 @@
     }
 
     public enum Reason {
-        STARTUP, SHUTDOWN, TIMEOUT, READ_READY, WRITE_READY, CONNECT_READY, PREREQ_DONE
+        STARTUP, SHUTDOWN, TIMEOUT, READ_READY, WRITE_READY, ACCEPT_READY, CONNECT_READY
     }
 
-
     /**
      * The context of a task that is ready to run.
      */
     public static class RunContext {
         Set<Reason> reasons = EnumSet.noneOf(Reason.class);
-        Set<Channel> readableSet = null;
-        Set<Channel> writeableSet = null;
-        Set<Channel> connectedSet = null;
+        Channel eventChannel;
+
         public RunContext() {
+        }
 
-        }
         public RunContext(Set<Reason> reasons) {
             this.reasons = reasons;
         }
@@ -80,6 +78,7 @@
         Set<TaskIdentifier> read = new TreeSet<TaskIdentifier>();
         Set<TaskIdentifier> write = new TreeSet<TaskIdentifier>();
         Set<TaskIdentifier> connect = new TreeSet<TaskIdentifier>();
+        Set<TaskIdentifier> accept = new TreeSet<TaskIdentifier>();
     }
 
     /**
@@ -87,21 +86,19 @@
      */
     public static class TaskIdentifier implements Comparable<TaskIdentifier>, Cancelable {
         private final Task task;
-        private TaskIdentifier prereq;
         private RunContext ctx = new RunContext();
         private boolean lifeness;
         private final Priority priority;
         private final AbsoluteTime deadline;
-        private Set<SelectableChannel> rs=null, ws=null, cs=null;
+        private Set<SelectableChannel> rs = null, ws = null, cs = null, as = null;
 
 
         TaskIdentifier(Task t, Priority priority,
-                       boolean liveness, TaskIdentifier prereq, RelativeTime timeout,
+                       boolean liveness, RelativeTime timeout,
                        Set<SelectableChannel> rs, Set<SelectableChannel> ws, Set<SelectableChannel> cs) {
             this.task = t;
             this.priority = (activeTask == null) ? Priority.DEFAULT : activeTask.priority;
             this.lifeness = liveness;
-            this.prereq = prereq;
 
             if (timeout.getMilliseconds() < 0) {
                 throw new InterfaceViolationException("timeout must be (>=0)");
@@ -128,9 +125,16 @@
                     registerSelect(sc, SelectionKey.OP_CONNECT);
                 }
             }
+            if (as != null) {
+                for (SelectableChannel sc : as) {
+                    logger.debug("registering for OP_ACCEPT");
+                    registerSelect(sc, SelectionKey.OP_ACCEPT);
+                }
+            }
             this.ws = new TreeSet<SelectableChannel>(ws == null ? Collections.EMPTY_SET : ws);
             this.rs = new TreeSet<SelectableChannel>(rs == null ? Collections.EMPTY_SET : rs);
             this.cs = new TreeSet<SelectableChannel>(cs == null ? Collections.EMPTY_SET : cs);
+            this.as = new TreeSet<SelectableChannel>(as == null ? Collections.EMPTY_SET : as);
 
             boolean selectEmpty = (ws == null || ws.isEmpty()) && (rs == null || rs.isEmpty())
                     && (cs == null || cs.isEmpty());
@@ -209,6 +213,9 @@
             if ((op & SelectionKey.OP_CONNECT) != 0) {
                 subscribers.connect.add(this);
             }
+            if ((op & SelectionKey.OP_ACCEPT) != 0) {
+                subscribers.accept.add(this);
+            }
         }
 
         private void deregisterOne(SelectableChannel sc, int op) {
@@ -224,8 +231,11 @@
                 subscribers.write.remove(this);
             }
             if ((op & SelectionKey.OP_CONNECT) != 0) {
-                subscribers.write.remove(this);
+                subscribers.connect.remove(this);
             }
+            if ((op & SelectionKey.OP_ACCEPT) != 0) {
+                subscribers.accept.remove(this);
+            }
             if (subscribers.write.isEmpty()) {
                 key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
             }
@@ -235,6 +245,9 @@
             if (subscribers.connect.isEmpty()) {
                 key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
             }
+            if (subscribers.accept.isEmpty()) {
+                key.interestOps(key.interestOps() & ~SelectionKey.OP_ACCEPT);
+            }
 
         }
 
@@ -254,28 +267,27 @@
                     deregisterOne(sc, SelectionKey.OP_CONNECT);
                 }
             }
+            if (this.as != null) {
+                for (SelectableChannel sc : this.as) {
+                    deregisterOne(sc, SelectionKey.OP_ACCEPT);
+                }
+            }
         }
     }
 
 
     public static class TaskBuilder {
         private Task task = null;
-        private TaskIdentifier prereq = null;
         private boolean lifeness = true;
         private Priority prio = null;
         private RelativeTime timeout = RelativeTime.ZERO;
-        private Set<SelectableChannel> rs=null, ws=null, cs=null;
+        private Set<SelectableChannel> rs = null, ws = null, cs = null, as = null;
 
         public TaskBuilder withLifeness(boolean lifeness) {
             this.lifeness = lifeness;
             return this;
         }
 
-        public TaskBuilder withPrereq(TaskIdentifier prereq) {
-            this.prereq = prereq;
-            return this;
-        }
-
         public TaskBuilder withPriority(Priority prio) {
             this.prio = prio;
             return this;
@@ -317,13 +329,18 @@
             return this;
         }
 
+        public TaskBuilder withSelectAccept(SelectableChannel c) {
+            this.as = Collections.singleton(c);
+            return this;
+        }
+
         public TaskBuilder withSelectConnectSet(Set<SelectableChannel> cs) {
             this.cs = cs;
             return this;
         }
 
         private TaskIdentifier build() {
-            return new TaskIdentifier(task, prio, lifeness, prereq, timeout, rs, ws, cs);
+            return new TaskIdentifier(task, prio, lifeness, timeout, rs, ws, cs);
         }
     }
 
@@ -338,7 +355,8 @@
     private static int readyCount = 0;
 
     // for every priority, there is a list of tasks that is definitely ready to run
-    final private static ArrayList<LinkedList<TaskIdentifier>> ready = new ArrayList<LinkedList<TaskIdentifier>>(Priority.size);
+    final private static ArrayList<LinkedList<TaskIdentifier>> ready = new ArrayList<LinkedList<TaskIdentifier>>
+            (Priority.size);
 
     static {
         for (int i = 0; i < Priority.size; ++i) {
@@ -363,12 +381,6 @@
         return (activeTask == null) || activeTask.lifeness;
     }
 
-    public static TaskIdentifier addAfter(final TaskIdentifier prereq,
-                                          final Task t) {
-        return addSelect(Priority.KEEP, prereq, RelativeTime.ZERO, null, null,
-                t);
-    }
-
     /**
      * Run the task regardless of any prerequisites, before any other task of
      * the same priority.
@@ -395,13 +407,13 @@
      *         started!
      */
     public static TaskIdentifier add(Task task) {
-        return addSelect(Priority.KEEP, null, RelativeTime.ZERO, null, null,
+        return addSelect(Priority.KEEP, RelativeTime.ZERO, null, null,
                 task);
     }
 
 
-    public static TaskIdentifier add(Task task, RelativeTime delay) {
-        return addSelect(Priority.KEEP, null, delay, null, null, task);
+    public static TaskIdentifier add(RelativeTime delay, Task task) {
+        return addSelect(Priority.KEEP, delay, null, null, task);
     }
 
 
@@ -437,30 +449,29 @@
      * @return unique task identifier for the job
      *         only valid until "task" is started!
      */
-    public static TaskIdentifier addSelect(Priority p,
-                                           TaskIdentifier prereq, RelativeTime delay,
+    public static TaskIdentifier addSelect(Priority p, RelativeTime delay,
                                            Set<SelectableChannel> rs, Set<SelectableChannel> ws,
                                            Task t) {
 
-        return new TaskIdentifier(t, p, getCurrentLifeness(), prereq, delay, rs, ws, null);
+        return new TaskIdentifier(t, p, getCurrentLifeness(), delay, rs, ws, null);
     }
 
     public static TaskIdentifier addRead(RelativeTime timeout,
                                          SelectableChannel chan, Task t) {
-        return Scheduler.addSelect(Priority.KEEP, null, timeout,
+        return Scheduler.addSelect(Priority.KEEP, timeout,
                 Collections.singleton(chan), null, t);
     }
 
     public static TaskIdentifier addWrite(RelativeTime timeout,
                                           SelectableChannel chan, Task t) {
         logger.debug("scheduling write");
-        return Scheduler.addSelect(Priority.KEEP, null, timeout, null,
+        return Scheduler.addSelect(Priority.KEEP, timeout, null,
                 Collections.singleton(chan), t);
     }
 
     public static TaskIdentifier addWithPriority(Priority prio,
                                                  Task t) {
-        return addSelect(prio, null, RelativeTime.ZERO, null, null, t);
+        return addSelect(prio, RelativeTime.ZERO, null, null, t);
     }
 
 
@@ -534,7 +545,7 @@
 
     private static void handleSelect(RelativeTime timeout) {
         try {
-            // selector.select(0) would block indefinitely
+            // selector.select(0) would block indefinitely (counterintuitive, Java's fault)
             if (timeout.getMilliseconds() == 0) {
                 selector.selectNow();
             } else if (timeout.isForever()) {
@@ -553,7 +564,7 @@
             Object obj = sk.attachment();
             assert (obj instanceof SubscriberSet);
             SubscriberSet ss = (SubscriberSet) obj;
-            
+
             Channel c = sk.channel();
 
             if (sk.isReadable()) {
@@ -590,6 +601,17 @@
                     }
                 }
             }
+            if (sk.isAcceptable()) {
+                logger.debug("adding isAcceptable() task");
+                for (TaskIdentifier tt : ss.accept) {
+                    executableTasks.add(tt);
+                    if (tt.ctx.reasons == null) {
+                        tt.ctx.reasons = EnumSet.of(Reason.ACCEPT_READY);
+                    } else {
+                        tt.ctx.reasons.add(Reason.ACCEPT_READY);
+                    }
+                }
+            }
         }
         for (TaskIdentifier tt : executableTasks) {
             // tasks must do this themselve to cancel subscriptions to other channels
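
Note on the handleSelect comment above: java.nio.channels.Selector.select(long) treats a timeout of 0 as "block indefinitely", so a zero timeout has to be routed to selectNow() instead. A minimal sketch of that dispatch using plain millisecond values; the FOREVER sentinel and the helper names are assumptions of this sketch, not part of the Scheduler API.

```java
import java.io.IOException;
import java.nio.channels.Selector;

// Hypothetical helper; the real Scheduler works with RelativeTime rather than raw milliseconds.
class SelectTimeoutSketch {
    static final long FOREVER = -1; // sentinel chosen for this sketch only

    // Map a timeout onto the matching Selector call.
    static int select(Selector selector, long timeoutMillis) throws IOException {
        if (timeoutMillis == 0) {
            return selector.selectNow();           // poll without blocking
        } else if (timeoutMillis == FOREVER) {
            return selector.select();              // block until a channel is ready
        } else {
            return selector.select(timeoutMillis); // block for at most timeoutMillis
        }
    }

    // Poll a fresh selector once; with no registered channels nothing can be ready.
    static int pollOnce() {
        try {
            Selector selector = Selector.open();
            try {
                return select(selector, 0);
            } finally {
                selector.close();
            }
        } catch (IOException e) {
            throw new RuntimeException("selector failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(pollOnce()); // prints 0
    }
}
```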

Modified: gnunet-java/src/org/gnunet/util/Server.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Server.java 2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/util/Server.java 2012-04-19 09:54:09 UTC (rev 21020)
@@ -6,20 +6,57 @@
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.List;
 
 public class Server {
+
+
+    private RelativeTime idleTimeout;
+    private boolean requireFound;
+    List<ServerSocketChannel> listenSockets;
+    List<ClientHandle> clients;
+
+
     public class ClientHandle {
-        public void notifyTransmitReady() {
 
+        private SocketChannel sock;
+
+        private int referenceCount = 0;
+
+        public ClientHandle(SocketChannel accept) {
+
         }
 
-        public void persist() {
 
+        /**
+         * Notify us when the server has enough space to transmit
+         * a message of the given size to this client.
+         *
+         * @param size        requested amount of buffer space
+         * @param timeout     after how long should we give up?
+         * @param transmitter object to call when space is available
+         */
+        public void notifyTransmitReady(int size, RelativeTime timeout, MessageTransmitter transmitter) {
+
         }
 
-        public void receiveDone() {
+        /**
+         * Resume receiving from this client, we are done processing the
+         * current request.  This function must be called from within each
+         * GNUNET_SERVER_MessageCallback (or its respective continuations).
+         *
+         * @param keepClient false if connection to the client should be closed
+         */
+        public void receiveDone(boolean keepClient) {
 
         }
 
@@ -31,6 +68,10 @@
 
         }
 
+        public void disconnect() {
+
+        }
+
         public void disableReceiveDoneWarning() {
 
         }
@@ -38,51 +79,80 @@
         public void injectMessage(/*msg*/) {
 
         }
-
     }
 
 
-    abstract class MessageHandler {
-
-    }
-
     abstract class MessageRunabout extends Runabout {
+        private ClientHandle currentSender;
+        /**
+         * Allows implementors of MessageRunabout to get the Client that sent the message
+         * currently visited.
+         *
+         * @return handle of the client whose message is currently being visited
+         */
         public ClientHandle getSender() {
             return null;
         }
 
-        private void setSender() {
-
+        private void setSender(ClientHandle clientHandle) {
+            currentSender = clientHandle;
         }
+    }
 
 
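+    /**
+     * Schedule a task that accepts the next incoming connection on the given socket.
+     *
+     * @param srv listening socket to accept a client connection from
+     */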
+    private void doAccept(final ServerSocketChannel srv) {
+        Scheduler.TaskBuilder b = new Scheduler.TaskBuilder();
+        b.withTask(new Scheduler.Task() {
+            @Override
+            public void run(Scheduler.RunContext ctx) {
+                try {
+                    ClientHandle clientHandle = new ClientHandle(srv.accept());
+                    clients.add(clientHandle);
+
+                } catch (IOException e) {
+                    throw new RuntimeException("accept failed", e);
+                }
+                doAccept(srv);
+            }
+        });
+        b.withSelectAccept(srv);
+        Scheduler.add(b);
     }
 
-    List<ServerSocketChannel> sockets = new ArrayList<ServerSocketChannel>(2);
 
-    List<ClientHandle> clients;
 
-
+    /**
+     * Create a server listening on all specified addresses.
+     *
+     * @param addresses    addresses to bind on
+     * @param idleTimeout  time after which a client will be disconnected if idle
+     * @param requireFound allow unknown messages to be received without disconnecting the client in response
+     */
     public Server(SocketAddress[] addresses, RelativeTime idleTimeout, boolean requireFound) {
+        this.idleTimeout = idleTimeout;
+        this.requireFound = requireFound;
+        listenSockets = new ArrayList<ServerSocketChannel>(addresses.length);
         try {
             for (SocketAddress addr : addresses) {
                 ServerSocketChannel socket = ServerSocketChannel.open();
+                socket.socket().bind(addr);
+                listenSockets.add(socket);
+                doAccept(socket);
             }
-
         } catch (IOException e) {
-            e.printStackTrace();
+            throw new RuntimeException("could not bind");
         }
-
     }
 
+    public void addHandler(MessageRunabout cb) {
 
-    // todo: do overloads / parameters wrt runabout
-    public void addHandler(MessageHandler cb) {
-
     }
 
-    public void addHandler(MessageRunabout cb) {
-
+    public Cancelable notifyDisconnect(/* disco handler */) {
+        return null;
     }
 
 

Modified: gnunet-java/src/org/gnunet/util/Strings.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Strings.java        2012-04-19 08:53:46 UTC (rev 21019)
+++ gnunet-java/src/org/gnunet/util/Strings.java        2012-04-19 09:54:09 UTC (rev 21020)
@@ -1,17 +1,16 @@
 package org.gnunet.util;
 
 public class Strings {
+    private static final String encTable = "0123456789ABCDEFGHIJKLMNOPQRSTUV";
+
     public static String dataToString(byte[] data) {
         StringBuilder sb = new StringBuilder();
-        final String encTable = "0123456789ABCDEFGHIJKLMNOPQRSTUV";
-        long rpos;
-        long bits;
-        long vbit;
+
+        long rpos = 0;
+        long bits = 0;
+        long vbit = 0;
         long size = data.length;
 
-        vbit = 0;
-        rpos = 0;
-        bits = 0;
         while ((rpos < size) || (vbit > 0)) {
             if ((rpos < size) && (vbit < 5)) {
                 byte b = data[(int) rpos++];
@@ -21,7 +20,6 @@
             }
             if (vbit < 5) {
                 bits <<= (5 - vbit);      /* zero-padding */
-                assert (vbit == ((size * 8) % 5));
                 vbit = 5;
             }
             sb.append(encTable.charAt((int) (bits >>> (vbit - 5)) & 31));
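
The Strings.java hunks above show only part of the encoding loop; the lines between the two hunks and the end of the loop are not in this mail. As a reading aid, here is a minimal stand-alone sketch of the same 5-bits-per-character encoding. The class name Base32Sketch is hypothetical, and the two statements marked "assumed" are a guess at the elided lines (shifting each input byte into the accumulator, and consuming 5 bits per output character), not the committed code.

```java
// Hypothetical stand-alone sketch; not the committed Strings.java.
class Base32Sketch {
    private static final String ENC_TABLE = "0123456789ABCDEFGHIJKLMNOPQRSTUV";

    static String dataToString(byte[] data) {
        StringBuilder sb = new StringBuilder();
        int rpos = 0;   // index of the next input byte to read
        long bits = 0;  // bit accumulator; only the low vbit bits are meaningful
        int vbit = 0;   // number of valid (not yet emitted) bits in the accumulator

        while (rpos < data.length || vbit > 0) {
            if (rpos < data.length && vbit < 5) {
                // Assumed: shift the next byte in, masking to avoid sign extension.
                bits = (bits << 8) | (data[rpos++] & 0xFF);
                vbit += 8;
            }
            if (vbit < 5) {
                bits <<= (5 - vbit);  // zero-padding for the final character
                vbit = 5;
            }
            // Emit the top 5 valid bits as one character.
            sb.append(ENC_TABLE.charAt((int) ((bits >>> (vbit - 5)) & 31)));
            vbit -= 5;  // assumed: the emitted bits are consumed here
        }
        return sb.toString();
    }
}
```

Under these assumptions, n input bytes produce ceil(8n / 5) characters; for example, the single byte 0xFF encodes to "VS" (11111, then 111 padded to 11100).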



