[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Mldonkey-commits] Changes to mldonkey/src/networks/fasttrack/fasttrackS
From: |
mldonkey-commits |
Subject: |
[Mldonkey-commits] Changes to mldonkey/src/networks/fasttrack/fasttrackServers.ml |
Date: |
Sun, 07 Aug 2005 08:57:36 -0400 |
Index: mldonkey/src/networks/fasttrack/fasttrackServers.ml
diff -u mldonkey/src/networks/fasttrack/fasttrackServers.ml:1.22
mldonkey/src/networks/fasttrack/fasttrackServers.ml:1.23
--- mldonkey/src/networks/fasttrack/fasttrackServers.ml:1.22 Mon Aug 1
23:53:54 2005
+++ mldonkey/src/networks/fasttrack/fasttrackServers.ml Sun Aug 7 12:57:22 2005
@@ -44,12 +44,12 @@
open FasttrackComplexOptions
open FasttrackProto
-let bootstrap_from_imesh = define_option fasttrack_section
+let bootstrap_from_imesh = define_option fasttrack_section
["bootstrap_from_imesh"]
- "(only for development tests)"
+ "(only for development tests)"
bool_option true
-let server_parse_after s gconn sock =
+let server_parse_after s gconn sock =
try
match s.server_ciphers with
None -> assert false
@@ -72,26 +72,26 @@
iter ()
in
iter ()
- with e ->
+ with e ->
lprintf "Exception %s in server_parse_after\n"
(Printexc2.to_string e);
close sock (Closed_for_error "Reply not understood")
let server_connection_hook = ref (None : (server -> unit) option)
-
+
let greet_supernode s =
(match !server_connection_hook with
- None ->
+ None ->
server_send s TcpMessages.DirectPacket (
TcpMessages.NodeInfoReq (
- client_ip s.server_sock,
+ client_ip s.server_sock,
!!client_port,
default_bandwidth,
- client_name ()))
+ client_name ()))
| Some f -> f s)
(* ; server_send_ping s *)
-
-let server_parse_netname s gconn sock =
+
+let server_parse_netname s gconn sock =
let b = TcpBufferedSocket.buf sock in
let len = b.len in
let start_pos = b.pos in
@@ -99,26 +99,26 @@
let buf = b.buf in
let net = String.sub buf start_pos len in
if !verbose_msg_raw then
- lprintf "net:[%s]\n" (String.escaped net);
+ lprintf "net:[%s]\n" (String.escaped net);
let rec iter pos =
- if pos < end_pos then
+ if pos < end_pos then
if buf.[pos] = '\000' then begin
let netname = String.sub buf start_pos (pos-start_pos) in
if !verbose_msg_raw then
- lprintf "netname: [%s]\n" (String.escaped netname);
+ lprintf "netname: [%s]\n" (String.escaped netname);
buf_used b (pos-start_pos+1);
match s.server_ciphers with
None -> assert false
| Some ciphers ->
- gconn.gconn_handler <-
+ gconn.gconn_handler <-
CipherReader (ciphers.in_cipher, server_parse_after s);
greet_supernode s
end else
iter (pos+1)
in
iter start_pos
-
-let server_parse_cipher s gconn sock =
+
+let server_parse_cipher s gconn sock =
H.connected s.server_host;
let b = TcpBufferedSocket.buf sock in
if b.len >= 8 then
@@ -129,32 +129,32 @@
lprintf "Cipher received from server\n";
get_cipher_from_packet b.buf b.pos ciphers.in_cipher;
init_cipher ciphers.in_cipher;
-
+
xor_ciphers ciphers.out_cipher ciphers.in_cipher;
init_cipher ciphers.out_cipher;
-
+
buf_used b 8;
server_crypt_and_send s ciphers.out_cipher (network_name ^ "\000");
gconn.gconn_handler <- CipherReader (ciphers.in_cipher,
server_parse_netname s);
if !verbose_msg_raw then
lprintf "waiting for netname\n"
-
-let client_cipher_seed () =
+
+let client_cipher_seed () =
(* Int32.of_int (Random.int max_int) *)
- Int32.of_string "0x0fACB1238"
+ Int32.of_string "0x0fACB1238"
let connection_header_hook = ref None
-
-let connect_server h =
+
+let connect_server h =
let s = match h.host_server with
- None ->
+ None ->
let s = new_server h.host_addr h.host_port in
h.host_server <- Some s;
s
| Some s -> s
in
match s.server_sock with
- | NoConnection ->
+ | NoConnection ->
incr nservers;
let token =
add_pending_connection connection_manager (fun token ->
@@ -165,7 +165,7 @@
failwith "Invalid IP for server\n";
let port = s.server_host.host_port in
if !verbose_msg_servers then
- lprintf "CONNECT TO %s:%d\n"
+ lprintf "CONNECT TO %s:%d\n"
(Ip.string_of_addr h.host_addr) port;
H.set_request h Tcp_Connect;
H.try_connect h;
@@ -178,27 +178,27 @@
let ip = Ip.to_inet_addr ip in
let sock = connect token "fasttrack to server"
ip port
- (fun sock event ->
+ (fun sock event ->
match event with
- BASIC_EVENT (RTIMEOUT|LTIMEOUT) ->
+ BASIC_EVENT (RTIMEOUT|LTIMEOUT) ->
(* lprintf "RTIMEOUT\n"; *)
disconnect_from_server nservers s Closed_for_timeout
| _ -> ()
) in
TcpBufferedSocket.set_read_controler sock download_control;
TcpBufferedSocket.set_write_controler sock upload_control;
-
+
set_server_state s Connecting;
s.server_sock <- Connection sock;
incr nservers;
set_fasttrack_sock sock !verbose_msg_servers
(Reader (server_parse_cipher s)
);
- set_closer sock (fun _ error ->
+ set_closer sock (fun _ error ->
(* lprintf "CLOSER %s\n" error; *)
disconnect_from_server nservers s error);
set_rtimeout sock !!server_connection_timeout;
-
+
let in_cipher = create_cipher () in
let out_cipher = create_cipher () in
s.server_ciphers <- Some {
@@ -208,19 +208,19 @@
out_xinu = Int64.of_int 0x51;
};
set_cipher out_cipher (client_cipher_seed ()) 0x29;
-
+
let s = String.create 12 in
-
+
(match !connection_header_hook with
- None ->
+ None ->
s.[0] <- '\250';
s.[1] <- '\000';
s.[2] <- '\182';
- s.[3] <- '\043';
+ s.[3] <- '\043';
| Some f -> f s);
-
+
cipher_packet_set out_cipher s 4;
-
+
if !verbose_msg_raw then begin
lprintf "SENDING %s\n" (String.escaped s);
AnyEndian.dump s;
@@ -232,7 +232,7 @@
in
s.server_sock <- ConnectionWaiting token;
| _ -> ()
-
+
let get_file_from_source c file =
try
if connection_can_try c.client_connection_control then begin
@@ -240,7 +240,7 @@
match c.client_user.user_kind with
Indirect_location ("", uid) ->
(*
- lprintf "++++++ ASKING FOR PUSH +++++++++\n";
+ lprintf "++++++ ASKING FOR PUSH +++++++++\n";
(* do as if connection failed. If it connects, connection will be set to OK *)
connection_failed c.client_connection_control;
@@ -259,30 +259,30 @@
end
with e ->
lprintf "get_file_from_source: exception %s\n" (Printexc2.to_string e)
-
+
let exit = Exit
-
+
let disconnect_server s r =
match s.server_sock with
| Connection sock -> close sock r
- | ConnectionWaiting token ->
+ | ConnectionWaiting token ->
cancel_token token;
s.server_sock <- NoConnection;
free_ciphers s
-
+
| _ -> ()
-let really_recover_file file =
+let really_recover_file file =
List.iter (fun s ->
List.iter (fun ss ->
if not (Fifo.mem s.server_searches ss) then
- Fifo.put s.server_searches ss
+ Fifo.put s.server_searches ss
) file.file_searches
) !connected_servers
-
+
let really_download_file (r : CommonTypes.result_info) =
let rec iter uids =
- match uids with
+ match uids with
uid :: tail ->
(match Uid.to_uid uid with
Md5Ext hash -> hash, Uid.to_file_string uid
@@ -290,9 +290,8 @@
| [] -> raise IgnoreNetwork
in
let hash,file_temp = iter r.result_uids in
-
-
- let file = new_file file_temp (List.hd r.result_names)
+
+ let file = new_file file_temp (List.hd r.result_names)
r.result_size [Uid.create (Md5Ext hash)] in
if !verbose then
lprintf "DOWNLOAD FILE %s\n" file.file_name;
@@ -304,11 +303,11 @@
List.iter (fun (user, _) ->
let c = new_client user.user_kind in
add_download file c ();
- get_file_from_source c file;
+ get_file_from_source c file;
) !sources;
end;
file
-
+
let ask_for_files () = (* called every minute *)
List.iter (fun file ->
List.iter (fun c ->
@@ -323,11 +322,11 @@
FileUidSearch (file,file_hash) ->
if file_state file = FileDownloading then
server_send s M.DirectPacket
- (M.SearchReq
+ (M.SearchReq
(32, ss.search_uid, M.QueryLocationReq file_hash))
- | UserSearch (_, words, (realm, tags)) ->
+ | UserSearch (_, words, (realm, tags)) ->
- let realm =
+ let realm =
match realm with
"audio" -> 0x21
| "video" -> 0x22
@@ -338,13 +337,13 @@
in
server_send s M.DirectPacket
- (M.SearchReq
- (32, ss.search_uid, M.QueryFilesReq
+ (M.SearchReq
+ (32, ss.search_uid, M.QueryFilesReq
(words, realm, tags)))
with _ -> ()
) !connected_servers;
()
-
+
let _ =
server_ops.op_server_disconnect <- (fun s ->
disconnect_server s Closed_by_user);
@@ -352,16 +351,16 @@
disconnect_server s Closed_by_user
)
-let nranges file =
- Int64.to_int (Int64.div (file_size file)
+let nranges file =
+ Int64.to_int (Int64.div (file_size file)
min_range_size) + 5
-
-let manage_hosts () =
+
+let manage_hosts () =
H.manage_hosts ();
List.iter (fun file ->
if file_state file = FileDownloading then
try
-(* For each file, we allow only (nranges+5) simultaneous communications,
+(* For each file, we allow only (nranges+5) simultaneous communications,
to prevent too many clients from saturing the line for only one file. *)
let max_nconnected_clients = nranges file in
while file.file_nconnected_clients < max_nconnected_clients do
@@ -372,7 +371,6 @@
with _ -> ()
) !current_files
-
let rec find_ultrapeer queue =
let (next,h) = Queue.head queue in
try
@@ -386,22 +384,22 @@
let ft_boot () =
let imesh_ip = Ip.addr_of_string "fm2.imesh.com" in
- let (h : host) =
+ let (h : host) =
if !verbose then lprintf_nl "FT: Bootstrapping from Imesh %s"
(Ip.string_of_addr imesh_ip);
- H.new_host imesh_ip 1214 IndexServer
+ H.new_host imesh_ip 1214 IndexServer
in
- connect_server h
+ connect_server h
let try_connect_ultrapeer connect =
(* lprintf "try_connect_ultrapeer....\n"; *)
- let h =
+ let h =
try
find_ultrapeer ultrapeers_waiting_queue
with _ ->
(* lprintf "not in ultrapeers_waiting_queue\n"; *)
if !!bootstrap_from_imesh then
let imesh_ip = Ip.addr_of_string "fm2.imesh.com" in
- let (h : host) =
+ let (h : host) =
if !verbose then lprintf_nl "FT: Bootstrapping from Imesh %s"
(Ip.string_of_addr imesh_ip);
H.new_host (Ip.addr_of_string "fm2.imesh.com") 1214 IndexServer in
find_ultrapeer peers_waiting_queue
@@ -409,7 +407,7 @@
in
(* lprintf "contacting..\n"; *)
connect h
-
+
let connect_servers connect =
(* lprintf "connect_servers %d %d\n" !nservers !!max_ultrapeers; *)
(if !!max_ultrapeers > List.length !connected_servers then
@@ -420,10 +418,10 @@
try_connect_ultrapeer connect
done
with _ -> ())
-
+
(* Looks like there is no ping to send in Fasttrack? *)
let send_pings () = ()
-
+
let send_query ss =
let s_uid = ss.search_uid in
let module M = TcpMessages in
@@ -442,15 +440,14 @@
M.QueryFilesReq (words, realm, tags)
| FileUidSearch (_, file_hash) ->
M.QueryLocationReq file_hash
- in
- List.iter (fun s ->
+ in
+ List.iter (fun s ->
FasttrackProto.server_send s M.DirectPacket (
M.SearchReq (32, s_uid, t))) !connected_servers
-(*
+(*
TODO:
-push request: we send a push to the server when we cannot connect to
-a particular client. The client by connecting to us with a
+push request: we send a push to the server when we cannot connect to
+a particular client. The client by connecting to us with a
"GIVE <push_id>\r\n" request, to which we can reply by a "GET ...." *)
-