gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r21063 - gnunet-java/src/org/gnunet/statistics


From: gnunet
Subject: [GNUnet-SVN] r21063 - gnunet-java/src/org/gnunet/statistics
Date: Sun, 22 Apr 2012 12:05:43 +0200

Author: dold
Date: 2012-04-22 12:05:43 +0200 (Sun, 22 Apr 2012)
New Revision: 21063

Modified:
   gnunet-java/src/org/gnunet/statistics/Statistics.java
Log:
improved the statistics API

Modified: gnunet-java/src/org/gnunet/statistics/Statistics.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/Statistics.java       2012-04-22 
06:38:47 UTC (rev 21062)
+++ gnunet-java/src/org/gnunet/statistics/Statistics.java       2012-04-22 
10:05:43 UTC (rev 21063)
@@ -6,13 +6,17 @@
 
 package org.gnunet.statistics;
 
-import org.gnunet.construct.*;
+import org.gnunet.construct.UInt32;
+import org.gnunet.construct.UInt64;
+import org.gnunet.construct.UnionCase;
+import org.gnunet.construct.ZeroTerminatedString;
 import org.gnunet.util.*;
 import org.gnunet.util.getopt.Option;
 import org.gnunet.util.getopt.OptionAction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
 import java.util.LinkedList;
 
 /**
@@ -24,7 +28,12 @@
     private static final Logger logger = LoggerFactory
             .getLogger(Statistics.class);
 
+    private static final RelativeTime PUT_TIMEOUT = 
RelativeTime.SECOND.multiply(5);
+
     private Client client;
+    private LinkedList<StatisticsRequest> requests = new 
LinkedList<StatisticsRequest>();
+    private Cancelable currentTransmit;
+    private boolean disconnectRequested = false;
 
     @UnionCase(169)
     public static class RequestMessage implements GnunetMessage.Body {
@@ -51,6 +60,8 @@
         // empty
     }
 
+    private final static int SETFLAG_PERSIST = 2;
+
     @UnionCase(168)
     public static class SetMessage implements GnunetMessage.Body {
         @UInt32
@@ -74,6 +85,7 @@
     private class StatisticsGetRequest extends StatisticsRequest implements 
Cancelable {
         @Override
         public void cancel() {
+            requests.remove(this);
         }
 
         @Override
@@ -118,22 +130,29 @@
 
     private class StatisticsPutRequest extends StatisticsRequest implements 
Cancelable {
         public long value;
+        public int flags;
 
         @Override
         public void cancel() {
+            requests.remove(this);
         }
 
         @Override
         public void transmit(Connection.MessageSink sink) {
+            SetMessage sm = new SetMessage();
+            sm.statisticName = name;
+            sm.subsystemName = subsystem;
+            sm.value = value;
+            sm.flags = flags;
+            sink.send(sm);
         }
 
         @Override
         public void handleError() {
+            throw new RuntimeException("unexpected transmit error");
         }
     }
 
-    private LinkedList<StatisticsRequest> requests = new 
LinkedList<StatisticsRequest>();
-
     public Statistics(Configuration cfg) {
         client = new Client("statistics", cfg);
     }
@@ -171,8 +190,22 @@
         return getRequest;
     }
 
+    /**
+     * todo: should this be abstracted into a common request queue?
+     */
     private void handleNextRequest() {
+        if (currentTransmit != null) {
+            return;
+        }
+        StatisticsRequest request = requests.poll();
+        if (request == null) {
+            if (disconnectRequested) {
+                client.disconnect();
+            }
+            return;
+        }
 
+        currentTransmit = 
client.notifyTransmitReady(request.deadline.getRemaining(), true, 0, request);
     }
 
     public Cancelable get(RelativeTime timeout, StatisticsReceiver srh) {
@@ -190,49 +223,38 @@
      */
     public Cancelable set(final String subsystem, final String name, final 
long value, boolean persist) {
         StatisticsPutRequest putRequest = new StatisticsPutRequest();
-        putRequest.deadline =
+        putRequest.deadline = PUT_TIMEOUT.toAbsolute();
         putRequest.subsystem = subsystem;
         putRequest.name = name;
         putRequest.value = value;
+        putRequest.flags = persist ? SETFLAG_PERSIST : 0;
 
-        client.notifyTransmitReady(timeout, true, 0, new MessageTransmitter() {
+        requests.add(putRequest);
 
-            @Override
-            public void transmit(Connection.MessageSink sink) {
-                if (sink == null) {
-                    cb.onTimeout();
-                }
-                SetMessage sm = new SetMessage();
-                sm.statisticName = name;
-                sm.subsystemName = subsystem;
-                sm.value = value;
+        handleNextRequest();
 
-                sink.send(sm);
-
-                cb.onCompleted();
-            }
-
-            @Override
-            public void handleError() {
-                throw new RuntimeException("unexpected transmit error");
-            }
-        });
-        return new Cancelable() {
-            @Override
-            public void cancel() {
-                throw new UnsupportedOperationException("not yet implementd");
-            }
-        };
+        return putRequest;
     }
 
-
     /**
      * Destroy handle to the statistics service.
      *
      * @param putPending wait until all pending put requests have been 
submitted or timed out
      */
     public void destroy(boolean putPending) {
-        client.disconnect();
+        if (putPending) {
+            // throw away all get requests!
+            Iterator<StatisticsRequest> it = requests.listIterator();
+            while (it.hasNext()) {
+                StatisticsRequest r = it.next();
+                if (r instanceof StatisticsGetRequest) {
+                    it.remove();
+                }
+            }
+            disconnectRequested = true;
+        } else {
+            client.disconnect();
+        }
     }
 
     public static void main(String[] args) {
@@ -257,7 +279,7 @@
             String subsystemName = "";
 
             public void run() {
-                Statistics statistics = new Statistics(cfg);
+                final Statistics statistics = new Statistics(cfg);
                 if (test) {
                     if (subsystemName.isEmpty() || statisticsName.isEmpty()) {
                         System.err.println("must specify non-empty subsystem 
and name");
@@ -274,21 +296,10 @@
                         System.err.println("invalid value (not a long)");
                         return;
                     }
-                    statistics.set(RelativeTime.SECOND, subsystemName, 
statisticsName, value, new SetCompleted() {
-                        @Override
-                        public void onCompleted() {
-                            System.out.println("done.");
-                        }
-
-                        @Override
-                        public void onTimeout() {
-                            System.out.println("timeout while setting");
-                        }
-                    });
+                    statistics.set(subsystemName, statisticsName, value, 
false);
+                    statistics.destroy(true);
                 } else {
-                    if (unprocessedArgs.length != 0) {
-                        System.err.println("dumping statistics does not take 
any positional parameters");
-                    } else {
+                    if (unprocessedArgs.length == 0) {
                         statistics.get(RelativeTime.SECOND, subsystemName, 
statisticsName, new StatisticsReceiver() {
                             @Override
                             public void onReceive(String subsystem, String 
name, long value) {
@@ -298,13 +309,17 @@
                             @Override
                             public void onTimeout() {
                                 logger.error("timeout while getting 
statistics");
+                                statistics.destroy(false);
                             }
 
                             @Override
                             public void onDone() {
                                 logger.info("done getting statistics");
+                                statistics.destroy(false);
                             }
                         });
+                    } else {
+                        System.err.println("dumping statistics does not take 
any positional parameters");
                     }
                 }
             }




reply via email to

[Prev in Thread] Current Thread [Next in Thread]