From d988bcc70626010cb8fb26b232c7eb7549658f77 Mon Sep 17 00:00:00 2001 From: Jesper Louis Andersen Date: Thu, 24 Jul 2008 02:25:30 +0200 Subject: [PATCH] Globalize the etorrent_t_peer_group_mgr and rename it to etorrent_choker while here. --- lib/etorrent-1.0/src/etorrent.erl | 11 +- lib/etorrent-1.0/src/etorrent_acceptor.erl | 50 +++---- lib/etorrent-1.0/src/etorrent_bad_peer_mgr.erl | 15 ++- ...nt_t_peer_group_mgr.erl => etorrent_choker.erl} | 149 +++++++-------------- lib/etorrent-1.0/src/etorrent_fs.erl | 18 ++- lib/etorrent-1.0/src/etorrent_sup.erl | 18 ++- lib/etorrent-1.0/src/etorrent_t_control.erl | 13 +- lib/etorrent-1.0/src/etorrent_t_manager.erl | 22 ++- lib/etorrent-1.0/src/etorrent_t_peer_pool_sup.erl | 6 +- lib/etorrent-1.0/src/etorrent_t_peer_recv.erl | 40 +++--- lib/etorrent-1.0/src/etorrent_t_peer_sup.erl | 9 +- lib/etorrent-1.0/src/etorrent_t_sup.erl | 28 ++-- .../src/etorrent_tracker_communication.erl | 14 +- 13 files changed, 169 insertions(+), 224 deletions(-) rename lib/etorrent-1.0/src/{etorrent_t_peer_group_mgr.erl => etorrent_choker.erl} (70%) diff --git a/lib/etorrent-1.0/src/etorrent.erl b/lib/etorrent-1.0/src/etorrent.erl index d12d4e5..516b1ea 100644 --- a/lib/etorrent-1.0/src/etorrent.erl +++ b/lib/etorrent-1.0/src/etorrent.erl @@ -1,12 +1,15 @@ -module(etorrent). -behaviour(application). +-include("etorrent_version.hrl"). -include("etorrent_mnesia_table.hrl"). -export([stop/0, start/0, db_create_schema/0]). -export([start/2, stop/1]). -export([help/0, h/0, list/0, l/0, show/0, s/0, show/1, s/1, check/1]). +-define(RANDOM_MAX_SIZE, 999999999999). + start() -> ok = application:start(crypto), ok = application:start(inets), @@ -16,7 +19,9 @@ start() -> application:start(etorrent). start(_Type, _Args) -> - etorrent_sup:start_link(). + PeerId = generate_peer_id(), + {ok, Pid} = etorrent_sup:start_link(PeerId), + {ok, Pid}. stop() -> ok = application:stop(etorrent), @@ -121,3 +126,7 @@ percent_complete(R) -> %% left / complete * 100 = % done (R#torrent.total - R#torrent.left) / R#torrent.total * 100. +generate_peer_id() -> + Number = crypto:rand_uniform(0, ?RANDOM_MAX_SIZE), + Rand = io_lib:fwrite("~B----------", [Number]), + lists:flatten(io_lib:format("-ET~s-~12s", [?VERSION, Rand])). diff --git a/lib/etorrent-1.0/src/etorrent_acceptor.erl b/lib/etorrent-1.0/src/etorrent_acceptor.erl index 33d42cd..eb487ff 100644 --- a/lib/etorrent-1.0/src/etorrent_acceptor.erl +++ b/lib/etorrent-1.0/src/etorrent_acceptor.erl @@ -115,41 +115,35 @@ handshake(Socket) -> lookup_infohash(Socket, ReservedBytes, InfoHash, PeerId) -> case etorrent_tracking_map:select({infohash, InfoHash}) of - {atomic, [#tracking_map {supervisor_pid = Pid}]} -> - start_peer(Socket, Pid, ReservedBytes, PeerId); + {atomic, [#tracking_map { _ = _}]} -> + start_peer(Socket, ReservedBytes, PeerId, InfoHash); {atomic, []} -> gen_tcp:close(Socket), ok end. -start_peer(Socket, Pid, ReservedBytes, PeerId) -> - case etorrent_t_sup:get_pid(Pid, peer_group) of - PeerGroupPid when is_pid(PeerGroupPid) -> - {ok, {Address, Port}} = inet:peername(Socket), - case etorrent_t_peer_group_mgr:new_incoming_peer(PeerGroupPid, Address, Port, PeerId) of - {ok, PeerProcessPid} -> - case gen_tcp:controlling_process(Socket, PeerProcessPid) of - ok -> etorrent_t_peer_recv:complete_handshake(PeerProcessPid, - ReservedBytes, - Socket, - PeerId), - ok; - {error, enotconn} -> - etorrent_t_peer_recv:stop(PeerProcessPid), - ok - end; - already_enough_connections -> - ok; - connect_to_ourselves -> - gen_tcp:close(Socket), - ok; - bad_peer -> - error_logger:info_report([peer_id_is_bad, PeerId]), - gen_tcp:close(Socket), +start_peer(Socket, ReservedBytes, PeerId, InfoHash) -> + {ok, {Address, Port}} = inet:peername(Socket), + case etorrent_choker:new_incoming_peer(Address, Port, PeerId, InfoHash) of + {ok, PeerProcessPid} -> + case gen_tcp:controlling_process(Socket, PeerProcessPid) of + ok -> etorrent_t_peer_recv:complete_handshake(PeerProcessPid, + ReservedBytes, + Socket, + PeerId), + ok; + {error, enotconn} -> + etorrent_t_peer_recv:stop(PeerProcessPid), ok end; - false -> - %% The peer is not yet ready! + already_enough_connections -> + ok; + connect_to_ourselves -> + gen_tcp:close(Socket), + ok; + bad_peer -> + error_logger:info_report([peer_id_is_bad, PeerId]), + gen_tcp:close(Socket), ok end. diff --git a/lib/etorrent-1.0/src/etorrent_bad_peer_mgr.erl b/lib/etorrent-1.0/src/etorrent_bad_peer_mgr.erl index 2f025d6..1bee97c 100644 --- a/lib/etorrent-1.0/src/etorrent_bad_peer_mgr.erl +++ b/lib/etorrent-1.0/src/etorrent_bad_peer_mgr.erl @@ -73,12 +73,7 @@ init([]) -> %%-------------------------------------------------------------------- %% A peer is bad if it has offended us or if it is already connected. handle_call({is_bad_peer, IP, Port, TorrentId}, _From, S) -> - Reply = case ets:lookup(etorrent_bad_peer, {IP, Port}) of - [] -> - etorrent_peer:connected(IP, Port, TorrentId); - [P] -> - P#bad_peer.offenses > ?DEFAULT_BAD_COUNT - end, + Reply = bad_peer(IP, Port, TorrentId), {reply, Reply, S}; handle_call(_Request, _From, State) -> Reply = ok, @@ -145,3 +140,11 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- + +bad_peer(IP, Port, TorrentId) -> + case ets:lookup(etorrent_bad_peer, {IP, Port}) of + [] -> + etorrent_peer:connected(IP, Port, TorrentId); + [P] -> + P#bad_peer.offenses > ?DEFAULT_BAD_COUNT + end. diff --git a/lib/etorrent-1.0/src/etorrent_t_peer_group_mgr.erl b/lib/etorrent-1.0/src/etorrent_choker.erl similarity index 70% rename from lib/etorrent-1.0/src/etorrent_t_peer_group_mgr.erl rename to lib/etorrent-1.0/src/etorrent_choker.erl index ee021c4..004e60a 100644 --- a/lib/etorrent-1.0/src/etorrent_t_peer_group_mgr.erl +++ b/lib/etorrent-1.0/src/etorrent_choker.erl @@ -7,8 +7,7 @@ %%% Created : 18 Jul 2007 by %%% Jesper Louis Andersen %%%------------------------------------------------------------------- - --module(etorrent_t_peer_group_mgr). +-module(etorrent_choker). -behaviour(gen_server). @@ -18,9 +17,7 @@ -include("etorrent_mnesia_table.hrl"). %% API --export([start_link/5, add_peers/2, broadcast_have/2, new_incoming_peer/4, - broadcast_got_chunk/2, perform_rechoke/1, - broadcast_queue_pieces/1]). +-export([start_link/1, add_peers/2, new_incoming_peer/4, perform_rechoke/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -36,12 +33,9 @@ round = 0, optimistic_unchoke_pid = none, - opt_unchoke_chain = [], - - file_system_pid = none, - peer_group_sup = none, - torrent_id = none}). + opt_unchoke_chain = []}). +-define(SERVER, ?MODULE). -define(MAX_PEER_PROCESSES, 40). -define(ROUND_TIME, 10000). -define(DEFAULT_NUM_DOWNLOADERS, 4). @@ -49,56 +43,42 @@ %%==================================================================== %% API %%==================================================================== -start_link(OurPeerId, PeerGroup, InfoHash, - FileSystemPid, TorrentHandle) -> - gen_server:start_link(?MODULE, [OurPeerId, PeerGroup, InfoHash, - FileSystemPid, TorrentHandle], []). - -add_peers(Pid, IPList) -> - gen_server:cast(Pid, {add_peers, IPList}). - -broadcast_have(Pid, Index) -> - gen_server:cast(Pid, {broadcast_have, Index}). +start_link(OurPeerId) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [OurPeerId], []). -broadcast_got_chunk(Pid, Chunk) -> - gen_server:cast(Pid, {broadcast_got_chunk, Chunk}). +add_peers(TorrentId, IPList) -> + gen_server:cast(?SERVER, {add_peers, [{TorrentId, IPP} || IPP <- IPList]}). -broadcast_queue_pieces(Pid) -> - gen_server:cast(Pid, broadcast_queue_pieces). +perform_rechoke() -> + gen_server:cast(?SERVER, rechoke). -perform_rechoke(Pid) -> - gen_server:cast(Pid, rechoke). - -new_incoming_peer(Pid, IP, Port, PeerId) -> +new_incoming_peer(IP, Port, PeerId, InfoHash) -> %% Set a pretty graceful timeout here as the peer_group can be pretty heavily %% loaded at times. We have 5 acceptors by default anyway. - gen_server:call(Pid, {new_incoming_peer, IP, Port, PeerId}, 15000). + gen_server:call(?SERVER, {new_incoming_peer, IP, Port, PeerId, InfoHash}, 15000). %%==================================================================== %% gen_server callbacks %%==================================================================== -init([OurPeerId, PeerGroup, InfoHash, - FileSystemPid, TorrentId]) when is_integer(TorrentId) -> +init([OurPeerId]) -> process_flag(trap_exit, true), {ok, Tref} = timer:send_interval(?ROUND_TIME, self(), round_tick), {ok, #state{ our_peer_id = OurPeerId, - peer_group_sup = PeerGroup, - bad_peers = dict:new(), - info_hash = InfoHash, - timer_ref = Tref, - torrent_id = TorrentId, - file_system_pid = FileSystemPid}}. - -handle_call({new_incoming_peer, _IP, _Port, PeerId}, _From, S) + bad_peers = dict:new(), + timer_ref = Tref}}. + + +handle_call({new_incoming_peer, _IP, _Port, PeerId, _InfoHash}, _From, S) when S#state.our_peer_id =:= PeerId -> {reply, connect_to_ourselves, S}; -handle_call({new_incoming_peer, IP, Port, _PeerId}, _From, S) -> - case etorrent_bad_peer_mgr:is_bad_peer(IP, Port, S#state.torrent_id) of +handle_call({new_incoming_peer, IP, Port, _PeerId, InfoHash}, _From, S) -> + {atomic, [TM]} = etorrent_tracking_map:select({infohash, InfoHash}), + case etorrent_bad_peer_mgr:is_bad_peer(IP, Port, TM#tracking_map.id) of true -> {reply, bad_peer, S}; false -> - start_new_incoming_peer(IP, Port, S) + start_new_incoming_peer(IP, Port, InfoHash, S) end; handle_call(Request, _From, State) -> error_logger:error_report([unknown_peer_group_call, Request]), @@ -108,18 +88,9 @@ handle_call(Request, _From, State) -> handle_cast({add_peers, IPList}, S) -> {ok, NS} = start_new_peers(IPList, S), {noreply, NS}; -handle_cast({broadcast_have, Index}, S) -> - broadcast_have_message(Index, S), - {noreply, S}; -handle_cast({broadcast_got_chunk, Chunk}, S) -> - bcast_got_chunk(Chunk, S), - {noreply, S}; handle_cast(rechoke, S) -> rechoke(S), {noreply, S}; -handle_cast(broadcast_queue_pieces, S) -> - bcast_queue_pieces(S), - {noreply, S}; handle_cast(_Msg, State) -> {noreply, State}. @@ -175,18 +146,17 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%-------------------------------------------------------------------- -start_new_incoming_peer(IP, Port, S) -> +start_new_incoming_peer(IP, Port, InfoHash, S) -> case ?MAX_PEER_PROCESSES - S#state.num_peers of N when N =< 0 -> {reply, already_enough_connections, S}; N when is_integer(N), N > 0 -> - {ok, Pid} = etorrent_t_peer_pool_sup:add_peer( - S#state.peer_group_sup, + {atomic, [T]} = etorrent_tracking_map:select({infohash, InfoHash}), + {ok, Pid} = etorrent_t_sup:add_peer( + T#tracking_map.supervisor_pid, S#state.our_peer_id, - S#state.info_hash, - S#state.file_system_pid, - self(), - S#state.torrent_id, + InfoHash, + T#tracking_map.id, {IP, Port}), erlang:monitor(process, Pid), NewChain = insert_new_peer_into_chain(Pid, S#state.opt_unchoke_chain), @@ -196,30 +166,6 @@ start_new_incoming_peer(IP, Port, S) -> opt_unchoke_chain = NewChain}} end. -%% -%% Apply F to each Peer Pid -foreach_pid(F, S) -> - Peers = etorrent_peer:all(S#state.torrent_id), - lists:foreach(F, Peers), - ok. - -bcast_got_chunk(Chunk, S) -> - foreach_pid(fun (Peer) -> - etorrent_t_peer_recv:endgame_got_chunk(Peer#peer.pid, Chunk) - end, - S). - -bcast_queue_pieces(S) -> - foreach_pid(fun (P) -> - etorrent_t_peer_recv:queue_pieces(P#peer.pid) - end, - S). - -broadcast_have_message(Index, S) -> - foreach_pid(fun (Peer) -> - etorrent_t_peer_recv:send_have_piece(Peer#peer.pid, Index) - end, - S). start_new_peers(IPList, State) -> %% Update the PeerList with the new incoming peers @@ -237,13 +183,13 @@ fill_peers(N, S) -> [] -> % No peers available, just stop trying to fill peers {ok, S}; - [{IP, Port} | R] -> + [{TorrentId, {IP, Port}} | R] -> % Possible peer. Check it. - case etorrent_bad_peer_mgr:is_bad_peer(IP, Port, S#state.torrent_id) of + case etorrent_bad_peer_mgr:is_bad_peer(IP, Port, TorrentId) of true -> fill_peers(N, S#state{available_peers = R}); false -> - spawn_new_peer(IP, Port, N, S#state{available_peers = R}) + spawn_new_peer(IP, Port, TorrentId, N, S#state{available_peers = R}) end end. @@ -253,18 +199,17 @@ fill_peers(N, S) -> %% peers we still need to spawn and S is the current state. Returns %% a new state to be put into the process. %%-------------------------------------------------------------------- -spawn_new_peer(IP, Port, N, S) -> - case etorrent_peer:connected(IP, Port, S#state.torrent_id) of +spawn_new_peer(IP, Port, TorrentId, N, S) -> + case etorrent_peer:connected(IP, Port, TorrentId) of true -> fill_peers(N, S); false -> - {ok, Pid} = etorrent_t_peer_pool_sup:add_peer( - S#state.peer_group_sup, + {atomic, [TM]} = etorrent_tracking_map:select(TorrentId), + {ok, Pid} = etorrent_t_sup:add_peer( + TM#tracking_map.supervisor_pid, S#state.our_peer_id, - S#state.info_hash, - S#state.file_system_pid, - self(), - S#state.torrent_id, + TM#tracking_map.info_hash, + TorrentId, {IP, Port}), erlang:monitor(process, Pid), etorrent_t_peer_recv:connect(Pid, IP, Port), @@ -279,12 +224,8 @@ spawn_new_peer(IP, Port, N, S) -> %% Description: Recalculate the choke/unchoke state of peers %%-------------------------------------------------------------------- rechoke(S) -> - Table = case etorrent_torrent:mode(S#state.torrent_id) of - seeding -> etorrent_send_state; - leeching -> etorrent_recv_state; - endgame -> etorrent_recv_state - end, - Peers = select_fastest(S#state.torrent_id, Table), + Table = etorrent_recv_state, + Peers = select_fastest(todo_rewrite_choking_algo, Table), rechoke(Peers, calculate_num_downloaders(S), S). rechoke(Peers, 0, S) -> @@ -319,7 +260,7 @@ optimistic_unchoke_handler(#rate_mgr { pid = {_Id, Pid} }, S) -> %% TODO: Make number of downloaders depend on current rate! calculate_num_downloaders(S) -> - case ets:lookup(etorrent_peer_state, {S#state.torrent_id, + case ets:lookup(etorrent_peer_state, {todo_redefine_optimistics, S#state.optimistic_unchoke_pid}) of [] -> ?DEFAULT_NUM_DOWNLOADERS; @@ -333,7 +274,7 @@ calculate_num_downloaders(S) -> end. advance_optimistic_unchoke(S) -> - NewChain = move_cyclic_chain(S#state.opt_unchoke_chain, S), + NewChain = move_cyclic_chain(S#state.opt_unchoke_chain), case NewChain of [] -> {ok, S}; %% No peers yet @@ -343,9 +284,9 @@ advance_optimistic_unchoke(S) -> optimistic_unchoke_pid = H }} end. -move_cyclic_chain([], _S) -> []; -move_cyclic_chain(Chain, S) -> - F = fun (P) -> local_unchoked(P, S#state.torrent_id) end, +move_cyclic_chain([]) -> []; +move_cyclic_chain(Chain) -> + F = fun (P) -> local_unchoked(P, todo_move_cyclic_chain_all) end, {Front, Back} = lists:splitwith(F, Chain), %% Advance chain Back ++ Front. diff --git a/lib/etorrent-1.0/src/etorrent_fs.erl b/lib/etorrent-1.0/src/etorrent_fs.erl index e17819a..755dd47 100644 --- a/lib/etorrent-1.0/src/etorrent_fs.erl +++ b/lib/etorrent-1.0/src/etorrent_fs.erl @@ -17,7 +17,7 @@ %% API -export([start_link/2, stop/1, read_piece/2, size_of_ops/1, - write_chunk/2, check_piece/3]). + write_chunk/2, check_piece/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -65,9 +65,8 @@ read_piece(Pid, Pn) when is_integer(Pn) -> %% Description: Search the mnesia tables for the Piece with Index and %% write it back to disk. %%-------------------------------------------------------------------- -%%% TODO: The PeerGroupPid could be obtained by the process itself. -check_piece(Pid, PeerGroupPid, Index) -> - gen_server:cast(Pid, {check_piece, PeerGroupPid, Index}). +check_piece(Pid, Index) -> + gen_server:cast(Pid, {check_piece, Index}). write_chunk(Pid, {Index, Data, Ops}) -> @@ -102,7 +101,7 @@ handle_cast({write_chunk, {Index, Data, Ops}}, S) -> {ok, NS} = fs_write(Data, Ops, S), {noreply, NS} end; -handle_cast({check_piece, PeerGroupPid, Index}, S) -> +handle_cast({check_piece, Index}, S) -> [#piece { hash = Hash, files = Operations}] = etorrent_piece_mgr:select(S#state.torrent_id, Index), {ok, Data, NS} = read_pieces_and_assemble(Operations, [], S), @@ -119,8 +118,7 @@ handle_cast({check_piece, PeerGroupPid, Index}, S) -> fetched), %% Make sure there is no chunks left for this piece. ok = etorrent_chunk_mgr:remove_chunks(S#state.torrent_id, Index), - ok = etorrent_t_peer_group_mgr:broadcast_have(PeerGroupPid, - Index), + broadcast_have_message(Index, S#state.torrent_id), {noreply, NS}; false -> ok = @@ -241,5 +239,11 @@ stop_all_fs_processes(Dict) -> dict:to_list(Dict)), ok. +broadcast_have_message(Index, TorrentId) -> + Peers = etorrent_peer:all(TorrentId), + lists:foreach(fun (Peer) -> + etorrent_t_peer_recv:send_have_piece(Peer#peer.pid, Index) + end, + Peers). diff --git a/lib/etorrent-1.0/src/etorrent_sup.erl b/lib/etorrent-1.0/src/etorrent_sup.erl index d5ecfea..ce83e3d 100644 --- a/lib/etorrent-1.0/src/etorrent_sup.erl +++ b/lib/etorrent-1.0/src/etorrent_sup.erl @@ -11,7 +11,7 @@ -behaviour(supervisor). %% API --export([start_link/0]). +-export([start_link/1]). %% Supervisor callbacks -export([init/1]). @@ -21,13 +21,14 @@ %%==================================================================== %% API functions %%==================================================================== -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). +start_link(PeerId) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, [PeerId]). %%==================================================================== %% Supervisor callbacks %%==================================================================== -init([]) -> +init([PeerId]) -> + error_logger:info_report([etorrent_supervisor_starting, PeerId]), EventManager = {event_manager, {etorrent_event_mgr, start_link, []}, permanent, 2000, worker, [etorrent_event_mgr]}, @@ -46,6 +47,9 @@ init([]) -> ChunkManager = {etorrent_chunk_mgr, {etorrent_chunk_mgr, start_link, []}, permanent, 15000, worker, [etorrent_chunk_mgr]}, + Choker = {choker, + {etorrent_choker, start_link, [PeerId]}, + permanent, 5000, worker, [etorrent_choker]}, Listener = {listener, {etorrent_listener, start_link, []}, permanent, 2000, worker, [etorrent_listener]}, @@ -56,15 +60,15 @@ init([]) -> {etorrent_dirwatcher_sup, start_link, []}, transient, infinity, supervisor, [etorrent_dirwatcher_sup]}, TorrentMgr = {manager, - {etorrent_t_manager, start_link, []}, + {etorrent_t_manager, start_link, [PeerId]}, permanent, 2000, worker, [etorrent_t_manager]}, TorrentPool = {torrent_pool_sup, {etorrent_t_pool_sup, start_link, []}, transient, infinity, supervisor, [etorrent_t_pool_sup]}, {ok, {{one_for_all, 1, 60}, - [EventManager, BadPeerMgr, FastResume, PieceManager, - ChunkManager, RateManager, Listener, AcceptorSup, DirWatcherSup, TorrentMgr, + [EventManager, BadPeerMgr, FastResume, RateManager, PieceManager, + ChunkManager, Choker, Listener, AcceptorSup, DirWatcherSup, TorrentMgr, TorrentPool]}}. %%==================================================================== diff --git a/lib/etorrent-1.0/src/etorrent_t_control.erl b/lib/etorrent-1.0/src/etorrent_t_control.erl index dcb1c1d..71cb8a9 100644 --- a/lib/etorrent-1.0/src/etorrent_t_control.erl +++ b/lib/etorrent-1.0/src/etorrent_t_control.erl @@ -33,7 +33,6 @@ parent_pid = none, tracker_pid = none, file_system_pid = none, - peer_group_pid = none, disk_state = none, available_peers = []}). @@ -134,19 +133,10 @@ initializing(timeout, S) -> {total, etorrent_metainfo:get_length(Torrent)}}, NumberOfPieces), - %% And a process for controlling the peers for this torrent. - {ok, PeerGroupPid} = - etorrent_t_sup:add_peer_group( - S#state.parent_pid, - S#state.peer_id, - InfoHash, - S#state.id), - %% Start the tracker {ok, TrackerPid} = etorrent_t_sup:add_tracker( S#state.parent_pid, - PeerGroupPid, etorrent_metainfo:get_url(Torrent), etorrent_metainfo:get_infohash(Torrent), S#state.peer_id, @@ -157,8 +147,7 @@ initializing(timeout, S) -> garbage_collect(), {next_state, started, S#state{file_system_pid = FSPid, - tracker_pid = TrackerPid, - peer_group_pid = PeerGroupPid}} + tracker_pid = TrackerPid}} end. diff --git a/lib/etorrent-1.0/src/etorrent_t_manager.erl b/lib/etorrent-1.0/src/etorrent_t_manager.erl index 3728ae6..ca398e8 100644 --- a/lib/etorrent-1.0/src/etorrent_t_manager.erl +++ b/lib/etorrent-1.0/src/etorrent_t_manager.erl @@ -4,25 +4,25 @@ -module(etorrent_t_manager). -behaviour(gen_server). --include("etorrent_version.hrl"). -include("etorrent_mnesia_table.hrl"). --export([start_link/0, start_torrent/1, stop_torrent/1, +-export([start_link/1, + + start_torrent/1, stop_torrent/1, check_torrent/1]). -export([handle_cast/2, handle_call/3, init/1, terminate/2]). -export([handle_info/2, code_change/3]). --export([generate_peer_id/0]). -define(SERVER, ?MODULE). --define(RANDOM_MAX_SIZE, 999999999999). + -record(state, {local_peer_id}). %% API %% Start a new etorrent_t_manager process -start_link() -> - gen_server:start_link({local, ?SERVER}, etorrent_t_manager, [], []). +start_link(PeerId) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [PeerId], []). %% Ask the manager process to start a new torrent, given in File. start_torrent(File) -> @@ -37,8 +37,8 @@ stop_torrent(File) -> gen_server:cast(?SERVER, {stop_torrent, File}). %% Callbacks -init(_Args) -> - {ok, #state { local_peer_id = generate_peer_id()}}. +init([PeerId]) -> + {ok, #state { local_peer_id = PeerId}}. handle_cast({start_torrent, F}, S) -> case torrent_duplicate(F) of @@ -86,12 +86,6 @@ stop_torrent(F, S) -> end, {noreply, S}. -generate_peer_id() -> - Number = crypto:rand_uniform(0, ?RANDOM_MAX_SIZE), - Rand = io_lib:fwrite("~B----------", [Number]), - PeerId = lists:flatten(io_lib:format("-ET~s-~12s", [?VERSION, Rand])), - error_logger:info_report([peer_id, PeerId]), - PeerId. torrent_duplicate(F) -> case etorrent_tracking_map:select({filename, F}) of diff --git a/lib/etorrent-1.0/src/etorrent_t_peer_pool_sup.erl b/lib/etorrent-1.0/src/etorrent_t_peer_pool_sup.erl index 9eca7ae..3acb9bc 100644 --- a/lib/etorrent-1.0/src/etorrent_t_peer_pool_sup.erl +++ b/lib/etorrent-1.0/src/etorrent_t_peer_pool_sup.erl @@ -10,7 +10,7 @@ -behaviour(supervisor). %% API --export([start_link/0, add_peer/7]). +-export([start_link/0, add_peer/6]). %% Supervisor callbacks -export([init/1]). @@ -32,10 +32,10 @@ start_link() -> %% Description: Add a peer to the supervisor pool. Returns the reciever %% process hooked on the supervisor. %%-------------------------------------------------------------------- -add_peer(GroupPid, LocalPeerId, InfoHash, FilesystemPid, Parent, Id, +add_peer(GroupPid, LocalPeerId, InfoHash, FilesystemPid, Id, {IP, Port}) -> {ok, Pid} = supervisor:start_child(GroupPid, [LocalPeerId, InfoHash, - FilesystemPid, Parent, Id, + FilesystemPid, Id, {IP, Port}]), Children = supervisor:which_children(Pid), {value, {_, Child, _, _}} = lists:keysearch(reciever, 1, Children), diff --git a/lib/etorrent-1.0/src/etorrent_t_peer_recv.erl b/lib/etorrent-1.0/src/etorrent_t_peer_recv.erl index a104bbf..95b9c7c 100644 --- a/lib/etorrent-1.0/src/etorrent_t_peer_recv.erl +++ b/lib/etorrent-1.0/src/etorrent_t_peer_recv.erl @@ -15,7 +15,7 @@ -include("etorrent_rate.hrl"). %% API --export([start_link/7, connect/3, choke/1, unchoke/1, interested/1, +-export([start_link/6, connect/3, choke/1, unchoke/1, interested/1, send_have_piece/2, complete_handshake/4, endgame_got_chunk/2, queue_pieces/1, stop/1]). @@ -46,7 +46,6 @@ parent = none, file_system_pid = none, - peer_group_pid = none, send_pid = none, rate = none, @@ -64,10 +63,10 @@ %% Function: start_link() -> {ok,Pid} | ignore | {error,Error} %% Description: Starts the server %%-------------------------------------------------------------------- -start_link(LocalPeerId, InfoHash, FilesystemPid, GroupPid, Id, Parent, +start_link(LocalPeerId, InfoHash, FilesystemPid, Id, Parent, {IP, Port}) -> gen_server:start_link(?MODULE, [LocalPeerId, InfoHash, - FilesystemPid, GroupPid, Id, Parent, + FilesystemPid, Id, Parent, {IP, Port}], []). %%-------------------------------------------------------------------- @@ -145,7 +144,7 @@ queue_pieces(Pid) -> %% {stop, Reason} %% Description: Initiates the server %%-------------------------------------------------------------------- -init([LocalPeerId, InfoHash, FilesystemPid, GroupPid, Id, Parent, {IP, Port}]) -> +init([LocalPeerId, InfoHash, FilesystemPid, Id, Parent, {IP, Port}]) -> process_flag(trap_exit, true), {ok, TRef} = timer:send_interval(?RATE_UPDATE, self(), rate_update), ok = etorrent_peer:new(IP, Port, Id, self()), @@ -155,7 +154,6 @@ init([LocalPeerId, InfoHash, FilesystemPid, GroupPid, Id, Parent, {IP, Port}]) - piece_set = gb_sets:new(), remote_request_set = gb_trees:empty(), info_hash = InfoHash, - peer_group_pid = GroupPid, torrent_id = Id, rate = etorrent_rate:init(?RATE_FUDGE), rate_timer = TRef, @@ -309,11 +307,13 @@ handle_message(unchoke, S) -> try_to_queue_up_pieces(S#state{remote_choked = false}); handle_message(interested, S) -> ok = etorrent_rate_mgr:interested(S#state.torrent_id, self()), - etorrent_t_peer_group_mgr:perform_rechoke(S#state.peer_group_pid), + %%TODO: Only call this if the guy is *not* choked by us + etorrent_choker:perform_rechoke(), {ok, S}; handle_message(not_interested, S) -> ok = etorrent_rate_mgr:not_interested(S#state.torrent_id, self()), - etorrent_t_peer_group_mgr:perform_rechoke(S#state.peer_group_pid), + %%TODO: Only call this if the guy is *not* choked by us + etorrent_choker:perform_rechoke(), {ok, S}; handle_message({request, Index, Offset, Len}, S) -> etorrent_t_peer_send:remote_request(S#state.send_pid, Index, Offset, Len), @@ -410,9 +410,7 @@ handle_got_chunk(Index, Offset, Data, Len, S) -> {Offset, Len}, self()) of full -> - etorrent_fs:check_piece(S#state.file_system_pid, - S#state.peer_group_pid, - Index); + etorrent_fs:check_piece(S#state.file_system_pid, Index); ok -> ok end, @@ -424,9 +422,7 @@ handle_got_chunk(Index, Offset, Data, Len, S) -> found -> ok; assigned -> - etorrent_t_peer_group_mgr:broadcast_got_chunk( - S#state.peer_group_pid, - {Index, Offset, Len}) + broadcast_got_chunk({Index, Offset, Len}, S#state.torrent_id) end; false -> ok @@ -449,7 +445,7 @@ unqueue_all_pieces(S) -> %% Put chunks back ok = etorrent_chunk_mgr:putback_chunks(self()), %% Tell other peers that there is 0xf00d! - etorrent_t_peer_group_mgr:broadcast_queue_pieces(S#state.peer_group_pid), + broadcast_queue_pieces(S#state.torrent_id), %% Clean up the request set. S#state{remote_request_set = gb_trees:empty()}. @@ -596,3 +592,17 @@ handle_read_from_socket(S, Packet) {noreply, S#state { packet_iolist = [Packet | S#state.packet_iolist], packet_left = S#state.packet_left - size(Packet) }, 0}. +broadcast_queue_pieces(TorrentId) -> + Peers = etorrent_peer:all(TorrentId), + lists:foreach(fun (P) -> + etorrent_t_peer_recv:queue_pieces(P#peer.pid) + end, + Peers). + +broadcast_got_chunk(Chunk, TorrentId) -> + Peers = etorrent_peer:all(TorrentId), + lists:foreach(fun (Peer) -> + etorrent_t_peer_recv:endgame_got_chunk(Peer#peer.pid, Chunk) + end, + Peers). + diff --git a/lib/etorrent-1.0/src/etorrent_t_peer_sup.erl b/lib/etorrent-1.0/src/etorrent_t_peer_sup.erl index 841c144..37d1c3e 100644 --- a/lib/etorrent-1.0/src/etorrent_t_peer_sup.erl +++ b/lib/etorrent-1.0/src/etorrent_t_peer_sup.erl @@ -10,7 +10,7 @@ -behaviour(supervisor). %% API --export([start_link/6, add_sender/5]). +-export([start_link/5, add_sender/5]). %% Supervisor callbacks -export([init/1]). @@ -24,11 +24,10 @@ %% Function: start_link() -> {ok,Pid} | ignore | {error,Error} %% Description: Starts the supervisor %%-------------------------------------------------------------------- -start_link(LocalPeerId, InfoHash, FilesystemPid, GroupPid, Id, {IP, Port}) -> +start_link(LocalPeerId, InfoHash, FilesystemPid, Id, {IP, Port}) -> supervisor:start_link(?MODULE, [LocalPeerId, InfoHash, FilesystemPid, - GroupPid, Id, {IP, Port}]). @@ -50,9 +49,9 @@ add_sender(Pid, Socket, FileSystemPid, Id, RecvPid) -> %% to find out about restart strategy, maximum restart frequency and child %% specifications. %%-------------------------------------------------------------------- -init([LocalPeerId, InfoHash, FilesystemPid, GroupPid, Id, {IP, Port}]) -> +init([LocalPeerId, InfoHash, FilesystemPid, Id, {IP, Port}]) -> Reciever = {reciever, {etorrent_t_peer_recv, start_link, - [LocalPeerId, InfoHash, FilesystemPid, GroupPid, Id, self(), + [LocalPeerId, InfoHash, FilesystemPid, Id, self(), {IP, Port}]}, permanent, 15000, worker, [etorrent_t_peer_recv]}, {ok, {{one_for_all, 0, 1}, [Reciever]}}. diff --git a/lib/etorrent-1.0/src/etorrent_t_sup.erl b/lib/etorrent-1.0/src/etorrent_t_sup.erl index 566c5cc..0ef6f22 100644 --- a/lib/etorrent-1.0/src/etorrent_t_sup.erl +++ b/lib/etorrent-1.0/src/etorrent_t_sup.erl @@ -12,7 +12,8 @@ -behaviour(supervisor). %% API --export([start_link/3, add_peer_group/4, add_tracker/6, get_pid/2]). +-export([start_link/3, add_tracker/5, get_pid/2, + add_peer/5]). %% Supervisor callbacks -export([init/1]). @@ -43,24 +44,23 @@ get_pid(Pid, Name) -> %% Func: add_file_system_pool/1 %% Description: Add a filesystem process to the torrent. %%-------------------------------------------------------------------- - -add_peer_group(Pid, Local_Peer_Id, InfoHash, TorrentId) -> - GroupPid = get_pid(Pid, peer_pool_sup), - FSPid = get_pid(Pid, fs), - PeerGroup = {peer_group, - {etorrent_t_peer_group_mgr, start_link, - [Local_Peer_Id, GroupPid, - InfoHash, FSPid, TorrentId]}, - permanent, 2000, worker, [etorrent_t_peer_group]}, - supervisor:start_child(Pid, PeerGroup). - -add_tracker(Pid, PeerGroupPid, URL, InfoHash, Local_Peer_Id, TorrentId) -> +add_tracker(Pid, URL, InfoHash, Local_Peer_Id, TorrentId) -> Tracker = {tracker_communication, {etorrent_tracker_communication, start_link, - [self(), PeerGroupPid, URL, InfoHash, Local_Peer_Id, TorrentId]}, + [self(), URL, InfoHash, Local_Peer_Id, TorrentId]}, permanent, 15000, worker, [etorrent_tracker_communication]}, supervisor:start_child(Pid, Tracker). +add_peer(Pid, PeerId, InfoHash, TorrentId, {IP, Port}) -> + FSPid = get_pid(Pid, fs), + GroupPid = get_pid(Pid, peer_pool_sup), + etorrent_t_peer_pool_sup:add_peer(GroupPid, + PeerId, + InfoHash, + FSPid, + TorrentId, + {IP, Port}). + %%==================================================================== %% Supervisor callbacks %%==================================================================== diff --git a/lib/etorrent-1.0/src/etorrent_tracker_communication.erl b/lib/etorrent-1.0/src/etorrent_tracker_communication.erl index 224d411..3c6b3b7 100644 --- a/lib/etorrent-1.0/src/etorrent_tracker_communication.erl +++ b/lib/etorrent-1.0/src/etorrent_tracker_communication.erl @@ -13,7 +13,7 @@ -include("etorrent_mnesia_table.hrl"). %% API --export([start_link/6, contact/1, stopped/1, completed/1, started/1]). +-export([start_link/5, contact/1, stopped/1, completed/1, started/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -25,7 +25,6 @@ %% soft timer may be overridden if we want to change state. soft_timer = none, hard_timer = none, - peer_group_pid = none, url = none, info_hash = none, peer_id = none, @@ -45,9 +44,9 @@ %% Function: start_link() -> {ok,Pid} | ignore | {error,Error} %% Description: Starts the server %%-------------------------------------------------------------------- -start_link(ControlPid, PeerGroupPid, Url, InfoHash, PeerId, TorrentId) -> +start_link(ControlPid, Url, InfoHash, PeerId, TorrentId) -> gen_server:start_link(?MODULE, - [ControlPid, PeerGroupPid, + [ControlPid, Url, InfoHash, PeerId, TorrentId], []). @@ -91,13 +90,12 @@ completed(Pid) -> %% {stop, Reason} %% Description: Initiates the server %%-------------------------------------------------------------------- -init([ControlPid, PeerGroupPid, Url, InfoHash, PeerId, TorrentId]) -> +init([ControlPid, Url, InfoHash, PeerId, TorrentId]) -> {ok, HardRef} = timer:send_after(0, hard_timeout), {ok, SoftRef} = timer:send_after(timer:seconds(?DEFAULT_CONNECTION_TIMEOUT_INTERVAL), soft_timeout), {ok, #state{should_contact_tracker = false, - peer_group_pid = PeerGroupPid, control_pid = ControlPid, torrent_id = TorrentId, url = Url, @@ -216,8 +214,8 @@ handle_tracker_response(BC, none, {string, W}, S) -> handle_tracker_response(BC, none, none, S); handle_tracker_response(BC, none, none, S) -> %% Add new peers - etorrent_t_peer_group_mgr:add_peers(S#state.peer_group_pid, - response_ips(BC)), + etorrent_choker:add_peers(S#state.torrent_id, + response_ips(BC)), %% Update the state of the torrent ok = etorrent_torrent:statechange(S#state.torrent_id, {tracker_report, -- 2.11.4.GIT