mldonkey-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Mldonkey-commits] Changes to mldonkey/src/utils/net/tcpBufferedSocket.ml


From: mldonkey-commits
Subject: [Mldonkey-commits] Changes to mldonkey/src/utils/net/tcpBufferedSocket.ml
Date: Thu, 14 Jul 2005 11:34:23 -0400

Index: mldonkey/src/utils/net/tcpBufferedSocket.ml
diff -u mldonkey/src/utils/net/tcpBufferedSocket.ml:1.27 
mldonkey/src/utils/net/tcpBufferedSocket.ml:1.28
--- mldonkey/src/utils/net/tcpBufferedSocket.ml:1.27    Thu Jul 14 14:02:28 2005
+++ mldonkey/src/utils/net/tcpBufferedSocket.ml Thu Jul 14 15:34:15 2005
@@ -21,7 +21,7 @@
 open Printf2
 open BasicSocket
 
-  
+
 let latencies = Hashtbl.create 2131
 
 let max_opened_connections = ref (fun () -> maxi 20 (MlUnix.max_sockets - 50))
@@ -76,8 +76,8 @@
     mutable buf : string;
     mutable pos : int;
     mutable len : int;
-    mutable max_buf_size:int;
-    mutable min_buf_size: int
+    mutable max_buf_size : int;
+    mutable min_buf_size : int
   }
 
 type t = {
@@ -107,7 +107,7 @@
     mutable connecting : bool;
     mutable host : Ip.t;
     mutable connect_time : float;
-    
+
     mutable token : token;
 
     mutable compression : (
@@ -132,7 +132,7 @@
     mutable moved_bytes : int64;
     mutable lost_bytes : int array; (* 3600 samples*)
     mutable forecast_bytes : int;
-    
+
     mutable ndone_last_second : int;
   }
 
@@ -181,11 +181,11 @@
   let max_wanted = !max_opened_connections () in
   let current_connections = !opened_connections in
   let max_connections_per_second = !max_connections_per_second () in
-  
+
   let rec iter todo_managers done_managers =
 
 (*
-    lprintf "todo_managers %d done_managers %d\n" 
+    lprintf "todo_managers %d done_managers %d\n"
 (List.length todo_managers) (List.length done_managers);
   *)
     match todo_managers with
@@ -231,22 +231,21 @@
 
 let reset_connection_scheduler _ =
   if !verbose_bandwidth > 0 then begin
-      lprintf "[BW1 %6d] Connections opened this second : %d/%d total %d/%d\n" 
+      lprintf "[BW1 %6d] Connections opened this second : %d/%d total %d/%d\n"
       (last_time ()) !opened_connections_this_second 
(!max_connections_per_second ()) !opened_connections (!max_opened_connections 
());
-          
     end;
   List.iter (fun cm ->
       if !verbose_bandwidth > 0 then begin
           if cm.nconnections_last_second > 0 then
             lprintf "[BW1 %6d]    %s opened %d connections last second\n"
               (last_time ()) cm.cm_name cm.nconnections_last_second;
-          if cm.nwaiting_connections > 0 then 
+          if cm.nwaiting_connections > 0 then
             lprintf "[BW1 %6d]    %s still waits for %d connections\n"
               (last_time ()) cm.cm_name cm.nwaiting_connections;
-       end;    
+       end;
       cm.nconnections_last_second <- 0;
   ) !connection_managers;
-  
+
   opened_connections_this_second := 0
 
 let use_token token fd =
@@ -273,36 +272,36 @@
     let delay = int_of_float delayf in
     let delay = if delay > 65000 then 65000 else delay in
 (*
-lprintf "add_connect_latency %s -> %d (%f)\n" 
+lprintf "add_connect_latency %s -> %d (%f)\n"
 (Ip.to_string ip) delay delayf;
   *)
     try
       let latency, samples = Hashtbl.find latencies ip in
       incr samples;
       if !latency > delay then latency := delay
-    with _ -> 
+    with _ ->
         Hashtbl.add latencies ip (ref delay, ref 1)
-        
+
 let forecast_bytes t nbytes =
   match t with
     None -> ()
-  | Some bc ->       
+  | Some bc ->
       let nip_packets = 1 + nbytes / !mtu_packet_size in
       let nbytes = nbytes + nip_packets * !ip_packet_size in
       let nframes = 1 + nbytes / packet_frame_size in
-      bc.forecast_bytes <- bc.forecast_bytes + 
+      bc.forecast_bytes <- bc.forecast_bytes +
         (nframes * packet_frame_size)
-  
+
 let register_bytes t nbytes =
   match t with
     None -> ()
-  | Some bc ->       
+  | Some bc ->
       let nip_packets = 1 + nbytes / !mtu_packet_size in
       let nbytes = nbytes + nip_packets * !ip_packet_size in
       let nframes = 1 + nbytes / packet_frame_size in
-      bc.remaining_bytes <- bc.remaining_bytes - 
+      bc.remaining_bytes <- bc.remaining_bytes -
       (nframes * packet_frame_size)
-  
+
 let forecast_download t n =
   forecast_bytes t.read_control n
 
@@ -334,10 +333,10 @@
 (*************************************************************************)
 
 let copy_read_buffer = ref true
-  
+
 let big_buffer_len = 65536
 let big_buffer = String.create big_buffer_len
-  
+
 let min_buffer_read = 2000
 let min_read_size = min_buffer_read - 100
 
@@ -459,7 +458,7 @@
 let nread t = t.nread
 let nwritten t = t.nwrite
 let can_write t =  t.wbuf.len = 0
-let can_write_len t len = 
+let can_write_len t len =
   let b = t.wbuf.max_buf_size > t.wbuf.len + len in
 (*  if not b then
     lprintf "can_write_len failed: %d < %d + %d\n"
@@ -468,7 +467,7 @@
 let not_buffer_more t max =  t.wbuf.len < max
 let can_fill t = t.wbuf.len < (t.wbuf.max_buf_size / 2)
 let get_rtimeout t = get_rtimeout t.sock_in
-let max_refill t = t.wbuf.max_buf_size - t.wbuf.len 
+let max_refill t = t.wbuf.max_buf_size - t.wbuf.len
 
 let close t s =
 (*
@@ -507,7 +506,7 @@
       lprintf "shutdown\n";
 end;
   *)
-  (try 
+  (try
       BasicSocket.shutdown t.sock_out s;
       if t.sock_in != t.sock_out then
         BasicSocket.shutdown t.sock_in s;
@@ -543,11 +542,11 @@
 (*       lprintf "WRITE [%s]\n" (String.escaped (String.sub s pos1 len)); *)
           let nw = MlUnix.write fd s pos1 len in
           if !verbose_bandwidth > 1 then begin
-              lprintf "[BW2 %6d] immediate write %d/%d on %s:%d\n" (last_time 
()) 
-              nw len 
+              lprintf "[BW2 %6d] immediate write %d/%d on %s:%d\n" (last_time 
())
+              nw len
                 t.name (sock_num t.sock_out);
             end;
-          
+
           register_upload t len;
           forecast_download t 0;
 
@@ -561,12 +560,12 @@
                 bc.moved_bytes <-
                   Int64.add bc.moved_bytes (Int64.of_int nw));
           if t.nwrite = 0 then begin
-(*              if t.connecting then 
+(*              if t.connecting then
                 lprintf "WRITE BEFORE CONNECTION.......\n";
               lprintf "add_connect_latency at %f\n" (current_time ()); *)
               add_connect_latency t.host t.connect_time;
             end;
-          
+
           t.nwrite <- t.nwrite + nw;
           if nw = 0 then (close t Closed_by_peer; pos2) else
             pos1 + nw
@@ -608,7 +607,7 @@
       big_buffer, 0, big_buffer_len
     else
     let can_write_in_buffer =
-      if b.buf = "" then 
+      if b.buf = "" then
         if b.min_buf_size <= min_buffer_read then begin
             b.buf <- new_string ();
             min_buffer_read
@@ -674,14 +673,14 @@
         lprintf "READ LIMITED BY BW CONTROL: %d\n" nread;
       end;
 *)
-    
+
     if !verbose_bandwidth > 1 then begin
-        lprintf "[BW2 %6d] %sread %d/%d/%d on %s:%d\n" (last_time ()) 
-        (if old_len > 0 then "completing " else "") nread can_read max_len 
+        lprintf "[BW2 %6d] %sread %d/%d/%d on %s:%d\n" (last_time ())
+        (if old_len > 0 then "completing " else "") nread can_read max_len
           t.name (sock_num t.sock_in);
       end;
-    
-    
+
+
     if !copy_read_buffer then begin
 (*        lprintf "Copying %d bytes\n" nread; *)
         buf_add t b big_buffer 0 nread
@@ -698,7 +697,7 @@
         lprintf "    given by buffer: %d\n" buffer_len;
     end;
 *)
-    
+
     tcp_downloaded_bytes := Int64.add !tcp_downloaded_bytes (Int64.of_int 
nread);
     (match t.read_control with
         None -> () | Some bc ->
@@ -714,7 +713,6 @@
       close t Closed_by_peer;
     end else begin
 
-      
       try
 (*              if t.monitored then
    (lprintf "event handler READ DONE\n"; ); *)
@@ -747,14 +745,14 @@
           try
 (*     lprintf "try write %d/%d\n" max_len t.wbuf.len; *)
             let fd = fd sock in
-(*            lprintf "WRITE [%s]\n" (String.escaped 
+(*            lprintf "WRITE [%s]\n" (String.escaped
               (String.sub b.buf b.pos max_len)); *)
             let nw = MlUnix.write fd b.buf b.pos max_len in
             if !verbose_bandwidth > 1 then begin
-                lprintf "[BW2 %6d] postponed %swrite %d/%d/%d on %s:%d\n" 
(last_time ()) (if max_len < b.len then "partial " else "") nw max_len b.len 
+                lprintf "[BW2 %6d] postponed %swrite %d/%d/%d on %s:%d\n" 
(last_time ()) (if max_len < b.len then "partial " else "") nw max_len b.len
                   t.name (sock_num t.sock_out);
               end;
-            
+
 (*            if t.monitored then
 (lprintf "written %d\n" nw; ); *)
         tcp_uploaded_bytes := Int64.add !tcp_uploaded_bytes (Int64.of_int nw);
@@ -819,10 +817,10 @@
               end
           end
   end
-  
+
 let get_latencies verbose =
   let b = Buffer.create 300 in
-  let counter = ref 0 in  
+  let counter = ref 0 in
   Hashtbl.iter (fun ip (latency, samples) ->
       incr counter;
   ) latencies;
@@ -835,7 +833,7 @@
   ) latencies;
   Hashtbl.clear latencies;
   Buffer.contents b
-  
+
 let tcp_handler t event =
   match event with
   | CAN_READ ->
@@ -862,7 +860,7 @@
         if t.nwrite = 0 && t.noproxy && t.connecting then  begin
             t.connecting <- false;
             t.event_handler t CONNECTED;
-            t.nwrite = 0 
+            t.nwrite = 0
           end else true
       in
       if can_write then tcp_handler_write t t.sock_out
@@ -931,10 +929,10 @@
       bc.remaining_bytes_user bc.total_bytes bc.remaining_bytes;
       bc.remaining_bytes <- bc.total_bytes - bc.forecast_bytes;
       if !verbose_bandwidth > 0 && bc.ndone_last_second > 0 then
-        lprintf "[BW1 %6d] %s read %d/%d last second\n" (last_time ()) 
+        lprintf "[BW1 %6d] %s read %d/%d last second\n" (last_time ())
           bc.bc_name bc.ndone_last_second bc.total_bytes;
       if !verbose_bandwidth > 0 && bc.forecast_bytes > 0 then
-        lprintf "[BW1 %6d] %s forecast read %d bytes for next second\n" 
(last_time ()) 
+        lprintf "[BW1 %6d] %s forecast read %d bytes for next second\n" 
(last_time ())
           bc.bc_name bc.forecast_bytes;
       bc.forecast_bytes <- 0;
       bc.ndone_last_second <- 0;
@@ -945,10 +943,10 @@
       bc.remaining_bytes_user bc.total_bytes bc.remaining_bytes;
       bc.remaining_bytes <- bc.total_bytes - bc.forecast_bytes;
       if !verbose_bandwidth > 0 && bc.ndone_last_second > 0 then
-        lprintf "[BW1 %6d] %s wrote %d/%d last second\n" (last_time ()) 
+        lprintf "[BW1 %6d] %s wrote %d/%d last second\n" (last_time ())
           bc.bc_name bc.ndone_last_second bc.total_bytes;
       if !verbose_bandwidth > 0 && bc.forecast_bytes > 0 then
-        lprintf "[BW1 %6d] %s forecast write %d bytes for next second\n" 
(last_time ()) 
+        lprintf "[BW1 %6d] %s forecast write %d bytes for next second\n" 
(last_time ())
           bc.bc_name bc.forecast_bytes;
       bc.forecast_bytes <- 0;
       bc.ndone_last_second <- 0;
@@ -1027,7 +1025,7 @@
     set_handler t WRITE_DONE (fun t ->
 (*      lprintf "close_after_write: CLOSE\n";  - log output removed  *)
         shutdown t Closed_by_user)
-    
+
 exception Http_proxy_error of string
 let http_proxy = ref None
 
@@ -1051,7 +1049,7 @@
               let rstr_pos = 12 (*String.index_from b.buf (rcode_pos+1) ' '*) 
in
               let rstr_end = String.index_from b.buf (rstr_pos+1) '\n' in
               let rstr = String.sub b.buf (rstr_pos+1) (rstr_end-rstr_pos-1) in
-              lprintf "From proxy for %s: %s %s\n" 
+              lprintf "From proxy for %s: %s %s\n"
                 (Ip.to_string sock.host) rcode rstr;
               rcode, rstr, rstr_end
             with _ ->
@@ -1102,7 +1100,7 @@
 
 let set_connected t f =
   set_handler t CONNECTED f
-    
+
 (*************************************************************************)
 (*                                                                       *)
 (*                         Socket Configuration                          *)
@@ -1123,7 +1121,7 @@
 
 let set_monitored t b = t.monitored <- b
 let monitored t = t.monitored
-  
+
 let set_rtimeout s t = set_rtimeout s.sock_in t
 let set_wtimeout s t = set_wtimeout s.sock_out t
 
@@ -1240,15 +1238,15 @@
       connect_time = 0.;
       compression = None;
     } in
-  
+
   let fname = (fun _ ->
         Printf.sprintf "%s (nread: %d nwritten: %d) [U %s,D %s]" name
           t.nread t.nwrite (string_of_bool (t.read_control <> None))
         (string_of_bool (t.write_control <> None));
         ;
     ) in
-  
-  
+
+
   let sock_in = BasicSocket.create name fd_in (fun _ event ->
         tcp_handler t event) in
   if !debug then begin
@@ -1261,7 +1259,7 @@
         tcp_handler t event) in
   set_printer sock_out fname;
   set_dump_info sock_out (dump_socket t);
-  
+
   t.sock_in <- sock_in;
   t.sock_out <- sock_out;
   t
@@ -1336,7 +1334,7 @@
 (* connect to proxy in blocking mode, so we sure, connections established when 
we send CONNECT *)
         lprintf "via proxy\n";
         Unix.connect s (Unix.ADDR_INET(Ip.to_inet_addr proxy_ip, proxy_port));
-        
+
         let buf = Buffer.create 200 in
         let dotted_host = Unix.string_of_inet_addr host in
         Printf.bprintf buf "CONNECT %s:%d HTTP/1.1\n" dotted_host port;
@@ -1351,14 +1349,14 @@
         ()
       end;
     let t = create token name s handler in
-    
+
     if !verbose_bandwidth > 1 then begin
         lprintf "[BW2 %6d] connect on %s:%d\n" (last_time ()) t.name (sock_num 
t.sock_out);
       end;
-    
-    token.connection_manager.nconnections_last_second <- 
+
+    token.connection_manager.nconnections_last_second <-
       token.connection_manager.nconnections_last_second + 1;
-    
+
     must_write t.sock_out true;
     try
       t.host <- ip;
@@ -1368,7 +1366,7 @@
           t.noproxy <- false;
         end else
         Unix.connect s (Unix.ADDR_INET(host,port));
-      
+
       t.connecting <- true;
       register_upload t 0;    (* The TCP SYN packet *)
       forecast_download t 0;  (* The TCP ACK packet *)
@@ -1392,7 +1390,7 @@
       lprintf "EXCEPTION %s  before connect to host %s:%d\n"
           (Printexc2.to_string e) (Unix.string_of_inet_addr host) port;
       raise e
-      
+
 
 
 (*************************************************************************)
@@ -1421,13 +1419,13 @@
       match Unix.getpeername fd with
         Unix.ADDR_INET (ip, port) ->
           let ip = Ip.of_inet_addr ip in
-          t.peer_addr <- Some (ip, port); 
+          t.peer_addr <- Some (ip, port);
           ip, port
   | _ -> raise Not_found
 
 let peer_ip t = fst (peer_addr t)
 let peer_port t = snd (peer_addr t)
-          
+
           (*
 let host t =
   let fd = fd t.sock_out in
@@ -1510,7 +1508,7 @@
 
 let set_max_output_buffer  t len =
   t.wbuf.max_buf_size <- len
-  
+
 (*************************************************************************)
 (*                                                                       *)
 (*                         Compression                                   *)
@@ -1618,7 +1616,7 @@
       reset_connection_scheduler ();
       deflate_timer ();
   );
-  
+
   set_before_select_hook (fun _ ->
       schedule_connections ();
       List.iter (fun bc ->
@@ -1636,17 +1634,17 @@
   );
 
 (*
-Some ideas: 
+Some ideas:
 
 * sort the connections so that we try to read and write as
   much as possible to/from connections on which we already have read/written
-  a lot. 
+  a lot.
 * postpone reads and writes for a few seconds so that we have more to
   read/write.
 
   dhcppc3:~#  ping computer.domain.org -f -s <packet_size> -c 1000
  where computer.domain.org is a nearby computer on a 100 Mbs link.
-  
+
 On my cable link, making <packet_size> vary from 1 to 2000 shows:
   * packets > 1450 are 100% lost
   * 300 > packets > 1 are 5% lost but exactly the same time (14.5 s)
@@ -1660,26 +1658,26 @@
 , pipe 6, ipg/ewma 14.205/0.000 ms
 
   *)
-  
-  
+
+
   set_after_select_hook (fun _ ->
       List.iter (fun bc ->
           if bc.remaining_bytes > 0 then begin
-              
+
               bc.connections <- List.sort (fun t1 t2 ->
                   let w1 = t1.nread in
                   let w2 = t2.nread in
                   compare w1 w2
               ) bc.connections;
-              
+
               if !verbose_bandwidth > 2 && bc.nconnections > 0 then begin
                   lprintf "[BW3 %6d] %d read-waiting connections for %d 
allowed\n" (last_time ()) bc.nconnections bc.remaining_bytes;
                   List.iter (fun t ->
                       lprintf "[BW3 %6d]   %s:%d [buffered %d]\n" (last_time 
()) t.name (sock_num t.sock_in) ((buf t).len);
                   ) bc.connections;
-                end;              
-                            
-              
+                end;
+
+
               List.iter (fun t ->
                   if bc.remaining_bytes > 0 then
                     let nconnections = maxi bc.nconnections 1 in
@@ -1689,7 +1687,7 @@
                     (try
 (*                    lprintf "allow to read %d\n" can_read; *)
                         can_read_handler t t.sock_in can_read
-                      with e -> 
+                      with e ->
 (*                          lprintf "Exception %s in can_read_handler %s:%d\n"
                             (Printexc2.to_string e)
                           t.name (sock_num t.sock_in) *) ()
@@ -1699,35 +1697,35 @@
                     bc.nconnections <- bc.nconnections - t.read_power
                   else
                   if !verbose_bandwidth > 2 then begin
-                      lprintf "[BW3 %6d]    %s:%d could not read\n" (last_time 
()) 
-                        t.name (sock_num t.sock_out)                   
+                      lprintf "[BW3 %6d]    %s:%d could not read\n" (last_time 
())
+                        t.name (sock_num t.sock_out)
                     end
               ) bc.connections;
 (*              if bc.remaining_bytes > 0 then bc.allow_io := false; *)
             end;
-          
+
           if !verbose_bandwidth > 2 then begin
-              
-              if bc.remaining_bytes > 0 then 
+
+              if bc.remaining_bytes > 0 then
                 lprintf "[BW3 %6d] still %d bytes to read\n" (last_time ()) 
bc.remaining_bytes;
-              
+
               if bc.nconnections > 0 then
                 lprintf "[BW3 %6d] still %d read-waiting connections after 
loop\n" (last_time ()) bc.nconnections;
             end;
-          
-          
+
+
           bc.connections <- [];
           bc.nconnections <- 0;
       ) !read_bandwidth_controlers;
       List.iter (fun bc ->
           if bc.remaining_bytes > 0 then begin
-              
+
               bc.connections <- List.sort (fun t1 t2 ->
                   let w1 = remaining_to_write t1 in
                   let w2 = remaining_to_write t2 in
                   compare w1 w2
               ) bc.connections;
-              
+
               if !verbose_bandwidth > 2 && bc.nconnections > 0 then begin
                   lprintf "[BW3 %6d] %d write-waiting connections for %d 
allowed\n" (last_time ()) bc.nconnections bc.remaining_bytes;
                   List.iter (fun t ->
@@ -1750,24 +1748,24 @@
                     bc.nconnections <- bc.nconnections - t.write_power;
                   else
                   if !verbose_bandwidth > 2 then begin
-                      lprintf "[BW3 %6d]    %s:%d could not write buffered %d 
bytes\n" (last_time ()) 
+                      lprintf "[BW3 %6d]    %s:%d could not write buffered %d 
bytes\n" (last_time ())
                         t.name (sock_num t.sock_out) (remaining_to_write t)
                     end
               ) bc.connections;
 (*          if bc.remaining_bytes > 0 then bc.allow_io := false; *)
             end;
           if !verbose_bandwidth > 2 then begin
-              
+
               if bc.remaining_bytes > 0 then begin
                   lprintf "[BW3 %6d] still %d bytes to write\n" (last_time ()) 
bc.remaining_bytes;
                   List.iter (fun t ->
                       let len = remaining_to_write t in
                       if len > 0 then
-                        lprintf "[BW3 %6d]    %s:%d could write %d\n" 
(last_time ()) 
+                        lprintf "[BW3 %6d]    %s:%d could write %d\n" 
(last_time ())
                           t.name (sock_num t.sock_out) len
                   ) bc.connections
                 end;
-              
+
               if bc.nconnections > 0 then
                 lprintf "[BW3 %6d] still %d write-waiting connections after 
loop\n" (last_time ()) bc.nconnections;
             end;
@@ -1775,11 +1773,11 @@
           bc.nconnections <- 0;
       ) !write_bandwidth_controlers
   )
-  
+
 let prevent_close t =
-  prevent_close t.sock_in;  
+  prevent_close t.sock_in;
   prevent_close t.sock_out
-  
+
 let must_write t bool = must_write t.sock_out bool
 let output_buffered t = t.wbuf.len
 
@@ -1808,7 +1806,7 @@
               end;
       ) fields_titles fields_values
   | _ -> assert false
-  
+
 let load_stats filename =
   try
     let ic = open_in filename in
@@ -1819,32 +1817,32 @@
       let values = input_line ic in
       join_stats titles values;
       iter_first ic
-    
+
     in
     try
       iter_first ic
-    with 
+    with
       End_of_file -> close_in ic
-    | e -> 
+    | e ->
         lprintf "[BWS] Error %s reading %s\n" (Printexc2.to_string e) filename;
         close_in ic; proc_net_fs := false
-  with 
-  | e -> 
+  with
+  | e ->
       lprintf "[BWS] Error %s opening %s\n" (Printexc2.to_string e) filename;
       proc_net_fs := false
-      
+
 let proc_net_timer _ =
   if !proc_net_fs && !verbose_bandwidth > 0 then begin
       load_stats "/proc/net/netstat";
       load_stats "/proc/net/snmp";
     end
-    
-  
+
+
 let _ =
   Heap.add_memstat "tcpBufferedSocket" (fun level buf ->
       Printf.bprintf buf "  %d latencies\n" (Hashtbl.length latencies);
       Printf.bprintf buf "  %d entries in net_stats\n" (Hashtbl.length 
net_stats);
   );
-  add_infinite_timer 1.0 proc_net_timer 
+  add_infinite_timer 1.0 proc_net_timer
 
 let packet_frame_size = ref 250




reply via email to

[Prev in Thread] Current Thread [Next in Thread]