[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Mldonkey-commits] Changes to mldonkey/src/utils/net/basicSocket.ml
From: |
mldonkey-commits |
Subject: |
[Mldonkey-commits] Changes to mldonkey/src/utils/net/basicSocket.ml |
Date: |
Wed, 06 Jul 2005 20:25:54 -0400 |
Index: mldonkey/src/utils/net/basicSocket.ml
diff -u mldonkey/src/utils/net/basicSocket.ml:1.16
mldonkey/src/utils/net/basicSocket.ml:1.17
--- mldonkey/src/utils/net/basicSocket.ml:1.16 Tue Jun 28 22:45:57 2005
+++ mldonkey/src/utils/net/basicSocket.ml Thu Jul 7 00:25:46 2005
@@ -30,19 +30,19 @@
type close_reason =
Closed_for_timeout (* timeout exceeded *)
| Closed_for_lifetime (* lifetime exceeded *)
- | Closed_by_peer (* end of file *)
+ | Closed_by_peer (* end of file *)
| Closed_for_error of string
| Closed_by_user
| Closed_for_overflow
| Closed_connect_failed
| Closed_for_exception of exn
-
-type event =
+
+type event =
| CLOSED of close_reason
-| RTIMEOUT (* called after a timeout on reading *)
-| WTIMEOUT (* called after a timeout on writing *)
-| LTIMEOUT (* called after a timeout on lifetime *)
-| CAN_READ (* called when read is possible *)
+| RTIMEOUT (* called after a timeout on reading *)
+| WTIMEOUT (* called after a timeout on writing *)
+| LTIMEOUT (* called after a timeout on lifetime *)
+| CAN_READ (* called when read is possible *)
| CAN_WRITE (* called when write is possible *)
type t = {
@@ -55,16 +55,16 @@
mutable pollpos : int;
mutable read_allowed : bool ref;
mutable write_allowed : bool ref;
-
+
(* YOU CAN MODIFY THESE *)
mutable rtimeout: float;
mutable next_rtimeout : float;
-
+
mutable wtimeout: float;
mutable next_wtimeout : float;
-
+
mutable lifetime : float;
-
+
mutable event_handler : handler;
mutable error : close_reason;
@@ -98,7 +98,7 @@
external get_fd_num : Unix.file_descr -> int = "ml_get_fd_num" "noalloc"
-external select: t list -> float -> unit = "ml_select"
+external select: t list -> float -> unit = "ml_select"
external use_poll : bool -> unit = "ml_use_poll"
external has_threads : unit -> bool = "ml_has_pthread"
@@ -109,20 +109,20 @@
(* Global values *)
(* *)
(*************************************************************************)
-
+
let can_read = 1
let can_write = 2
-
+
let debug = ref false
let nb_sockets = ref 0
let allow_read = ref true
let allow_write = ref true
-
+
let infinite_timeout = 3600. *. 24. *. 365. (* one year ! *)
let current_time = ref (Unix.gettimeofday ())
let last_time = ref (int_of_float (!current_time -. 1000000000.))
-
+
let dummy_fd = Obj.magic (-1)
let closed_tasks = ref []
let fd_tasks = ref ([]: t list)
@@ -132,7 +132,7 @@
let timers = ref []
let loop_delay = ref 0.005
let socket_keepalive = ref false
-let verbose_bandwidth = ref 0
+let verbose_bandwidth = ref 0
let bandwidth_second_timers = ref []
let use_threads = ref true
@@ -157,21 +157,21 @@
t.next_wtimeout <- time +. !current_time
let set_lifetime t time =
- t.lifetime <- time +. !current_time
+ t.lifetime <- time +. !current_time
let set_handler t handler =
t.event_handler <- handler
let handler t = t.event_handler
-
-
+
+
let set_before_select_hook f =
before_select_hooks := f :: !before_select_hooks
let set_after_select_hook f =
after_select_hooks := f :: !after_select_hooks
-
+
let add_bandwidth_second_timer f =
bandwidth_second_timers := f :: !bandwidth_second_timers
@@ -180,7 +180,7 @@
(* PRINTERS *)
(* *)
(*************************************************************************)
-
+
let string_of_reason c =
match c with
Closed_for_timeout -> "timeout"
@@ -192,14 +192,14 @@
| Closed_connect_failed -> "connect failed"
| Closed_for_exception e -> Printf.sprintf "exception %s"
(Printexc2.to_string e)
-
+
(*************************************************************************)
(* *)
(* Simple functions *)
(* *)
(*************************************************************************)
-
+
let minf (x: float) (y: float) =
if x > y then y else x
@@ -218,52 +218,52 @@
(* *)
(*************************************************************************)
-let set_allow_read s ref =
- s.read_allowed <- ref;
+let set_allow_read s ref =
+ s.read_allowed <- ref;
change_fd_event_setting s
-
-let set_allow_write s ref =
+
+let set_allow_write s ref =
s.write_allowed <- ref;
change_fd_event_setting s
-
+
let update_time () =
current_time := Unix.gettimeofday ();
last_time := (int_of_float (!current_time -. 1000000000.));
!current_time
-let must_write t b =
+let must_write t b =
if b <> t.want_to_write then begin
t.want_to_write <- b;
change_fd_event_setting t
end
-let must_read t b =
+let must_read t b =
if b <> t.want_to_read then begin
t.want_to_read <- b;
change_fd_event_setting t
end
-let print_socket buf s =
- Printf.bprintf buf "FD %s:%d: %20s\n"
+let print_socket buf s =
+ Printf.bprintf buf "FD %s:%d: %20s\n"
(s.name) (get_fd_num s.fd)
(Date.to_string s.born)
-let sprint_socket s =
+let sprint_socket s =
let buf = Buffer.create 100 in
- print_socket buf s;
+ print_socket buf s;
Buffer.contents buf
(*************************************************************************)
(* *)
(* close *)
(* *)
-(*************************************************************************)
+(*************************************************************************)
let close t msg =
if t.fd <> dummy_fd then begin
if !debug then begin
- lprintf "CLOSING: %s" (sprint_socket t);
+ lprintf "CLOSING: %s" (sprint_socket t);
end;
- (try
+ (try
Unix.close t.fd;
with _ -> ());
t.fd <- dummy_fd;
@@ -272,7 +272,7 @@
t.error <- msg;
decr nb_sockets
end
-
+
let default_before_select t = ()
let dump_basic_socket buf = ()
@@ -282,15 +282,15 @@
(* *)
(* create_blocking *)
(* *)
-(*************************************************************************)
-
+(*************************************************************************)
+
let create_blocking name fd handler =
-
+
let (fdnum : int) = get_fd_num fd in
(*
if fdnum >= Unix32.fds_size then begin
Unix.close fd;
- lprintf "**** File descriptor above limit %d!!\n" fdnum;
+ lprintf "**** File descriptor above limit %d!!\n" fdnum;
failwith "File Descriptor removed";
end;
*)
@@ -300,31 +300,31 @@
ignore (update_time ());
let t = {
fd = fd;
-
+
want_to_write = false;
want_to_read = true;
closed = false;
- pollpos = -1;
+ pollpos = -1;
flags = 0;
-
+
rtimeout = infinite_timeout;
next_rtimeout = !current_time +. infinite_timeout;
-
+
wtimeout = infinite_timeout;
next_wtimeout = !current_time +. infinite_timeout;
-
+
lifetime = !current_time +. infinite_timeout;
read_allowed = allow_read;
write_allowed = allow_write;
-
+
event_handler = handler;
error = Closed_by_peer;
(* before_select = default_before_select; *)
name = name;
printer = (fun _ -> "");
born = !current_time;
-
+
dump_info = dump_basic_socket;
can_close = true;
} in
@@ -332,11 +332,11 @@
if !debug then begin
lprintf "OPENING: %s" ( sprint_socket t);
end;
- fd_tasks := t :: !fd_tasks;
+ fd_tasks := t :: !fd_tasks;
add_fd_to_event_set t;
t
-
+
(*************************************************************************)
(* *)
@@ -355,7 +355,7 @@
(* iter_task *)
(* *)
(*************************************************************************)
-
+
let rec iter_task old_tasks time =
match old_tasks with
[] -> ()
@@ -363,19 +363,19 @@
(* lprintf "NEXT TASK\n"; *)
if t.closed then begin
remove_fd_from_event_set t;
- iter_task old_tail time
+ iter_task old_tail time
end else
begin
fd_tasks := t :: !fd_tasks;
-
+
(* t.before_select t; *)
-
- if t.want_to_write then
+
+ if t.want_to_write then
timeout := minf (t.next_wtimeout -. time) !timeout;
-
+
if t.want_to_read then
timeout := minf (t.next_rtimeout -. time) !timeout;
-
+
iter_task old_tail time;
end
@@ -402,7 +402,7 @@
(* TIMERS *)
(* *)
(*************************************************************************)
-
+
let add_timer delay f =
timers := {
next_time = !current_time +. delay;
@@ -417,7 +417,7 @@
t.applied <- false;
end
-let add_session_timer enabler delay f =
+let add_session_timer enabler delay f =
let f t =
if !enabler then begin
reactivate_timer t;
@@ -426,12 +426,12 @@
in
add_timer delay f
-
+
let add_infinite_timer delay f = add_session_timer (ref true) delay f
-let add_session_option_timer enabler option f =
+let add_session_option_timer enabler option f =
let f t =
- if !enabler then
+ if !enabler then
let delay = !!option in
(* Negative delays are re-examined every minute, and the timer is only called
if the delay becomes positive again. *)
@@ -440,8 +440,8 @@
if delay > 0. then f ()
in
add_timer !!option f
-
-let add_infinite_option_timer option f =
+
+let add_infinite_option_timer option f =
add_session_option_timer (ref true) option f
(*************************************************************************)
@@ -463,24 +463,24 @@
(* *)
(*************************************************************************)
-let rec exec_tasks =
+let rec exec_tasks =
function [] -> ()
| t :: tail ->
(
let time = !current_time in
- if not t.closed && t.next_rtimeout < time then
+ if not t.closed && t.next_rtimeout < time then
(try t.event_handler t RTIMEOUT with _ -> ());
- if not t.closed && t.next_wtimeout < time then
+ if not t.closed && t.next_wtimeout < time then
(try t.event_handler t WTIMEOUT with _ -> ());
- if not t.closed && t.lifetime < time then
+ if not t.closed && t.lifetime < time then
(try t.event_handler t LTIMEOUT with _ -> ());
- if not t.closed && t.flags land can_read <> 0 then
- (try
+ if not t.closed && t.flags land can_read <> 0 then
+ (try
t.next_rtimeout <- time +. t.rtimeout;
t.event_handler t CAN_READ with _ -> ());
- if not t.closed && t.flags land can_write <> 0 then
- (try
- t.next_wtimeout <- time +. t.wtimeout;
+ if not t.closed && t.flags land can_write <> 0 then
+ (try
+ t.next_wtimeout <- time +. t.wtimeout;
t.event_handler t CAN_WRITE with _ -> ());
);
exec_tasks tail
@@ -490,7 +490,7 @@
(* exec_timers *)
(* *)
(*************************************************************************)
-
+
let rec exec_timers = function
[] -> ()
| t :: tail ->
@@ -507,10 +507,10 @@
(* loop *)
(* *)
(*************************************************************************)
-
+
let loop () =
add_infinite_timer 1.0 (fun _ ->
- if !verbose_bandwidth > 0 then
+ if !verbose_bandwidth > 0 then
lprintf "[BW1] Resetting bandwidth counters\n";
List.iter (fun f -> try f () with _ -> ()) !bandwidth_second_timers
);
@@ -529,52 +529,51 @@
closed_tasks := tail;
(try t.event_handler t (CLOSED t.error) with _ -> ());
done;
-
+
(* lprintf "before iter_timer\n"; *)
let time = update_time () in
timeout := infinite_timeout;
timers := iter_timer !timers time;
(* lprintf "before iter_task\n"; *)
-
+
let old_tasks = !fd_tasks in
fd_tasks := [];
iter_task old_tasks time;
(* lprintf "timeout %f\n" !timeout; *)
-
+
if !timeout < 0.01 then timeout := 0.01;
-(*
- lprintf "TASKS: %d\n" (List.length !tasks);
- lprintf "TIMEOUT: %f\n" !timeout;
+(*
+ lprintf "TASKS: %d\n" (List.length !tasks);
+ lprintf "TIMEOUT: %f\n" !timeout;
timeout := 5.;
*)
exec_hooks !before_select_hooks;
(* lprintf "Tasks %d\n" (List.length !fd_tasks); *)
- select !fd_tasks !timeout;
- with
+ select !fd_tasks !timeout;
+ with
| e ->
lprintf "Exception %s in Select.loop\n" (Printexc2.to_string e);
done
-
-
-
+
+
let shutdown t s =
if t.fd <> dummy_fd then begin
(* lprintf "SHUTDOWN\n"; *)
(try Unix.shutdown t.fd Unix.SHUTDOWN_ALL with _ -> ());
close t s
end
-
+
let nb_sockets () = !nb_sockets
-
+
let stats buf t =
lprintf "Socket %d\n" (get_fd_num t.fd)
let sock_num t = get_fd_num t.fd
-
+
let print_socket buf s =
print_socket buf s;
- Printf.bprintf buf " rtimeout %5.0f/%5.0f read %s & %s write %s & %s (born
%f)\n"
+ Printf.bprintf buf " rtimeout %5.0f/%5.0f read %s & %s write %s & %s (born
%f)\n"
(s.next_rtimeout -. !current_time)
s.rtimeout
(string_of_bool s.want_to_read)
@@ -582,17 +581,17 @@
(string_of_bool s.want_to_write)
(string_of_bool !(s.write_allowed))
(!current_time -. s.born)
-
+
let print_sockets buf =
Printf.bprintf buf "PRINT SOCKETS: %d\n" (List.length !fd_tasks);
List.iter (print_socket buf) !fd_tasks;
()
-
+
let info t = t.name
-
+
let set_printer s f =
s.printer <- f
-
+
let set_dump_info s f =
s.dump_info <- f
@@ -602,39 +601,39 @@
if s.can_close then
close s Closed_by_user
) !fd_tasks
-
-
+
+
let last_time () = !last_time
let start_time = last_time ()
-let date_of_int date =
+let date_of_int date =
if date >= 1000000000 then (float_of_int date) else (float_of_int date +.
1000000000.)
-
-let string_of_date date =
+
+let string_of_date date =
if date < 1 then "never" else
Date.to_string (date_of_int date)
let normalize_time time =
if time >= 1000000000 || time < 0 then time - 1000000000 else time
-
+
let get_rtimeout t = t.rtimeout, t.next_rtimeout -. !current_time
let int64_time () = Int64.of_float !current_time
let update_time () = ignore (update_time ())
let current_time () = !current_time
-
+
(*************************************************************************)
(* *)
(* MAIN *)
(* *)
(*************************************************************************)
-
+
let _ =
Heap.add_memstat "BasicSocket" (fun level buf ->
- Printf.bprintf buf " %d timers\n" (List.length !timers);
- Printf.bprintf buf " %d fd_tasks:\n" (List.length !fd_tasks);
+ Printf.bprintf buf " %d timers\n" (List.length !timers);
+ Printf.bprintf buf " %d fd_tasks:\n" (List.length !fd_tasks);
if level > 0 then
List.iter (fun t -> t.dump_info buf) !fd_tasks;
- Printf.bprintf buf " %d closed_tasks:\n" (List.length !closed_tasks);
+ Printf.bprintf buf " %d closed_tasks:\n" (List.length !closed_tasks);
if level > 0 then
List.iter (fun t -> t.dump_info buf) !closed_tasks;
);
@@ -642,15 +641,15 @@
Printexc2.register_exn (fun e ->
match e with
Unix.Unix_error (e, f, arg) ->
- Printf.sprintf "%s failed%s: %s" f (if arg = "" then "" else
+ Printf.sprintf "%s failed%s: %s" f (if arg = "" then "" else
"on " ^ arg) (Unix.error_message e)
| _ -> raise e
);
-
+
add_timer 300. (fun t ->
reactivate_timer t;
if !debug then
- let buf = Buffer.create 100 in
+ let buf = Buffer.create 100 in
print_sockets buf;
lprintf "%s\n" (Buffer.contents buf);
)
- [Mldonkey-commits] Changes to mldonkey/src/utils/net/basicSocket.ml,
mldonkey-commits <=