diff --git a/Makefile b/Makefile index c9e8f78..c5adb09 100644 --- a/Makefile +++ b/Makefile @@ -28,8 +28,11 @@ NO_LIBS_opt= NO_STATIC_LIBS_opt= NO_CMXA= -LIBS_byte=-custom bigarray.cma unix.cma str.cma -LIBS_opt= bigarray.cmxa unix.cmxa str.cmxa +BITSTRING_DIR="$(shell ocamlfind query bitstring)" +#BITSTRING_DIR=/usr/lib/ocaml/bitstring + +LIBS_byte=-custom bigarray.cma unix.cma str.cma -I $(BITSTRING_DIR) bitstring.cma +LIBS_opt= bigarray.cmxa unix.cmxa str.cmxa -I $(BITSTRING_DIR) bitstring.cmxa BIGARRAY_LIBS_opt=bigarray.cmxa BIGARRAY_LIBS_byte=bigarray.cma @@ -91,7 +94,7 @@ SRC_FILETP=src/networks/fileTP SUBDIRS=$(CDK) $(LIB) $(RSS) $(XML) $(NET) tools \ $(COMMON) $(DRIVER) $(MP3) src/config/$(OS_FILES) -INCLUDES += $(foreach file, $(SUBDIRS), -I $(file)) +INCLUDES += $(foreach file, $(SUBDIRS), -I $(file)) -I $(BITSTRING_DIR) CFLAGS:=$(CFLAGS) $(CONFIG_INCLUDES) $(GTKCFLAGS) $(GD_CFLAGS) @@ -426,6 +429,7 @@ BITTORRENT_SRCS= \ $(SRC_BITTORRENT)/bTRate.ml \ $(SRC_BITTORRENT)/bTTypes.ml \ $(SRC_BITTORRENT)/bTOptions.ml \ + $(SRC_BITTORRENT)/bTUdpTracker.ml \ $(SRC_BITTORRENT)/bTProtocol.ml \ $(SRC_BITTORRENT)/bTTorrent.ml \ $(SRC_BITTORRENT)/bTGlobals.ml \ @@ -1482,11 +1486,14 @@ ZOGSOURCES += $(BITTORRENT_ZOG:.zog=.ml) MLTSOURCES += $(BITTORRENT_MLT:.mlt=.ml) MLPSOURCES += $(BITTORRENT_MLP:.mlcpp=.ml) +$(SRC_BITTORRENT)/bTUdpTracker.ml: $(SRC_BITTORRENT)/bTUdpTracker.mlp + camlp4of -I $(BITSTRING_DIR) bitstring.cma bitstring_persistent.cma pa_bitstring.cmo -impl $< -o $@ + build/mlbt.cmxa: $(BITTORRENT_OBJS) $(BITTORRENT_CMXS) - $(OCAMLOPT) -a -o $@ $(BITTORRENT_OBJS) $(LIBS_flags) $(_LIBS_flags) $(BITTORRENT_CMXS) + $(OCAMLOPT) -a -o $@ -I $(BITSTRING_DIR) $(BITTORRENT_OBJS) $(LIBS_flags) $(_LIBS_flags) $(BITTORRENT_CMXS) build/mlbt.cma: $(BITTORRENT_OBJS) $(BITTORRENT_CMOS) - $(OCAMLC) -a -o $@ $(BITTORRENT_OBJS) $(LIBS_flags) $(_LIBS_flags) $(BITTORRENT_CMOS) + $(OCAMLC) -a -o $@ -I $(BITSTRING_DIR) $(BITTORRENT_OBJS) $(LIBS_flags) $(_LIBS_flags) $(BITTORRENT_CMOS) diff --git a/src/networks/bittorrent/bTClients.ml b/src/networks/bittorrent/bTClients.ml index ff27686..d58e3d4 100644 --- a/src/networks/bittorrent/bTClients.ml +++ b/src/networks/bittorrent/bTClients.ml @@ -18,7 +18,8 @@ *) -(** Functions used in client<->client communication +(** Functions used in client<->client communication + and also client<->tracker *) (** A peer (or client) is always a remote peer in this file. @@ -73,6 +74,125 @@ let http11_ok = "HTTP/1.1 200 OK" let next_uploaders = ref ([] : BTTypes.client list) let current_uploaders = ref ([] : BTTypes.client list) +(** Check that client is valid and record it *) +let maybe_new_client file id ip port = + let cc = Geoip.get_country_code_option ip in + if id <> !!client_uid + && ip != Ip.null + && port <> 0 + && (match !Ip.banned (ip, cc) with + | None -> true + | Some reason -> + if !verbose_connect then + lprintf_file_nl (as_file file) "%s:%d blocked: %s" (Ip.to_string ip) port reason; + false) + then + ignore (new_client file id (ip,port) cc); + if !verbose_sources > 1 then + lprintf_file_nl (as_file file) "Received %s:%d" (Ip.to_string ip) port + + +let resume_clients_hook = ref (fun _ -> assert false) + +include struct + +(* open modules locally *) +open BTUdpTracker +open UdpSocket + +let string_of_event = function + | READ_DONE -> "READ_DONE" + | WRITE_DONE -> "WRITE_DONE" + | CAN_REFILL -> "CAN_REFILL" + | BASIC_EVENT e -> match e with + | CLOSED reason -> "CLOSED " ^ (string_of_reason reason) + | RTIMEOUT -> "RTIMEOUT" + | WTIMEOUT -> "WTIMEOUT" + | LTIMEOUT -> "LTIMEOUT" + | CAN_READ -> "CAN_READ" + | CAN_WRITE -> "CAN_WRITE" + +(** talk to udp tracker and parse response + except of parsing should perform everything that + talk_to_tracker's inner function does FIXME refactor both + + Better create single global udp socket and use it for all + tracker requests and distinguish trackers by txn? FIXME? + *) +let talk_to_udp_tracker host port args file t need_sources = + try + lprintf_nl "udpt start with %s:%d" host port; + let addr = try (Unix.gethostbyname host).Unix.h_addr_list.(0) with exn -> failwith ("failed to resolve " ^ host) in + let ip = Ip.of_inet_addr addr in + lprintf_nl "udpt resolved to ip %s" (Ip.to_string ip); + let socket = create Unix.inet_addr_any 0 (fun sock event -> +(* lprintf_nl "udpt got event %s for %s" (string_of_event event) host *) + match event with + | WRITE_DONE | CAN_REFILL -> () + | READ_DONE -> assert false (* set_reader prevents this *) + | BASIC_EVENT x -> match x with + | CLOSED _ -> () + | CAN_READ | CAN_WRITE -> assert false (* udpSocket implementation prevents this *) + | LTIMEOUT | WTIMEOUT | RTIMEOUT -> close sock (Closed_for_error "udpt timeout")) + in + BasicSocket.set_wtimeout (sock socket) 120.; + BasicSocket.set_rtimeout (sock socket) 120.; + let txn = Random.int32 Int32.max_int in + lprintf_nl "udpt txn %ld for %s" txn host; + write socket false (connect_request txn) ip port; + set_reader socket (fun _ -> + let p = read socket in + let conn = connect_response p.udp_content txn in + lprintf_nl "udpt connection_id %Ld for %s" conn host; + let txn = Random.int32 Int32.max_int in + lprintf_nl "udpt txn' %ld for host %s" txn host; + let int s = Int64.of_string (List.assoc s args) in + let req = announce_request conn txn + ~info_hash:(List.assoc "info_hash" args) + ~peer_id:(List.assoc "peer_id" args) + (int "downloaded",int "left",int "uploaded") + (match List.assoc "event" args with + | "completed" -> 1l + | "started" -> 2l + | "stopped" -> 3l + | "" -> 0l + | s -> lprintf_nl "udpt event %s? for %s" s host; 0l) + ~numwant:(try Int32.of_string (List.assoc "numwant" args) with _ -> -1l) + (int_of_string (List.assoc "port" args)) + in + write socket false req ip port; + set_reader socket (fun _ -> + let p = read socket in + + t.tracker_last_conn <- last_time (); + file.file_tracker_connected <- true; + t.tracker_interval <- 600; + t.tracker_min_interval <- 600; + if need_sources then t.tracker_last_clients_num <- 0; + + let (interval,clients) = announce_response p.udp_content txn in + lprintf_nl "udpt got interval %ld clients %d for host %s" interval (List.length clients) host; + if interval > 0l then + begin + t.tracker_interval <- Int32.to_int interval; + if t.tracker_min_interval > t.tracker_interval then + t.tracker_min_interval <- t.tracker_interval + end; + List.iter (fun (ip',port) -> + let ip = Ip.of_int64 (Int64.logand 0xFFFFFFFFL (Int64.of_int32 ip')) in + lprintf_nl "udpt got %s:%d" (Ip.to_string ip) port; + t.tracker_last_clients_num <- t.tracker_last_clients_num + 1; + maybe_new_client file Sha1.null ip port + ) clients; + close socket Closed_by_user; + lprintf_nl "udpt interact done for %s" host; + if need_sources then !resume_clients_hook file + )) + with + exn -> + lprintf_nl "udpt interact exn %s" (Printexc2.to_string exn) + +end (* include *) (** In this function we connect to a tracker. @@ -173,28 +293,30 @@ let connect_trackers file event f = then begin (* if we already tried to connect but failed, disable tracker, but allow re-enabling *) - if file.file_tracker_connected && t.tracker_last_clients_num = 0 && - t.tracker_last_conn < 1 then begin + (* FIXME t.tracker_last_conn < 1 only at first connect, so later failures will stay undetected! *) + if file.file_tracker_connected && t.tracker_last_clients_num = 0 && t.tracker_last_conn < 1 then + begin if !verbose_msg_servers then - lprintf_nl "Request error from tracker: disabling %s" t.tracker_url; - t.tracker_status <- Disabled (intern "MLDonkey: Request error from tracker") + lprintf_nl "Request error from tracker: disabling %s" (show_tracker_url t.tracker_url); + t.tracker_status <- Disabled (intern "MLDonkey: Request error from tracker") end (* Send request to tracker *) - else begin - let args = if String.length t.tracker_id > 0 then - ("trackerid", t.tracker_id) :: args else args - in - let args = if String.length t.tracker_key > 0 then - ("key", t.tracker_key) :: args else args - in - if !verbose_msg_servers then - lprintf_nl "get_sources_from_tracker: tracker_connected:%s id:%s key:%s last_clients:%i last_conn-last_time:%i file: %s" - (string_of_bool file.file_tracker_connected) - t.tracker_id t.tracker_key t.tracker_last_clients_num - (t.tracker_last_conn - last_time()) file.file_name; - + else + let args = if String.length t.tracker_id > 0 then + ("trackerid", t.tracker_id) :: args else args + in + let args = if String.length t.tracker_key > 0 then + ("key", t.tracker_key) :: args else args + in + if !verbose_msg_servers then + lprintf_nl "connect_trackers: connected:%s id:%s key:%s last_clients:%i last_conn-last_time:%i numwant:%s file: %s" + (string_of_bool file.file_tracker_connected) + t.tracker_id t.tracker_key t.tracker_last_clients_num + (t.tracker_last_conn - last_time()) (try List.assoc "numwant" args with _ -> "_") file.file_name; + + match t.tracker_url with + | `Http url -> let module H = Http_client in - let url = t.tracker_url in let r = { H.basic_request with H.req_url = Url.of_string ~args: args url; @@ -206,19 +328,20 @@ let connect_trackers file event f = if !verbose_msg_servers then lprintf_nl "Request sent to tracker %s for file: %s" - t.tracker_url file.file_name; + url file.file_name; H.wget r (fun fileres -> t.tracker_last_conn <- last_time (); file.file_tracker_connected <- true; f t fileres) - end + | `Other url -> assert false (* should have been disabled *) + | `Udp (host,port) -> talk_to_udp_tracker host port args file t need_sources end else if !verbose_msg_servers then lprintf_nl "Request NOT sent to tracker %s - next request in %ds for file: %s" - t.tracker_url (t.tracker_interval - (last_time () - t.tracker_last_conn)) file.file_name + (show_tracker_url t.tracker_url) (t.tracker_interval - (last_time () - t.tracker_last_conn)) file.file_name ) enabled_trackers let start_upload c = @@ -354,7 +477,9 @@ let disconnect_clients file = let download_finished file = if List.memq file !current_files then begin - connect_trackers file "completed" (fun _ _ -> ()); (*must be called before swarmer gets removed from file*) + connect_trackers file "completed" (fun _ _ -> + lprintf_file_nl (as_file file) "Tracker return: completed %s" file.file_name; + ()); (*must be called before swarmer gets removed from file*) (*CommonComplexOptions.file_completed*) file_completed (as_file file); (* Remove the swarmer for this file as it is not useful anymore... *) @@ -1304,6 +1429,9 @@ let resume_clients file = lprintf_file_nl (as_file file) "Exception %s in resume_clients" (Printexc2.to_string e) ) file.file_clients +let () = + resume_clients_hook := resume_clients + (** Check if the value replied by the tracker is correct. @param key the name of the key @param n the value to check @@ -1313,17 +1441,15 @@ let resume_clients file = let chk_keyval key n url name = let int_n = (Int64.to_int n) in if !verbose_msg_clients then - lprintf_nl "Reply from %s in file: %s has %s: %d" url name key int_n; + lprintf_nl "Reply from %s in file: %s has %s: %d" (show_tracker_url url) name key int_n; if int_n > -1 then int_n else begin - lprintf_nl "Reply from %s in file: %s has an invalid %s value: %d" url name key int_n; + lprintf_nl "Reply from %s in file: %s has an invalid %s value: %d" (show_tracker_url url) name key int_n; 0 end - -(** In this function we initiate a connection to the file tracker - to get sources. +(** In this function we interact with the tracker @param file The file for which we want some sources @param url Url of the tracker If we have less than !!ask_tracker_threshold sources @@ -1335,6 +1461,7 @@ let get_sources_from_tracker file = (*This is the function which will be called by the http client for parsing the response *) + let tracker_url = show_tracker_url t.tracker_url in let tracker_reply = try File.to_string filename @@ -1344,7 +1471,7 @@ let get_sources_from_tracker file = match tracker_reply with | "" -> if !verbose_connect then - lprintf_file_nl (as_file file) "Empty reply from tracker"; + lprintf_file_nl (as_file file) "Empty reply from tracker %s" tracker_url; Bencode.decode "" | _ -> Bencode.decode tracker_reply in @@ -1359,7 +1486,7 @@ let get_sources_from_tracker file = | _ -> (match t.tracker_status with | Disabled_failure (i, _) -> lprintf_file_nl (as_file file) "Received good message from Tracker %s in file: %s after %d bad attempts" - t.tracker_url file.file_name i + (show_tracker_url t.tracker_url) file.file_name i | _ -> ()); (* Received good message from tracker after failures, re-enable tracker *) t.tracker_status <- Enabled); @@ -1373,9 +1500,9 @@ let get_sources_from_tracker file = lprintf_file_nl (as_file file) "Failure no. %d%s from Tracker %s in file: %s Reason: %s" (match t.tracker_status with | Disabled_failure (i,_) -> i | _ -> 1) (if !!tracker_retries = 0 then "" else Printf.sprintf "/%d" !!tracker_retries) - t.tracker_url file.file_name (Charset.to_utf8 failure) + tracker_url file.file_name (Charset.to_utf8 failure) | String "warning message", String warning -> - lprintf_file_nl (as_file file) "Warning from Tracker %s in file: %s Reason: %s" t.tracker_url file.file_name warning + lprintf_file_nl (as_file file) "Warning from Tracker %s in file: %s Reason: %s" tracker_url file.file_name warning | String "interval", Int n -> t.tracker_interval <- chk_keyval (Bencode.print key) n t.tracker_url file.file_name; (* in case we don't receive "min interval" *) @@ -1406,11 +1533,11 @@ let get_sources_from_tracker file = | String "key", String n -> t.tracker_key <- n; if !verbose_msg_clients then - lprintf_file_nl (as_file file) "%s in file: %s has key: %s" t.tracker_url file.file_name n + lprintf_file_nl (as_file file) "%s in file: %s has key: %s" tracker_url file.file_name n | String "tracker id", String n -> t.tracker_id <- n; if !verbose_msg_clients then - lprintf_file_nl (as_file file) "%s in file: %s has tracker id %s" t.tracker_url file.file_name n + lprintf_file_nl (as_file file) "%s in file: %s has tracker id %s" tracker_url file.file_name n | String "peers", List list -> List.iter (fun v -> @@ -1478,6 +1605,11 @@ let get_sources_from_tracker file = lprintf_file_nl (as_file file) "get_sources_from_tracker: got %i source(s) for file %s" t.tracker_last_clients_num file.file_name; resume_clients file + (* + lprintf_file_nl (as_file file) "talk_to_tracker: got %i source(s) from %s" + t.tracker_last_clients_num tracker_url; + if need_sources then resume_clients file + *) | _ -> assert false in diff --git a/src/networks/bittorrent/bTComplexOptions.ml b/src/networks/bittorrent/bTComplexOptions.ml index 8ce1a77..566729d 100644 --- a/src/networks/bittorrent/bTComplexOptions.ml +++ b/src/networks/bittorrent/bTComplexOptions.ml @@ -219,7 +219,7 @@ let file_to_value file = "file_uploaded", int64_to_value (file.file_uploaded); "file_id", string_to_value (Sha1.to_string file.file_id); "file_trackers", (list_to_value string_to_value) - (List.map (fun t -> t.tracker_url) file.file_trackers); + (List.map (fun t -> show_tracker_url t.tracker_url) file.file_trackers); (* OK, but I still don't like the idea of forgetting all the clients. We should have a better strategy, ie rating the clients and connecting to them depending on the results of our last connections. And then, diff --git a/src/networks/bittorrent/bTGlobals.ml b/src/networks/bittorrent/bTGlobals.ml index 1f23a51..dd92e0f 100644 --- a/src/networks/bittorrent/bTGlobals.ml +++ b/src/networks/bittorrent/bTGlobals.ml @@ -230,17 +230,31 @@ let create_temp_file file_temp file_files file_state = file_temp); file_fd -let can_handle_tracker t = - String2.check_prefix (String.lowercase t.tracker_url) "http://" +let make_tracker_url url = + let url = String.lowercase url in + if String2.check_prefix url "http://" then + `Http url + else + try Scanf.sscanf url "udp://%s@:%d" (fun host port -> `Udp (host,port)) + with _ -> `Other url + +(** invariant: [make_tracker_url (show_tracker_url url) = url] *) +let show_tracker_url : tracker_url -> string = function + | `Http url | `Other url -> url + | `Udp (host,port) -> Printf.sprintf "udp://%s:%d" host port + +let can_handle_tracker = function + | `Http _ -> false + | `Udp _ -> true + | `Other _ -> false let rec set_trackers file file_trackers = match file_trackers with | [] -> () | url :: q -> - if not (List.exists (fun tracker -> - tracker.tracker_url = url - ) file.file_trackers) then - let t = { + let url = make_tracker_url url in + if not (List.exists (fun tracker -> tracker.tracker_url = url) file.file_trackers) then + let t = { tracker_url = url; tracker_interval = 600; tracker_min_interval = 600; @@ -253,12 +267,11 @@ let rec set_trackers file file_trackers = tracker_torrent_last_dl_req = 0; tracker_id = ""; tracker_key = ""; - tracker_status = Enabled - } in - if not (can_handle_tracker t) then - t.tracker_status <- Disabled_mld (intern "Tracker type not supported"); - file.file_trackers <- t :: file.file_trackers; - set_trackers file q + tracker_status = if can_handle_tracker url then Enabled + else Disabled_mld (intern "Tracker type not supported") + } in + file.file_trackers <- t :: file.file_trackers; + set_trackers file q let new_file file_id t torrent_diskname file_temp file_state user group = try @@ -867,7 +880,7 @@ let remove_client c = let remove_tracker url file = if !verbose_msg_servers then List.iter (fun tracker -> - lprintf_nl "Old tracker list :%s" tracker.tracker_url + lprintf_nl "Old tracker list: %s" (show_tracker_url tracker.tracker_url) ) file.file_trackers; List.iter (fun bad_tracker -> if bad_tracker.tracker_url = url then @@ -875,7 +888,7 @@ let remove_tracker url file = ) file.file_trackers; if !verbose_msg_servers then List.iter (fun tracker -> - lprintf_nl "New tracker list :%s" tracker.tracker_url + lprintf_nl "New tracker list: %s" (show_tracker_url tracker.tracker_url) ) file.file_trackers let tracker_is_enabled t = diff --git a/src/networks/bittorrent/bTInteractive.ml b/src/networks/bittorrent/bTInteractive.ml index d7930e2..70c02e8 100644 --- a/src/networks/bittorrent/bTInteractive.ml +++ b/src/networks/bittorrent/bTInteractive.ml @@ -193,14 +193,15 @@ let op_file_print file o = Printf.bprintf buf "\\\\