[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Mldonkey-users] [PATCH] BT/DHT: show stats, choose random port
From: ygrek
Subject: [Mldonkey-users] [PATCH] BT/DHT: show stats, choose random port
Date: Sun, 20 Mar 2011 13:31:50 +0200
---
src/networks/bittorrent/bTGlobals.ml | 27 +++++++++++-
src/networks/bittorrent/bTOptions.ml | 4 +-
src/networks/bittorrent/bT_DHT.ml | 80 ++++++++++++++++++++++++++-------
src/networks/bittorrent/kademlia.ml | 1 +
4 files changed, 92 insertions(+), 20 deletions(-)
diff --git src/networks/bittorrent/bTGlobals.ml src/networks/bittorrent/bTGlobals.ml
index f6c9b99..8d8c5e7 100644
--- src/networks/bittorrent/bTGlobals.ml
+++ src/networks/bittorrent/bTGlobals.ml
@@ -925,7 +925,7 @@ Define a function to be called when the "mem_stats" command
**************************************************************)
-let _ =
+let () =
Heap.add_memstat "BittorrentGlobals" (fun level buf ->
Printf.bprintf buf "Number of old files: %d\n" (List.length !!old_files);
let downloads = ref 0 in
@@ -940,3 +940,28 @@ let _ =
Printf.bprintf buf "files_by_uid: %d\n" (Hashtbl.length files_by_uid);
Printf.bprintf buf "ft_by_num: %d\n" (Hashtbl.length ft_by_num);
)
+
+open BT_DHT
+
+let () =
+ Heap.add_memstat "BittorrentDHT" (fun _level buf ->
+ match !bt_dht with
+ | None -> ()
+ | Some dht ->
+ let (buckets,nodes,keys,peers) = stat dht in
+ Printf.bprintf buf "Routing : %d nodes in %d buckets\n" nodes buckets;
+ Printf.bprintf buf "Storage : %d keys with %d peers\n" keys peers;
+ List.iter (fun s -> Printf.bprintf buf "%s\n" s) (rpc_stats dht);
+ let queries = ["PING",`Ping;"FIND_NODE",`FindNode;"GET_PEERS",`GetPeers;"ANNOUNCE",`Announce] in
+ Printf.bprintf buf "Outgoing queries : ok/error/timeout\n";
+ List.iter begin fun (name,qt) ->
+ let get k = try Hashtbl.find dht.M.stats (qt,`Out k) with Not_found -> 0 in
+ Printf.bprintf buf "%s: %d/%d/%d\n" name (get `Answer) (get `Error) (get `Timeout);
+ end queries;
+ Printf.bprintf buf "Incoming queries\n";
+ List.iter begin fun (name,qt) ->
+ let get () = try Hashtbl.find dht.M.stats (qt,`In) with Not_found -> 0 in
+ Printf.bprintf buf "%s: %d\n" name (get ())
+ end queries
+ )
+
diff --git src/networks/bittorrent/bTOptions.ml src/networks/bittorrent/bTOptions.ml
index 1664fec..64064c7 100644
--- src/networks/bittorrent/bTOptions.ml
+++ src/networks/bittorrent/bTOptions.ml
@@ -165,8 +165,8 @@ let get_user_agent () =
else !!user_agent
let dht_port = define_option bittorrent_section ["dht_port"]
- "The UDP port to bind the DHT node to (0 to disable)"
- port_option 12345
+ "DHT port (UDP, set 0 to disable)"
+ port_option (2000 + Random.int 20000)
let use_trackers = define_option bittorrent_section ["use_trackers"]
"Send announces to trackers"
diff --git src/networks/bittorrent/bT_DHT.ml src/networks/bittorrent/bT_DHT.ml
index 8d6f039..b554e4c 100644
--- src/networks/bittorrent/bT_DHT.ml
+++ src/networks/bittorrent/bT_DHT.ml
@@ -56,6 +56,8 @@ let clear h = Hashtbl.clear h
end
+let stats_add h k n = Hashtbl.replace h k (n + try Hashtbl.find h k with Not_found -> 0)
+
module KRPC = struct
type dict = (string * Bencode.value) list
@@ -120,12 +122,26 @@ let udp_set_reader socket f =
module A = Assoc2
-let send sock (ip,port as addr) txnmsg =
+let send sock stats (ip,port as addr) txnmsg =
let s = encode txnmsg in
if !debug then lprintf_nl "KRPC to %s : %S" (show_addr addr) s;
+ stats_add stats `Sent 1;
+ stats_add stats `SentBytes (String.length s);
write sock false s ip port
-type t = UdpSocket.t * (addr, string, (addr -> dict -> unit) * (unit -> unit) * int) A.t
+type stats_key = [ `Timeout | `Sent | `SentBytes | `Recv | `RecvBytes | `Decoded | `Handled | `NoTxn ]
+type t =
+ UdpSocket.t *
+ (stats_key, int) Hashtbl.t *
+ (addr, string, (addr -> dict -> unit) * ([`Error|`Timeout]-> unit) * int) A.t
+let show_stats t =
+ let get k = try Hashtbl.find t k with Not_found -> 0 in
+ [
+ sprintf "rpc recv %d pkts (%d bytes)" (get `Recv) (get `RecvBytes);
+ sprintf "rpc sent %d pkts (%d bytes)" (get `Sent) (get `SentBytes);
+ sprintf "rpc decoded %d, handled %d" (get `Decoded) (get `Handled);
+ sprintf "rpc timeouted %d, orphan %d" (get `Timeout) (get `NoTxn);
+ ]
let create port enabler bw_control answer : t =
let socket = create Unix.inet_addr_any port (fun sock event ->
@@ -141,15 +157,17 @@ let create port enabler bw_control answer : t =
set_wtimeout (sock socket) 5.;
set_rtimeout (sock socket) 5.;
let h = A.create () in
+ let stats = Hashtbl.create 10 in
let timeout h =
let now = last_time () in
let bad = ref [] in
let total = ref 0 in
A.iter h (fun addr txn (_,kerr,t) -> incr total; if t < now then bad := (addr,txn,kerr) :: !bad);
if !debug then lprintf_nl "timeouted %d of %d DHT queries" (List.length !bad) !total;
+ stats_add stats `Timeout (List.length !bad);
List.iter (fun (addr,txn,kerr) ->
A.remove h addr txn;
- try kerr () with exn -> if !debug then lprintf_nl ~exn "timeout for %s" (show_addr addr)) !bad;
+ try kerr `Timeout with exn -> if !debug then lprintf_nl ~exn "timeout for %s" (show_addr addr)) !bad;
in
BasicSocket.add_session_timer enabler 5. (fun () -> timeout h);
let handle addr (txn,ver,msg) =
@@ -159,15 +177,19 @@ let create port enabler bw_control answer : t =
| Error (code,msg) ->
if !verb then lprintf_nl "error received from %s%s : %Ld %S" (show_addr addr) !!version code msg;
begin match A.find h addr txn with
- | None -> if !verb then lprintf_nl "no txn %S for %s%s" txn (show_addr addr) !!version
- | Some (_, kerr, _) -> A.remove h addr txn; kerr ()
+ | None ->
+ stats_add stats `NoTxn 1;
+ if !verb then lprintf_nl "no txn %S for %s%s" txn (show_addr addr) !!version
+ | Some (_, kerr, _) -> A.remove h addr txn; kerr `Error
end
| Query (name,args) ->
let ret = answer addr name args in
- send socket addr (txn, ret)
+ send socket stats addr (txn, ret)
| Response ret ->
match A.find h addr txn with
- | None -> if !verb then lprintf_nl "no txn %S for %s%s" txn (show_addr addr) !!version
+ | None ->
+ stats_add stats `NoTxn 1;
+ if !verb then lprintf_nl "no txn %S for %s%s" txn (show_addr addr) !!version
| Some (k,_,_) -> A.remove h addr txn; k addr ret
in
let handle p =
@@ -177,13 +199,17 @@ let create port enabler bw_control answer : t =
let addr = (Ip.of_inet_addr inet_addr, port) in
let ret = ref None in
try
+ stats_add stats `RecvBytes (String.length p.udp_content);
+ stats_add stats `Recv 1;
let r = decode_exn p.udp_content in
+ stats_add stats `Decoded 1;
ret := Some r;
- handle addr r
+ handle addr r;
+ stats_add stats `Handled 1;
with exn ->
let version = match !ret with Some (_,Some s,_) -> sprintf " client %S" s | _ -> "" in
if !verb then lprintf_nl ~exn "handle packet from %s%s : %S" (show_addr addr) version p.udp_content;
- let error txn code str = send socket addr (txn,(Error (Int64.of_int code,str))) in
+ let error txn code str = send socket stats addr (txn,(Error (Int64.of_int code,str))) in
match exn,!ret with
| Malformed_packet x, Some (txn, _, _)
| Protocol_error ("",x), Some(txn, _, _) | Protocol_error (txn,x), _ -> error txn 203 x
@@ -192,15 +218,15 @@ let create port enabler bw_control answer : t =
| _ -> ()
in
udp_set_reader socket handle;
- (socket,h)
+ (socket,stats,h)
-let shutdown (socket,h) =
+let shutdown (socket,_,h) =
close socket Closed_by_user;
A.iter h (fun addr _ (_,kerr,_) ->
- try kerr () with exn -> if !verb then lprintf_nl ~exn "shutdown for %s" (show_addr addr));
+ try kerr `Timeout with exn -> if !verb then lprintf_nl ~exn "shutdown for %s" (show_addr addr));
A.clear h
-let write (socket,h) msg addr k ~kerr =
+let write (socket,stats,h) msg addr k ~kerr =
let tt = Assoc2.find_all h addr in
let rec loop () = (* choose txn FIXME *)
let txn = string_of_int (Random.int 1_000_000) in
@@ -210,7 +236,7 @@ let write (socket,h) msg addr k ~kerr =
in
let txn = loop () in
Assoc2.add h addr txn (k,kerr,last_time () + dht_query_timeout);
- send socket addr (txn,msg)
+ send socket stats addr (txn,msg)
end (* KRPC *)
@@ -350,21 +376,33 @@ module Peers = Map.Make(struct type t = addr let compare = compare end)
module M = struct
+type query_type = [ `Ping | `FindNode | `GetPeers | `Announce ]
+type answer_type = [ `Answer | `Error | `Timeout ]
+
type t = {
rt : Kademlia.table; (* routing table *)
rpc : KRPC.t; (* KRPC protocol socket *)
dht_port : int; (* port *)
torrents : (H.t, int Peers.t) Hashtbl.t; (* torrents announced by other peers *)
enabler : bool ref; (* timers' enabler *)
+ stats : (query_type * [ `In | `Out of answer_type ], int) Hashtbl.t; (* statistics *)
}
+let query_type_of_query = function
+| Ping -> `Ping
+| FindNode _ -> `FindNode
+| GetPeers _ -> `GetPeers
+| Announce _ -> `Announce
+
let dht_query t addr q k ~kerr =
if !debug then lprintf_nl "DHT query to %s : %s" (show_addr addr) (show_query q);
+ let qt = query_type_of_query q in
KRPC.write t.rpc (make_query t.rt.self q) addr begin fun addr dict ->
- let (id,r) = try parse_response_exn q dict with exn -> kerr (); raise exn in
+ let (id,r) = try parse_response_exn q dict with exn -> stats_add t.stats (qt, `Out `Error) 1; kerr (); raise exn in
if !debug then lprintf_nl "DHT response from %s (%s) : %s" (show_addr addr) (show_id id) (show_response r);
+ stats_add t.stats (qt, `Out `Answer) 1;
k (id,addr) r
- end ~kerr
+ end ~kerr:(fun reason -> stats_add t.stats (qt, `Out (reason:>answer_type)) 1; kerr ())
let ping t addr k = dht_query t addr Ping begin fun node r ->
match r with Ack -> k (Some node)
@@ -407,7 +445,8 @@ let create rt dht_port bw_control answer =
let rpc = KRPC.create dht_port enabler bw_control answer in
let torrents = Hashtbl.create 8 in
manage_timeouts enabler torrents;
- { rt = rt; rpc = rpc; torrents = torrents; dht_port = dht_port; enabler = enabler; }
+ { rt = rt; rpc = rpc; torrents = torrents; dht_port = dht_port; enabler = enabler;
+ stats = Hashtbl.create 10; }
let shutdown dht =
dht.enabler := false;
@@ -579,6 +618,12 @@ let show_torrents torrents =
torrents
let show dht = show_table dht.M.rt; show_torrents dht.M.torrents
+let stat dht =
+ buckets dht.M.rt,
+ size dht.M.rt,
+ Hashtbl.length dht.M.torrents,
+ Hashtbl.fold (fun _ peers acc -> acc + Peers.fold (fun _ _ acc -> acc + 1) peers 0) dht.M.torrents 0
+let rpc_stats dht = let (_,st,_) = dht.M.rpc in KRPC.show_stats st
let bootstrap dht host addr k =
M.ping dht addr begin function
@@ -642,6 +687,7 @@ let start rt port bw_control =
let node = (id,addr) in
if !debug then lprintf_nl "DHT query from %s : %s" (show_node node) (show_query q);
update !!dht Good id addr;
+ stats_add (!!dht).M.stats (M.query_type_of_query q, `In) 1;
let response =
match q with
| Ping -> Ack
diff --git src/networks/bittorrent/kademlia.ml src/networks/bittorrent/kademlia.ml
index 5336449..c6d901f 100644
--- src/networks/bittorrent/kademlia.ml
+++ src/networks/bittorrent/kademlia.ml
@@ -352,6 +352,7 @@ let rec fold f acc = function
| L b -> f acc b
let size t = fold (fun acc b -> acc + Array.length b.nodes) 0 t.root
+let buckets t = fold (fun acc b -> acc + 1) 0 t.root
(*
module NoNetwork : Network = struct
--
1.7.2.3
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Mldonkey-users] [PATCH] BT/DHT: show stats, choose random port, ygrek <=