[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Mldonkey-commits] Changes to mldonkey/src/utils/net/tcpBufferedSocket.ml
From: mldonkey-commits
Subject: [Mldonkey-commits] Changes to mldonkey/src/utils/net/tcpBufferedSocket.ml
Date: Thu, 14 Jul 2005 11:34:23 -0400
Index: mldonkey/src/utils/net/tcpBufferedSocket.ml
diff -u mldonkey/src/utils/net/tcpBufferedSocket.ml:1.27 mldonkey/src/utils/net/tcpBufferedSocket.ml:1.28
--- mldonkey/src/utils/net/tcpBufferedSocket.ml:1.27 Thu Jul 14 14:02:28 2005
+++ mldonkey/src/utils/net/tcpBufferedSocket.ml Thu Jul 14 15:34:15 2005
@@ -21,7 +21,7 @@
open Printf2
open BasicSocket
-
+
let latencies = Hashtbl.create 2131
let max_opened_connections = ref (fun () -> maxi 20 (MlUnix.max_sockets - 50))
@@ -76,8 +76,8 @@
mutable buf : string;
mutable pos : int;
mutable len : int;
- mutable max_buf_size:int;
- mutable min_buf_size: int
+ mutable max_buf_size : int;
+ mutable min_buf_size : int
}
type t = {
@@ -107,7 +107,7 @@
mutable connecting : bool;
mutable host : Ip.t;
mutable connect_time : float;
-
+
mutable token : token;
mutable compression : (
@@ -132,7 +132,7 @@
mutable moved_bytes : int64;
mutable lost_bytes : int array; (* 3600 samples*)
mutable forecast_bytes : int;
-
+
mutable ndone_last_second : int;
}
@@ -181,11 +181,11 @@
let max_wanted = !max_opened_connections () in
let current_connections = !opened_connections in
let max_connections_per_second = !max_connections_per_second () in
-
+
let rec iter todo_managers done_managers =
(*
- lprintf "todo_managers %d done_managers %d\n"
+ lprintf "todo_managers %d done_managers %d\n"
(List.length todo_managers) (List.length done_managers);
*)
match todo_managers with
@@ -231,22 +231,21 @@
let reset_connection_scheduler _ =
if !verbose_bandwidth > 0 then begin
- lprintf "[BW1 %6d] Connections opened this second : %d/%d total %d/%d\n"
+ lprintf "[BW1 %6d] Connections opened this second : %d/%d total %d/%d\n"
(last_time ()) !opened_connections_this_second
(!max_connections_per_second ()) !opened_connections (!max_opened_connections
());
-
end;
List.iter (fun cm ->
if !verbose_bandwidth > 0 then begin
if cm.nconnections_last_second > 0 then
lprintf "[BW1 %6d] %s opened %d connections last second\n"
(last_time ()) cm.cm_name cm.nconnections_last_second;
- if cm.nwaiting_connections > 0 then
+ if cm.nwaiting_connections > 0 then
lprintf "[BW1 %6d] %s still waits for %d connections\n"
(last_time ()) cm.cm_name cm.nwaiting_connections;
- end;
+ end;
cm.nconnections_last_second <- 0;
) !connection_managers;
-
+
opened_connections_this_second := 0
let use_token token fd =
@@ -273,36 +272,36 @@
let delay = int_of_float delayf in
let delay = if delay > 65000 then 65000 else delay in
(*
-lprintf "add_connect_latency %s -> %d (%f)\n"
+lprintf "add_connect_latency %s -> %d (%f)\n"
(Ip.to_string ip) delay delayf;
*)
try
let latency, samples = Hashtbl.find latencies ip in
incr samples;
if !latency > delay then latency := delay
- with _ ->
+ with _ ->
Hashtbl.add latencies ip (ref delay, ref 1)
-
+
let forecast_bytes t nbytes =
match t with
None -> ()
- | Some bc ->
+ | Some bc ->
let nip_packets = 1 + nbytes / !mtu_packet_size in
let nbytes = nbytes + nip_packets * !ip_packet_size in
let nframes = 1 + nbytes / packet_frame_size in
- bc.forecast_bytes <- bc.forecast_bytes +
+ bc.forecast_bytes <- bc.forecast_bytes +
(nframes * packet_frame_size)
-
+
let register_bytes t nbytes =
match t with
None -> ()
- | Some bc ->
+ | Some bc ->
let nip_packets = 1 + nbytes / !mtu_packet_size in
let nbytes = nbytes + nip_packets * !ip_packet_size in
let nframes = 1 + nbytes / packet_frame_size in
- bc.remaining_bytes <- bc.remaining_bytes -
+ bc.remaining_bytes <- bc.remaining_bytes -
(nframes * packet_frame_size)
-
+
let forecast_download t n =
forecast_bytes t.read_control n
@@ -334,10 +333,10 @@
(*************************************************************************)
let copy_read_buffer = ref true
-
+
let big_buffer_len = 65536
let big_buffer = String.create big_buffer_len
-
+
let min_buffer_read = 2000
let min_read_size = min_buffer_read - 100
@@ -459,7 +458,7 @@
let nread t = t.nread
let nwritten t = t.nwrite
let can_write t = t.wbuf.len = 0
-let can_write_len t len =
+let can_write_len t len =
let b = t.wbuf.max_buf_size > t.wbuf.len + len in
(* if not b then
lprintf "can_write_len failed: %d < %d + %d\n"
@@ -468,7 +467,7 @@
let not_buffer_more t max = t.wbuf.len < max
let can_fill t = t.wbuf.len < (t.wbuf.max_buf_size / 2)
let get_rtimeout t = get_rtimeout t.sock_in
-let max_refill t = t.wbuf.max_buf_size - t.wbuf.len
+let max_refill t = t.wbuf.max_buf_size - t.wbuf.len
let close t s =
(*
@@ -507,7 +506,7 @@
lprintf "shutdown\n";
end;
*)
- (try
+ (try
BasicSocket.shutdown t.sock_out s;
if t.sock_in != t.sock_out then
BasicSocket.shutdown t.sock_in s;
@@ -543,11 +542,11 @@
(* lprintf "WRITE [%s]\n" (String.escaped (String.sub s pos1 len)); *)
let nw = MlUnix.write fd s pos1 len in
if !verbose_bandwidth > 1 then begin
- lprintf "[BW2 %6d] immediate write %d/%d on %s:%d\n" (last_time
())
- nw len
+ lprintf "[BW2 %6d] immediate write %d/%d on %s:%d\n" (last_time
())
+ nw len
t.name (sock_num t.sock_out);
end;
-
+
register_upload t len;
forecast_download t 0;
@@ -561,12 +560,12 @@
bc.moved_bytes <-
Int64.add bc.moved_bytes (Int64.of_int nw));
if t.nwrite = 0 then begin
-(* if t.connecting then
+(* if t.connecting then
lprintf "WRITE BEFORE CONNECTION.......\n";
lprintf "add_connect_latency at %f\n" (current_time ()); *)
add_connect_latency t.host t.connect_time;
end;
-
+
t.nwrite <- t.nwrite + nw;
if nw = 0 then (close t Closed_by_peer; pos2) else
pos1 + nw
@@ -608,7 +607,7 @@
big_buffer, 0, big_buffer_len
else
let can_write_in_buffer =
- if b.buf = "" then
+ if b.buf = "" then
if b.min_buf_size <= min_buffer_read then begin
b.buf <- new_string ();
min_buffer_read
@@ -674,14 +673,14 @@
lprintf "READ LIMITED BY BW CONTROL: %d\n" nread;
end;
*)
-
+
if !verbose_bandwidth > 1 then begin
- lprintf "[BW2 %6d] %sread %d/%d/%d on %s:%d\n" (last_time ())
- (if old_len > 0 then "completing " else "") nread can_read max_len
+ lprintf "[BW2 %6d] %sread %d/%d/%d on %s:%d\n" (last_time ())
+ (if old_len > 0 then "completing " else "") nread can_read max_len
t.name (sock_num t.sock_in);
end;
-
-
+
+
if !copy_read_buffer then begin
(* lprintf "Copying %d bytes\n" nread; *)
buf_add t b big_buffer 0 nread
@@ -698,7 +697,7 @@
lprintf " given by buffer: %d\n" buffer_len;
end;
*)
-
+
tcp_downloaded_bytes := Int64.add !tcp_downloaded_bytes (Int64.of_int
nread);
(match t.read_control with
None -> () | Some bc ->
@@ -714,7 +713,6 @@
close t Closed_by_peer;
end else begin
-
try
(* if t.monitored then
(lprintf "event handler READ DONE\n"; ); *)
@@ -747,14 +745,14 @@
try
(* lprintf "try write %d/%d\n" max_len t.wbuf.len; *)
let fd = fd sock in
-(* lprintf "WRITE [%s]\n" (String.escaped
+(* lprintf "WRITE [%s]\n" (String.escaped
(String.sub b.buf b.pos max_len)); *)
let nw = MlUnix.write fd b.buf b.pos max_len in
if !verbose_bandwidth > 1 then begin
- lprintf "[BW2 %6d] postponed %swrite %d/%d/%d on %s:%d\n"
(last_time ()) (if max_len < b.len then "partial " else "") nw max_len b.len
+ lprintf "[BW2 %6d] postponed %swrite %d/%d/%d on %s:%d\n"
(last_time ()) (if max_len < b.len then "partial " else "") nw max_len b.len
t.name (sock_num t.sock_out);
end;
-
+
(* if t.monitored then
(lprintf "written %d\n" nw; ); *)
tcp_uploaded_bytes := Int64.add !tcp_uploaded_bytes (Int64.of_int nw);
@@ -819,10 +817,10 @@
end
end
end
-
+
let get_latencies verbose =
let b = Buffer.create 300 in
- let counter = ref 0 in
+ let counter = ref 0 in
Hashtbl.iter (fun ip (latency, samples) ->
incr counter;
) latencies;
@@ -835,7 +833,7 @@
) latencies;
Hashtbl.clear latencies;
Buffer.contents b
-
+
let tcp_handler t event =
match event with
| CAN_READ ->
@@ -862,7 +860,7 @@
if t.nwrite = 0 && t.noproxy && t.connecting then begin
t.connecting <- false;
t.event_handler t CONNECTED;
- t.nwrite = 0
+ t.nwrite = 0
end else true
in
if can_write then tcp_handler_write t t.sock_out
@@ -931,10 +929,10 @@
bc.remaining_bytes_user bc.total_bytes bc.remaining_bytes;
bc.remaining_bytes <- bc.total_bytes - bc.forecast_bytes;
if !verbose_bandwidth > 0 && bc.ndone_last_second > 0 then
- lprintf "[BW1 %6d] %s read %d/%d last second\n" (last_time ())
+ lprintf "[BW1 %6d] %s read %d/%d last second\n" (last_time ())
bc.bc_name bc.ndone_last_second bc.total_bytes;
if !verbose_bandwidth > 0 && bc.forecast_bytes > 0 then
- lprintf "[BW1 %6d] %s forecast read %d bytes for next second\n"
(last_time ())
+ lprintf "[BW1 %6d] %s forecast read %d bytes for next second\n"
(last_time ())
bc.bc_name bc.forecast_bytes;
bc.forecast_bytes <- 0;
bc.ndone_last_second <- 0;
@@ -945,10 +943,10 @@
bc.remaining_bytes_user bc.total_bytes bc.remaining_bytes;
bc.remaining_bytes <- bc.total_bytes - bc.forecast_bytes;
if !verbose_bandwidth > 0 && bc.ndone_last_second > 0 then
- lprintf "[BW1 %6d] %s wrote %d/%d last second\n" (last_time ())
+ lprintf "[BW1 %6d] %s wrote %d/%d last second\n" (last_time ())
bc.bc_name bc.ndone_last_second bc.total_bytes;
if !verbose_bandwidth > 0 && bc.forecast_bytes > 0 then
- lprintf "[BW1 %6d] %s forecast write %d bytes for next second\n"
(last_time ())
+ lprintf "[BW1 %6d] %s forecast write %d bytes for next second\n"
(last_time ())
bc.bc_name bc.forecast_bytes;
bc.forecast_bytes <- 0;
bc.ndone_last_second <- 0;
@@ -1027,7 +1025,7 @@
set_handler t WRITE_DONE (fun t ->
(* lprintf "close_after_write: CLOSE\n"; - log output removed *)
shutdown t Closed_by_user)
-
+
exception Http_proxy_error of string
let http_proxy = ref None
@@ -1051,7 +1049,7 @@
let rstr_pos = 12 (*String.index_from b.buf (rcode_pos+1) ' '*)
in
let rstr_end = String.index_from b.buf (rstr_pos+1) '\n' in
let rstr = String.sub b.buf (rstr_pos+1) (rstr_end-rstr_pos-1) in
- lprintf "From proxy for %s: %s %s\n"
+ lprintf "From proxy for %s: %s %s\n"
(Ip.to_string sock.host) rcode rstr;
rcode, rstr, rstr_end
with _ ->
@@ -1102,7 +1100,7 @@
let set_connected t f =
set_handler t CONNECTED f
-
+
(*************************************************************************)
(* *)
(* Socket Configuration *)
@@ -1123,7 +1121,7 @@
let set_monitored t b = t.monitored <- b
let monitored t = t.monitored
-
+
let set_rtimeout s t = set_rtimeout s.sock_in t
let set_wtimeout s t = set_wtimeout s.sock_out t
@@ -1240,15 +1238,15 @@
connect_time = 0.;
compression = None;
} in
-
+
let fname = (fun _ ->
Printf.sprintf "%s (nread: %d nwritten: %d) [U %s,D %s]" name
t.nread t.nwrite (string_of_bool (t.read_control <> None))
(string_of_bool (t.write_control <> None));
;
) in
-
-
+
+
let sock_in = BasicSocket.create name fd_in (fun _ event ->
tcp_handler t event) in
if !debug then begin
@@ -1261,7 +1259,7 @@
tcp_handler t event) in
set_printer sock_out fname;
set_dump_info sock_out (dump_socket t);
-
+
t.sock_in <- sock_in;
t.sock_out <- sock_out;
t
@@ -1336,7 +1334,7 @@
(* connect to proxy in blocking mode, so we sure, connections established when
we send CONNECT *)
lprintf "via proxy\n";
Unix.connect s (Unix.ADDR_INET(Ip.to_inet_addr proxy_ip, proxy_port));
-
+
let buf = Buffer.create 200 in
let dotted_host = Unix.string_of_inet_addr host in
Printf.bprintf buf "CONNECT %s:%d HTTP/1.1\n" dotted_host port;
@@ -1351,14 +1349,14 @@
()
end;
let t = create token name s handler in
-
+
if !verbose_bandwidth > 1 then begin
lprintf "[BW2 %6d] connect on %s:%d\n" (last_time ()) t.name (sock_num
t.sock_out);
end;
-
- token.connection_manager.nconnections_last_second <-
+
+ token.connection_manager.nconnections_last_second <-
token.connection_manager.nconnections_last_second + 1;
-
+
must_write t.sock_out true;
try
t.host <- ip;
@@ -1368,7 +1366,7 @@
t.noproxy <- false;
end else
Unix.connect s (Unix.ADDR_INET(host,port));
-
+
t.connecting <- true;
register_upload t 0; (* The TCP SYN packet *)
forecast_download t 0; (* The TCP ACK packet *)
@@ -1392,7 +1390,7 @@
lprintf "EXCEPTION %s before connect to host %s:%d\n"
(Printexc2.to_string e) (Unix.string_of_inet_addr host) port;
raise e
-
+
(*************************************************************************)
@@ -1421,13 +1419,13 @@
match Unix.getpeername fd with
Unix.ADDR_INET (ip, port) ->
let ip = Ip.of_inet_addr ip in
- t.peer_addr <- Some (ip, port);
+ t.peer_addr <- Some (ip, port);
ip, port
| _ -> raise Not_found
let peer_ip t = fst (peer_addr t)
let peer_port t = snd (peer_addr t)
-
+
(*
let host t =
let fd = fd t.sock_out in
@@ -1510,7 +1508,7 @@
let set_max_output_buffer t len =
t.wbuf.max_buf_size <- len
-
+
(*************************************************************************)
(* *)
(* Compression *)
@@ -1618,7 +1616,7 @@
reset_connection_scheduler ();
deflate_timer ();
);
-
+
set_before_select_hook (fun _ ->
schedule_connections ();
List.iter (fun bc ->
@@ -1636,17 +1634,17 @@
);
(*
-Some ideas:
+Some ideas:
* sort the connections so that we try to read and write as
much as possible to/from connections on which we already have read/written
- a lot.
+ a lot.
* postpone reads and writes for a few seconds so that we have more to
read/write.
dhcppc3:~# ping computer.domain.org -f -s <packet_size> -c 1000
where computer.domain.org is a nearby computer on a 100 Mbs link.
-
+
On my cable link, making <packet_size> vary from 1 to 2000 shows:
* packets > 1450 are 100% lost
* 300 > packets > 1 are 5% lost but exactly the same time (14.5 s)
@@ -1660,26 +1658,26 @@
, pipe 6, ipg/ewma 14.205/0.000 ms
*)
-
-
+
+
set_after_select_hook (fun _ ->
List.iter (fun bc ->
if bc.remaining_bytes > 0 then begin
-
+
bc.connections <- List.sort (fun t1 t2 ->
let w1 = t1.nread in
let w2 = t2.nread in
compare w1 w2
) bc.connections;
-
+
if !verbose_bandwidth > 2 && bc.nconnections > 0 then begin
lprintf "[BW3 %6d] %d read-waiting connections for %d
allowed\n" (last_time ()) bc.nconnections bc.remaining_bytes;
List.iter (fun t ->
lprintf "[BW3 %6d] %s:%d [buffered %d]\n" (last_time
()) t.name (sock_num t.sock_in) ((buf t).len);
) bc.connections;
- end;
-
-
+ end;
+
+
List.iter (fun t ->
if bc.remaining_bytes > 0 then
let nconnections = maxi bc.nconnections 1 in
@@ -1689,7 +1687,7 @@
(try
(* lprintf "allow to read %d\n" can_read; *)
can_read_handler t t.sock_in can_read
- with e ->
+ with e ->
(* lprintf "Exception %s in can_read_handler %s:%d\n"
(Printexc2.to_string e)
t.name (sock_num t.sock_in) *) ()
@@ -1699,35 +1697,35 @@
bc.nconnections <- bc.nconnections - t.read_power
else
if !verbose_bandwidth > 2 then begin
- lprintf "[BW3 %6d] %s:%d could not read\n" (last_time
())
- t.name (sock_num t.sock_out)
+ lprintf "[BW3 %6d] %s:%d could not read\n" (last_time
())
+ t.name (sock_num t.sock_out)
end
) bc.connections;
(* if bc.remaining_bytes > 0 then bc.allow_io := false; *)
end;
-
+
if !verbose_bandwidth > 2 then begin
-
- if bc.remaining_bytes > 0 then
+
+ if bc.remaining_bytes > 0 then
lprintf "[BW3 %6d] still %d bytes to read\n" (last_time ())
bc.remaining_bytes;
-
+
if bc.nconnections > 0 then
lprintf "[BW3 %6d] still %d read-waiting connections after
loop\n" (last_time ()) bc.nconnections;
end;
-
-
+
+
bc.connections <- [];
bc.nconnections <- 0;
) !read_bandwidth_controlers;
List.iter (fun bc ->
if bc.remaining_bytes > 0 then begin
-
+
bc.connections <- List.sort (fun t1 t2 ->
let w1 = remaining_to_write t1 in
let w2 = remaining_to_write t2 in
compare w1 w2
) bc.connections;
-
+
if !verbose_bandwidth > 2 && bc.nconnections > 0 then begin
lprintf "[BW3 %6d] %d write-waiting connections for %d
allowed\n" (last_time ()) bc.nconnections bc.remaining_bytes;
List.iter (fun t ->
@@ -1750,24 +1748,24 @@
bc.nconnections <- bc.nconnections - t.write_power;
else
if !verbose_bandwidth > 2 then begin
- lprintf "[BW3 %6d] %s:%d could not write buffered %d
bytes\n" (last_time ())
+ lprintf "[BW3 %6d] %s:%d could not write buffered %d
bytes\n" (last_time ())
t.name (sock_num t.sock_out) (remaining_to_write t)
end
) bc.connections;
(* if bc.remaining_bytes > 0 then bc.allow_io := false; *)
end;
if !verbose_bandwidth > 2 then begin
-
+
if bc.remaining_bytes > 0 then begin
lprintf "[BW3 %6d] still %d bytes to write\n" (last_time ())
bc.remaining_bytes;
List.iter (fun t ->
let len = remaining_to_write t in
if len > 0 then
- lprintf "[BW3 %6d] %s:%d could write %d\n"
(last_time ())
+ lprintf "[BW3 %6d] %s:%d could write %d\n"
(last_time ())
t.name (sock_num t.sock_out) len
) bc.connections
end;
-
+
if bc.nconnections > 0 then
lprintf "[BW3 %6d] still %d write-waiting connections after
loop\n" (last_time ()) bc.nconnections;
end;
@@ -1775,11 +1773,11 @@
bc.nconnections <- 0;
) !write_bandwidth_controlers
)
-
+
let prevent_close t =
- prevent_close t.sock_in;
+ prevent_close t.sock_in;
prevent_close t.sock_out
-
+
let must_write t bool = must_write t.sock_out bool
let output_buffered t = t.wbuf.len
@@ -1808,7 +1806,7 @@
end;
) fields_titles fields_values
| _ -> assert false
-
+
let load_stats filename =
try
let ic = open_in filename in
@@ -1819,32 +1817,32 @@
let values = input_line ic in
join_stats titles values;
iter_first ic
-
+
in
try
iter_first ic
- with
+ with
End_of_file -> close_in ic
- | e ->
+ | e ->
lprintf "[BWS] Error %s reading %s\n" (Printexc2.to_string e) filename;
close_in ic; proc_net_fs := false
- with
- | e ->
+ with
+ | e ->
lprintf "[BWS] Error %s opening %s\n" (Printexc2.to_string e) filename;
proc_net_fs := false
-
+
let proc_net_timer _ =
if !proc_net_fs && !verbose_bandwidth > 0 then begin
load_stats "/proc/net/netstat";
load_stats "/proc/net/snmp";
end
-
-
+
+
let _ =
Heap.add_memstat "tcpBufferedSocket" (fun level buf ->
Printf.bprintf buf " %d latencies\n" (Hashtbl.length latencies);
Printf.bprintf buf " %d entries in net_stats\n" (Hashtbl.length
net_stats);
);
- add_infinite_timer 1.0 proc_net_timer
+ add_infinite_timer 1.0 proc_net_timer
let packet_frame_size = ref 250