[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Mldonkey-commits] Changes to mldonkey/src/networks/bittorrent/bTClients.ml
From: |
mldonkey-commits |
Subject: |
[Mldonkey-commits] Changes to mldonkey/src/networks/bittorrent/bTClients.ml |
Date: |
Sun, 10 Jul 2005 19:19:20 -0400 |
Index: mldonkey/src/networks/bittorrent/bTClients.ml
diff -u mldonkey/src/networks/bittorrent/bTClients.ml:1.40 mldonkey/src/networks/bittorrent/bTClients.ml:1.41
--- mldonkey/src/networks/bittorrent/bTClients.ml:1.40 Thu Jul 7 00:25:46 2005
+++ mldonkey/src/networks/bittorrent/bTClients.ml Sun Jul 10 23:19:16 2005
@@ -18,13 +18,13 @@
*)
-(** Functions used in client<->client communication
+(** Functions used in client<->client communication
*)
(** A peer (or client) is always a remote peer in this file.
A Piece is a portion of the file associated with a hash (sha1).
In mldonkey a piece is referred as a block inside the swarming system.
- A SubPiece is a portion of a piece (without hash) which can be
+ A SubPiece is a portion of a piece (without hash) which can be
sent/downloaded to/from a peer.
In mldonkey a SubPiece is referred as a range inside the swarming system.
@see <http://wiki.theory.org/index.php/BitTorrentSpecification> wiki for
some
@@ -62,10 +62,10 @@
open BTComplexOptions
open BTChooser
open TcpMessages
-
+
let http_ok = "HTTP 200 OK"
let http11_ok = "HTTP/1.1 200 OK"
-
+
let next_uploaders = ref ([] : BTTypes.client list)
let current_uploaders = ref ([] : BTTypes.client list)
@@ -77,13 +77,13 @@
In this function we connect to a tracker.
@param file The file concerned by the request
@param url Url of the tracker to connect
- @param event Event (as a string) to send to the tracker :
- can be 'completed' if the file is complete, 'started' for the first
+ @param event Event (as a string) to send to the tracker :
+ can be 'completed' if the file is complete, 'started' for the first
connection to this tracker or 'stopped' for a clean stop of the file.
Everything else will be ok for a second connection to the tracker.
Be careful to the spelling of this event
- @param f The function used to parse the result of the connection.
- The function will get a file as an argument (@see
+ @param f The function used to parse the result of the connection.
+ The function will get a file as an argument (@see
get_sources_from_tracker for an example)
*)
let connect_trackers file event f =
@@ -118,15 +118,16 @@
("left", Int64.to_string left) ::
("compact","1") ::
args
- in
+ in
let args = if !!numwant > -1 then
("numwant", string_of_int !!numwant) :: args else args
in
let args = if !!force_client_ip then
("ip", Ip.to_string !!set_client_ip) :: args else args
in
-
+
List.iter (fun t ->
+
(* if we have too few sources we may ask the tracker before the interval
*)
if not must_check_delay
|| not file.file_tracker_connected
@@ -140,8 +141,7 @@
lprintf "get_sources_from_tracker: tracker_connected:%s last_clients:%i last_conn-last_time:%i file: %s\n"
(string_of_bool file.file_tracker_connected)
t.tracker_last_clients_num
(t.tracker_last_conn - last_time()) file.file_name;
-
-
+
let module H = Http_client in
let url = t.tracker_url in
let r = {
@@ -151,7 +151,7 @@
H.req_user_agent =
Printf.sprintf "MLdonkey/%s" Autoconf.current_version;
} in
-
+
if !verbose_msg_servers then
lprintf "Request sent to tracker %s for file: %s\n"
t.tracker_url file.file_name;
@@ -168,29 +168,29 @@
t.tracker_url (t.tracker_interval - (last_time () -
t.tracker_last_conn)) file.file_name
) file.file_trackers
-(** In this function we decide which peers will be
+(** In this function we decide which peers will be
uploaders. We send a choke message to current uploaders
that are not in the next uploaders list. We send Unchoke
- for clients that are in next list (and not in current)
+ for clients that are in next list (and not in current)
*)
let recompute_uploaders () =
if !verbose_upload then lprintf "recompute_uploaders\n";
next_uploaders := choose_uploaders current_files;
- (*Send choke if a current_uploader is not in next_uploaders*)
+ (*Send choke if a current_uploader is not in next_uploaders*)
List.iter ( fun c -> if ((List.mem c !next_uploaders)==false) then
begin
set_client_has_a_slot (as_client c) false;
(*we will let him finish his download and choke him on next_request*)
end
) !current_uploaders;
-
- (*don't send Choke if new uploader is already an uploaders *)
+
+ (*don't send Choke if new uploader is already an uploaders *)
List.iter ( fun c -> if ((List.mem c !current_uploaders)==false) then
begin
set_client_has_a_slot (as_client c) true;
Rate.update_no_change c.client_downloaded_rate;
Rate.update_no_change c.client_upload_rate;
- c.client_last_optimist<- last_time();
+ c.client_last_optimist <- last_time();
client_enter_upload_queue (as_client c);
send_client c Unchoke;
end
@@ -200,32 +200,31 @@
(****** Fabrice: why are clients which are disconnected removed ???
These clients might still be useful to reconnect to, no ? *)
-
(** This function is called when a client is disconnected
(be it by our side or its side).
A client which disconnects (even only one time) is discarded.
- If it's an uploader which disconnects we recompute uploaders
+ If it's an uploader which disconnects we recompute uploaders
(see recompute_uploaders) immediately.
@param c The client to disconnect
@param reason The reason for the disconnection (see in BasicSocket.ml)
-*)
+*)
let disconnect_client c reason =
if !verbose_msg_clients then
- lprintf "CLIENT %d: disconnected: %s\n" (client_num c) (string_of_reason reason);
+ lprintf_nl "CLIENT %d: disconnected: %s" (client_num c) (string_of_reason reason);
begin
match c.client_sock with
NoConnection -> ()
| ConnectionWaiting token ->
cancel_token token;
c.client_sock <- NoConnection
- | Connection sock ->
+ | Connection sock ->
close sock reason;
try
(* List.iter (fun r -> Int64Swarmer.free_range r) c.client_ranges; *)
set_client_disconnected c reason;
let file = c.client_file in
-(* this is not useful already done in the match
+ (* this is not useful already done in the match
(try close sock reason with _ -> ()); *)
(*---------not needed ?? VvvvvV---------------
c.client_ranges <- [];
@@ -325,7 +324,7 @@
(** A wrapper to send Interested message to a client.
(Send interested only if needed)
@param c The client to send Interested
-*)
+*)
let send_interested c =
if c.client_interesting && (not c.client_alrd_sent_interested) then
begin
@@ -334,13 +333,13 @@
end
-(** Send a Bitfield message to a client.
+(** Send a Bitfield message to a client.
@param c The client to send the Bitfield message
*)
let send_bitfield c =
let bitmap =
match c.client_file.file_swarmer with
- None ->
+ None ->
(* This must be a seeded file... *)
String.make (Array.length c.client_file.file_chunks) '3'
| Some swarmer ->
@@ -350,7 +349,7 @@
if !verbose then lprintf_nl "SENDING Verified bitmap: [%s]" bitmap;
send_client c (BitField
- (
+ (
let nchunks = String.length bitmap in
let len = (nchunks+7)/8 in
let s = String.make len '\000' in
@@ -415,13 +414,13 @@
let ccc = new_client file peer_id (TcpBufferedSocket.host sock)
in
lprintf "CLIENT %d: testing instead of %d\n"
(client_num ccc) (client_num c);
- (match ccc.client_sock with
- Connection _ ->
+ (match ccc.client_sock with
+ Connection _ ->
lprintf "This client is already connected\n";
- close sock (Closed_for_error "Already connected");
+ close sock (Closed_for_error "Already connected");
remove_client ccc;
c
- | _ ->
+ | _ ->
lprintf "CLIENT %d: recovered by UID\n" (client_num ccc);
remove_client c;
cc := Some ccc;
@@ -539,18 +538,18 @@
let file = c.client_file in
(*Check if there's not enough requests in the 'pipeline'
and if a request can be send (not choked and file is downloading) *)
- if List.length c.client_ranges_sent < max_range_requests &&
+ if List.length c.client_ranges_sent < max_range_requests &&
file_state file = FileDownloading && (c.client_choked == false) then
(*num is the number of the piece, x and y are the position
of the subpiece in the piece(!), r is a (CommonSwarmer) range *)
let up = match c.client_uploader with
None -> assert false
- | Some up -> up in
+ | Some up -> up in
let swarmer = Int64Swarmer.uploader_swarmer up in
try
- let num, x,y, r =
+ let num, x,y, r =
if !verbose_msg_clients then begin
lprintf_nl "[BT]: CLIENT %d: Finding new range to send" (client_num c);
end;
@@ -573,7 +572,7 @@
lprintf_nl "\nBT: Finding Range:";
end;
try
- (*We must find a block to request first, and then
+ (*We must find a block to request first, and then
some range inside this block
*)
let rec iter () =
@@ -587,7 +586,7 @@
let b = Int64Swarmer.find_block up in
if !verbose_swarming then begin
lprintf "[BT]: Block Found: "; Int64Swarmer.print_block b;
- end;
+ end;
c.client_block <- Some b;
(*We put the found block in client_block to
request range in this block. (Useful for
@@ -600,9 +599,9 @@
end;
try
(*Given a block find a range inside*)
- let (x,y,r) =
+ let (x,y,r) =
match c.client_range_waiting with
- Some (x,y,r) ->
+ Some (x,y,r) ->
c.client_range_waiting <- None;
(x,y,r)
| None -> Int64Swarmer.find_range up in
@@ -658,17 +657,17 @@
@param sock The socket used for this client
@param msg The message sent by the client
*)
-and client_to_client c sock msg =
+and client_to_client c sock msg =
if !verbose_msg_clients then begin
let (timeout, next) = get_rtimeout sock in
- lprintf "CLIENT %d: (%d, %d,%d) Received %s\n"
+ lprintf_nl "[BT]: CLIENT %d: (%d, %d,%d) Received %s"
(client_num c)
(last_time ())
(int_of_float timeout)
(int_of_float next)
(TcpMessages.to_string msg);
end;
-
+
let file = c.client_file in
(* if c.client_blocks_sent != file.file_blocks_downloaded then begin
let rec iter list =
@@ -682,60 +681,60 @@
in
iter file.file_blocks_downloaded
end;*)
-
+
try
match msg with
Piece (num, offset, s, pos, len) ->
-(*A Piece message contains the data*)
+ (*A Piece message contains the data*)
set_client_state c (Connected_downloading (file_num file));
-(*?*)
+ (*flag it as a good client *)
c.client_good <- true;
if file_state file = FileDownloading then begin
let position = offset ++ file.file_piece_size *.. num in
-
+
let up = match c.client_uploader with
None -> assert false
| Some up -> up in
- let swarmer = Int64Swarmer.uploader_swarmer up in
-
+ let swarmer = Int64Swarmer.uploader_swarmer up in
+
if !verbose_msg_clients then
(match c.client_ranges_sent with
[] -> lprintf "EMPTY Ranges !!!\n"
- | (p1,p2,r) :: _ ->
+ | (p1,p2,r) :: _ ->
let (x,y) = Int64Swarmer.range_range r in
lprintf "Current range %Ld [%d] (asked %Ld-%Ld[%Ld-%Ld])\n"
position len
- p1 p2 x y
+ p1 p2 x y
);
-
- let old_downloaded =
+
+ let old_downloaded =
Int64Swarmer.downloaded swarmer in
(* List.iter Int64Swarmer.free_range c.client_ranges; *)
Int64Swarmer.received up
position s pos len;
(* List.iter Int64Swarmer.alloc_range c.client_ranges; *)
- let new_downloaded =
+ let new_downloaded =
Int64Swarmer.downloaded swarmer in
-(*Update rate and ammount of data received from client*)
- c.client_downloaded <- c.client_downloaded ++
+ (*Update rate and ammount of data received from client*)
+ c.client_downloaded <- c.client_downloaded ++
(new_downloaded -- old_downloaded);
Rate.update c.client_downloaded_rate (float_of_int len);
- if !verbose_msg_clients then
+ if !verbose_msg_clients then
(match c.client_ranges_sent with
- [] -> lprintf "EMPTY Ranges !!!\n"
+ [] -> lprintf_nl "EMPTY Ranges !!!"
| (p1,p2,r) :: _ ->
let (x,y) = Int64Swarmer.range_range r in
- lprintf "Received %Ld [%d] %Ld-%Ld[%Ld-%Ld] -> %Ld\n"
+ lprintf_nl "Received %Ld [%d] %Ld-%Ld[%Ld-%Ld] -> %Ld"
position len
- p1 p2 x y
+ p1 p2 x y
(new_downloaded -- old_downloaded)
);
(* changed 2.5.28 should have been done before !
if new_downloaded <> old_downloaded then
- add_file_downloaded (as_file file)
+ add_file_downloaded (as_file file)
(new_downloaded -- old_downloaded); *)
end;
begin
@@ -761,7 +760,7 @@
(* Check if the client is still interesting for us... *)
check_if_interesting file c
-
+
| PeerID p ->
c.client_software <- (parse_software p);
c.client_uid <- Sha1.direct_of_string p;
@@ -769,12 +768,11 @@
if c.client_uid = !!client_uid then disconnect_client c Closed_by_user
| BitField p ->
-(*A bitfield is a summary of what a client have*)
-
- begin
+ (*A bitfield is a summary of what a client have*)
+ begin
match c.client_file.file_swarmer with
None -> ()
- | Some swarmer ->
+ | Some swarmer ->
c.client_new_chunks <- [];
let len = String.length p in
let bitmap = String.make (len*8) '0' in
@@ -785,7 +783,7 @@
c.client_new_chunks <- i*8+j :: c.client_new_chunks;
bitmap.[i*8+j] <- '1';
end
- else
+ else
bitmap.[i*8+j] <- '0';
done;
done;
@@ -799,7 +797,7 @@
if bitmap.[i] = '1' && verified.[i] < '2' then
c.client_interesting <- true;
done;
-
+
if !verbose_msg_clients then
lprintf_nl "[BT]: BitField translated";
if !verbose_msg_clients then
@@ -844,15 +842,15 @@
let swarmer = Int64Swarmer.uploader_swarmer up in
let n = Int64.to_int n in
if bitmap.[n] <> '1' then
-
+
let verified = Int64Swarmer.verified_bitmap swarmer in
if verified.[n] < '2' then begin
c.client_interesting <- true;
- send_interested c;
+ send_interested c;
c.client_new_chunks <- n :: c.client_new_chunks;
if c.client_block = None then begin
update_client_bitmap c;
-(* for i = 1 to max_range_requests -
+(* for i = 1 to max_range_requests -
List.length c.client_ranges do
(try get_from_client sock c with _ -> ())
done*)
@@ -862,7 +860,7 @@
| Some _ , None ->lprintf "[BT]: bitmap but no client_uploader\n";
| None, None -> lprintf "[BT]: no bitmap no client_uploader\n";
end
-*)
+*)
end
| Interested ->
@@ -880,7 +878,7 @@
let (ip,port) = c.client_host in
if !verbose_msg_clients then lprintf_nl "[BT]: %s:%d with software %s : Choke send, but no client bitmap"
(Ip.to_string ip) port (c.client_software)
- | Some up ->
+ | Some up ->
Int64Swarmer.clear_uploader_ranges up
end;
c.client_ranges_sent <- [];
@@ -888,14 +886,14 @@
c.client_choked <- true;
end
- | NotInterested ->
+ | NotInterested ->
c.client_interested <- false;
| Unchoke ->
begin
c.client_choked <- false;
(* remote peer cleared our request : re-request *)
- for i = 1 to max_range_requests -
+ for i = 1 to max_range_requests -
List.length c.client_ranges_sent do
(try get_from_client sock c with _ -> ())
done
@@ -948,7 +946,7 @@
be put in a fifo and dequeued according to
!!max_connections_per_second. (@see commonGlobals.ml)
@param c The client we must connect
-*)
+*)
let connect_client c =
if can_open_connection connection_manager &&
(let (ip,port) = c.client_host in
@@ -962,7 +960,7 @@
then
match c.client_sock with
NoConnection ->
-
+
let token =
add_pending_connection connection_manager (fun token ->
try
@@ -1005,7 +1003,7 @@
TcpBufferedSocket.set_write_controler sock upload_control;
TcpBufferedSocket.set_rtimeout sock 30.;
let file = c.client_file in
-
+
if !verbose_msg_clients then begin
lprintf_nl "[BT]: READY TO DOWNLOAD FILE";
end;
@@ -1054,7 +1052,7 @@
let ip = (Ip.of_inet_addr from_ip) in
if !verbose_sources > 1 then lprintf_nl "[BT]: CONNECTION RECEIVED FROM %s"
(Ip.to_string (Ip.of_inet_addr from_ip))
- ;
+ ;
(*Reject this connection if we don't want
to bypass the max_connection parameter
*)
@@ -1083,7 +1081,7 @@
in
TcpBufferedSocket.set_read_controler sock download_control;
TcpBufferedSocket.set_write_controler sock upload_control;
-
+
let c = ref None in
TcpBufferedSocket.set_closer sock (fun _ r ->
match !c with
@@ -1097,97 +1095,88 @@
);
set_rtimeout sock 30.;
incr counter;
-(*Again : 'hook' client_parse_header to the socket*)
+ (*Again : 'hook' client_parse_header to the socket*)
set_bt_sock sock !verbose_msg_clients
(BTHeader (client_parse_header !counter c false));
end
else
-(*don't forget to close the incoming sock if we can't
- open a new connection*)
+ (*don't forget to close the incoming sock if we can't
+ open a new connection
+ *)
Unix.close s
| _ -> ()
) in
listen_sock := Some s;
()
with e ->
- lprintf "Exception %s while init bittorrent server\n"
+ lprintf_nl "[BT]: Exception %s while init bittorrent server"
(Printexc2.to_string e)
-
-
-
-
-
(** This function send keepalive messages to all connected clients
(and update socket lifetime)
-*)
+*)
let send_pings () =
List.iter (fun file ->
Hashtbl.iter (fun _ c ->
match c.client_sock with
- | Connection sock ->
+ | Connection sock ->
send_client c Ping;
- set_lifetime sock 130.;
+ set_lifetime sock 130.;
| _ -> ()
) file.file_clients
) !current_files
-
open Bencode
-
-
+
(** Check each clients for a given file if they are connected.
If they aren't, try to connect them
*)
-let resume_clients file =
+let resume_clients file =
Hashtbl.iter (fun _ c ->
try
- match c.client_sock with
+ match c.client_sock with
| Connection sock -> ()
- (*i think this one is not realy usefull for debugging
- lprintf "RESUME: Client is already connected\n"; *)
+ (*i think this one is not really usefull for debugging
+ lprintf "[BT] : RESUME: Client is already connected\n"; *)
| _ ->
- (try
- (*test if we can connect client according to the its
+ (try
+ (*test if we can connect client according to the its
connection_control.
- Currently the delay between two try is 120 seconds.
+ Currently the delay between two try is 120 seconds.
*)
- if connection_can_try c.client_connection_control then
+ if connection_can_try c.client_connection_control then
connect_client c
- else
- print_control c.client_connection_control
+ else
+ print_control c.client_connection_control
with _ -> ())
with e -> ()
(* lprintf "Exception %s in resume_clients\n" (Printexc2.to_string e) *)
) file.file_clients
-
-
-
(** In this function we initiate a connection to the file tracker
to get sources.
@param file The file for which we want some sources
@param url Url of the tracker
- If we have less than !!ask_tracker_threshold sources
+ If we have less than !!ask_tracker_threshold sources
and if we respect the file_tracker_interval then
we really ask sources to the tracker
*)
-let get_sources_from_tracker file =
- let f t filename =
+let get_sources_from_tracker file =
+ let f t filename =
(*This is the function which will be called by the http client
for parsing the response*)
let v = Bencode.decode (File.to_string filename) in
-
+
t.tracker_interval <- 600;
t.tracker_last_clients_num <- 0;
match v with
Dictionary list ->
List.iter (fun (key,value) ->
match (key, value) with
- String "interval", Int n ->
+ String "interval", Int n ->
t.tracker_interval <- Int64.to_int n
| String "failure reason", String failure ->
lprintf "Failure from BT-Tracker %s in file: %s Reason: %s\n"
t.tracker_url file.file_name failure
@@ -1198,11 +1187,11 @@
match v with
Dictionary list ->
t.tracker_last_clients_num <-
t.tracker_last_clients_num + 1;
-
+
let peer_id = ref Sha1.null in
let peer_ip = ref Ip.null in
let port = ref 0 in
-
+
List.iter (fun v ->
match v with
String "peer id", String id ->
@@ -1239,7 +1228,7 @@
let ip = Ip.of_ints (get_uint8 s pos,get_uint8 s (pos+1),
get_uint8 s (pos+2),get_uint8 s (pos+3))
and port = get_int16 s (pos+4)
- in
+ in
ignore( new_client file Sha1.null (ip,port));
t.tracker_last_clients_num <- t.tracker_last_clients_num + 1;
@@ -1267,7 +1256,7 @@
connect_trackers file event f
-(** Check to see if file is finished, if not
+(** Check to see if file is finished, if not
try to get sources for it
*)
let recover_files () =
@@ -1280,7 +1269,7 @@
FileDownloading ->
(try get_sources_from_tracker file with _ -> ())
| FileShared ->
- (try
+ (try
connect_trackers file "" (fun _ _ -> ()) with _ -> ())
| FilePaused -> () (*when we are paused we do nothing, not even logging this vvvv*)
| s -> lprintf_nl "[BT]: Other state %s!!" (string_of_state s)
@@ -1290,11 +1279,11 @@
(**
- Send a Piece message
+ Send a Piece message
for one of the request of client
@param sock The socket of the client
@param c The client
-*)
+*)
let rec iter_upload sock c =
match c.client_upload_requests with
[] -> ()
@@ -1305,7 +1294,7 @@
end else
if c.client_allowed_to_write >= len then begin
c.client_upload_requests <- tail;
-
+
let file = c.client_file in
let offset = pos ++ file.file_piece_size *.. num in
c.client_allowed_to_write <- c.client_allowed_to_write -- len;
@@ -1334,8 +1323,6 @@
(* lprintf "client is waiting for another piece\n"; *)
ready_for_upload (as_client c)
end
-
-
(**
@@ -1345,14 +1332,14 @@
@param c the client to which we can send some bytes
@param allowed the amount of bytes we can send to client
*)
-let client_can_upload c allowed =
+let client_can_upload c allowed =
(* lprintf "allowed to upload %d\n" allowed; *)
do_if_connected c.client_sock (fun sock ->
match c.client_upload_requests with
[] -> ()
| _ :: tail ->
CommonUploads.consume_bandwidth allowed;
- c.client_allowed_to_write <-
+ c.client_allowed_to_write <-
c.client_allowed_to_write ++ (Int64.of_int allowed);
iter_upload sock c
)
@@ -1360,7 +1347,7 @@
(** Probably useless now
*)
-let file_resume file =
+let file_resume file =
(*useless with no saving of sources
resume_clients file;
*)