1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_t_peer_send.erl
3 %%% Author : Jesper Louis Andersen
4 %%% License : See COPYING
5 %%% Description : Send out events to a foreign socket.
7 %%% Created : 27 Jan 2007 by
8 %%% Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
10 -module(etorrent_t_peer_send
).
12 -include("etorrent_mnesia_table.hrl").
13 -include("etorrent_rate.hrl").
15 -behaviour(gen_server
).
%% API
%% The export list was left unterminated and did not cover every function
%% defined in the API section of this module; check_choke/1, bitfield/2 and
%% stop/1 are added and the attribute is closed.
-export([start_link/4,
         stop/1,
         check_choke/1,
         bitfield/2,
         local_request/2, remote_request/4, cancel/4,
         choke/1, unchoke/1, have/2,
         not_interested/1, interested/1]).

28 %% gen_server callbacks
29 -export([init
/1, handle_info
/2, terminate
/2, code_change
/3,
30 handle_call
/3, handle_cast
/2]).
%% State of one send process (one per peer connection).
%% Every field read or written by the callbacks in this module must be
%% declared here; the defaults follow the file's convention of `none`
%% for "not set yet".
-record(state, {socket = none,
                request_queue = none,   % Queue of {Index, Offset, Len} the peer asked for
                rate = none,            % etorrent_rate state, initialised in init/1
                %% NOTE(review): default assumed to be `true`; the choke guards
                %% in handle_cast/handle_info only match booleans -- confirm.
                choke = true,
                interested = false,     % Are we interested in the peer?
                timer = none,           % Cancelled in terminate/2
                rate_timer = none,      % Cancelled in terminate/2
                parent = none,          % Passed to etorrent_rate_mgr together with torrent_id
                piece_cache = none,     % none | {Index, Binary}, see load_piece/2
                torrent_id = none,
                file_system_pid = none}).
45 -define(DEFAULT_KEEP_ALIVE_INTERVAL
, 120*1000). % From proto. spec.
46 -define(MAX_REQUESTS
, 1024). % Maximal number of requests a peer may make.
47 %%====================================================================
49 %%====================================================================
%%--------------------------------------------------------------------
%% Func: start_link(Socket, FilesystemPid, TorrentId, RecvPid)
%% Description: Start a linked send process. The four arguments are
%%   handed, in this order, to init/1 as a list.
%%--------------------------------------------------------------------
start_link(Socket, FilesystemPid, TorrentId, RecvPid) ->
    InitArgs = [Socket, FilesystemPid, TorrentId, RecvPid],
    gen_server:start_link(?MODULE, InitArgs, []).
%%--------------------------------------------------------------------
%% Func: remote_request(Pid, Index, Offset, Len)
%% Description: Notify the send process Pid that the remote end (the
%%   peer) asked for the chunk {Index, Offset, Len}. Asynchronous;
%%   returns ok.
%%--------------------------------------------------------------------
remote_request(Pid, Index, Offset, Len) ->
    Request = {remote_request, Index, Offset, Len},
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Func: local_request(Pid, {Index, Offset, Size})
%% Description: We request a chunk from the peer. The chunk is given as
%%   a single {Index, Offset, Size} tuple and forwarded unchanged to the
%%   send process Pid. Asynchronous; returns ok.
%%--------------------------------------------------------------------
local_request(Pid, {_Index, _Offset, _Size} = Chunk) ->
    gen_server:cast(Pid, {local_request, Chunk}).
%%--------------------------------------------------------------------
%% Func: cancel(Pid, Index, Offset, Len)
%% Description: Ask the send process Pid to cancel the chunk
%%   {Index, Offset, Len}. The request travels as a cancel_piece
%%   message. Asynchronous; returns ok.
%%--------------------------------------------------------------------
cancel(Pid, Index, Offset, Len) ->
    CancelMsg = {cancel_piece, Index, Offset, Len},
    gen_server:cast(Pid, CancelMsg).
%%--------------------------------------------------------------------
%% Func: choke(Pid)
%% Description: Choke the peer. Asynchronous; returns ok.
%%--------------------------------------------------------------------
%% The clause head was missing, leaving a dangling body; choke/1 is
%% listed in the API export and the body only uses Pid.
choke(Pid) ->
    gen_server:cast(Pid, choke).
%%--------------------------------------------------------------------
%% Func: unchoke(Pid)
%% Description: Unchoke the peer. Asynchronous; returns ok.
%%--------------------------------------------------------------------
%% The clause head was missing, leaving a dangling body; unchoke/1 is
%% listed in the API export and the body only uses Pid.
unchoke(Pid) ->
    gen_server:cast(Pid, unchoke).
%%--------------------------------------------------------------------
%% Func: check_choke(Pid)
%% Description: Send the check_choke request to the send process Pid.
%%   Asynchronous; returns ok.
%%--------------------------------------------------------------------
check_choke(Pid) ->
    Request = check_choke,
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Func: not_interested(Pid)
%% Description: Tell the peer that we are no longer interested in it.
%%   Asynchronous; returns ok.
%%--------------------------------------------------------------------
not_interested(Pid) ->
    Request = not_interested,
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Func: interested(Pid)
%% Description: Tell the peer we are interested in it. Asynchronous;
%%   returns ok.
%%--------------------------------------------------------------------
%% The clause head was missing, leaving a dangling body; interested/1
%% is listed in the API export and the body only uses Pid.
interested(Pid) ->
    gen_server:cast(Pid, interested).
%%--------------------------------------------------------------------
%% Func: have(Pid, PieceNumber)
%% Description: Announce to the peer that we now hold piece
%%   PieceNumber. Asynchronous; returns ok.
%%--------------------------------------------------------------------
have(Pid, PieceNumber) ->
    Announcement = {have, PieceNumber},
    gen_server:cast(Pid, Announcement).
%%--------------------------------------------------------------------
%% Func: bitfield(Pid, BitField)
%% Description: Hand BitField to the send process Pid, which forwards
%%   it to the peer as a bitfield message. Asynchronous; returns ok.
%%--------------------------------------------------------------------
bitfield(Pid, BitField) ->
    Request = {bitfield, BitField},
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Func: stop(Pid)
%% Description: Tell the send process to stop the communication.
%%   Asynchronous; returns ok.
%%--------------------------------------------------------------------
%% The clause head was missing, leaving a dangling body; the body only
%% uses Pid and casts the atom stop.
stop(Pid) ->
    gen_server:cast(Pid, stop).
120 %%====================================================================
121 %% gen_server callbacks
122 %%====================================================================
123 init([Socket
, FilesystemPid
, TorrentId
, Parent
]) ->
124 process_flag(trap_exit
, true
),
125 {ok
, TRef
} = timer:send_interval(?DEFAULT_KEEP_ALIVE_INTERVAL
, self(), keep_alive_tick
),
126 {ok
, Tref2
} = timer:send_interval(?RATE_UPDATE
, self(), rate_update
),
128 #state
{socket
= Socket
,
131 request_queue
= queue:new(),
132 rate
= etorrent_rate:init(?RATE_FUDGE
),
134 torrent_id
= TorrentId
,
135 file_system_pid
= FilesystemPid
},
%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) -> {reply, Reply, State} |
%%                                      {reply, Reply, State, Timeout} |
%%                                      {noreply, State} |
%%                                      {noreply, State, Timeout} |
%%                                      {stop, Reason, Reply, State} |
%%                                      {stop, Reason, State}
%% Description: Handling call messages. The API functions visible in this
%%   module all use casts, so any synchronous call is simply answered
%%   with ok and the state is left untouched.
%%--------------------------------------------------------------------
handle_call(_Request, _From, State) ->
    %% Reply with a literal: the previous body returned an unbound Reply.
    {reply, ok, State}.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed. No
%%   conversion is needed, so the state is returned as it is.
%%--------------------------------------------------------------------
%% The clause had a head but no body, so it ran straight into the next
%% function definition.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

158 handle_info(keep_alive_tick
, S
) ->
159 send_message(keep_alive
, S
, 0);
160 handle_info(rate_update
, S
) ->
161 Rate
= etorrent_rate:update(S#state
.rate
, 0),
162 ok
= etorrent_rate_mgr:send_rate(S#state
.torrent_id
,
166 {noreply
, S#state
{ rate
= Rate
}};
167 handle_info(timeout
, S
)
168 when S#state
.choke
=:= true andalso S#state
.piece_cache
=:= none
->
171 handle_info(timeout
, S
) when S#state
.choke
=:= true
->
173 handle_info(timeout
, S
) when S#state
.choke
=:= false
->
174 case queue:out(S#state
.request_queue
) of
177 {{value
, {Index
, Offset
, Len
}}, NewQ
} ->
178 send_piece(Index
, Offset
, Len
, S#state
{ request_queue
= NewQ
} )
180 handle_info(Msg
, S
) ->
181 error_logger:info_report([got_unknown_message
, Msg
, S
]),
182 {stop
, {unknown_msg
, Msg
}}.
184 handle_cast(choke
, S
) when S#state
.choke
== true
->
186 handle_cast(choke
, S
) when S#state
.choke
== false
->
187 ok
= etorrent_rate_mgr:local_choke(S#state
.torrent_id
, S#state
.parent
),
188 send_message(choke
, S#state
{choke
= true
, piece_cache
= none
});
189 handle_cast(unchoke
, S
) when S#state
.choke
== false
->
191 handle_cast(unchoke
, S
) when S#state
.choke
== true
->
192 ok
= etorrent_rate_mgr:local_unchoke(S#state
.torrent_id
, S#state
.parent
),
193 send_message(unchoke
, S#state
{choke
= false
,
194 request_queue
= queue:new()});
195 handle_cast(check_choke
, S
) when S#state
.choke
=:= true
->
197 handle_cast(check_choke
, S
) when S#state
.choke
=:= false
->
198 ok
= etorrent_choker:perform_rechoke(),
200 handle_cast({bitfield
, BF
}, S
) ->
201 send_message({bitfield
, BF
}, S
);
202 handle_cast(not_interested
, S
) when S#state
.interested
=:= false
->
204 handle_cast(not_interested
, S
) when S#state
.interested
=:= true
->
205 send_message(not_interested
, S#state
{ interested
= false
});
206 handle_cast(interested
, S
) when S#state
.interested
=:= true
->
208 handle_cast(interested
, S
) when S#state
.interested
=:= false
->
209 send_message(interested
, S#state
{ interested
= true
});
210 handle_cast({have
, Pn
}, S
) ->
211 send_message({have
, Pn
}, S
);
212 handle_cast({local_request
, {Index
, Offset
, Size
}}, S
) ->
213 send_message({request
, Index
, Offset
, Size
}, S
);
214 handle_cast({remote_request
, _Index
, _Offset
, _Len
}, S
)
215 when S#state
.choke
== true
->
217 handle_cast({remote_request
, Index
, Offset
, Len
}, S
)
218 when S#state
.choke
== false
->
219 Requests
= queue:len(S#state
.request_queue
),
220 case Requests
> ?MAX_REQUESTS
of
222 {stop
, max_queue_len_exceeded
, S
};
224 NQ
= queue:in({Index
, Offset
, Len
}, S#state
.request_queue
),
225 {noreply
, S#state
{request_queue
= NQ
}, 0}
227 handle_cast({cancel_piece
, Index
, OffSet
, Len
}, S
) ->
228 NQ
= etorrent_utils:queue_remove({Index
, OffSet
, Len
}, S#state
.request_queue
),
229 {noreply
, S#state
{request_queue
= NQ
}, 0};
230 handle_cast(stop
, S
) ->
234 %% Terminating normally means we should inform our recv pair
235 terminate(_Reason
, S
) ->
236 timer:cancel(S#state
.timer
),
237 timer:cancel(S#state
.rate_timer
),
240 %%--------------------------------------------------------------------
241 %%% Internal functions
242 %%--------------------------------------------------------------------
244 %%--------------------------------------------------------------------
%% Function: send_piece_message/3
%% Description: Send the message Msg and handle an eventual connection
%%   close gracefully (a closed socket stops the process normally).
248 %%--------------------------------------------------------------------
249 send_piece_message(Msg
, S
, Timeout
) ->
250 case etorrent_peer_communication:send_message(S#state
.rate
, S#state
.socket
, Msg
) of
252 ok
= etorrent_rate_mgr:send_rate(S#state
.torrent_id
,
256 {noreply
, S#state
{ rate
= R
}, Timeout
};
257 {{error
, closed
}, R
, _Amount
} ->
258 {stop
, normal
, S#state
{ rate
= R
}}
261 send_piece(Index
, Offset
, Len
, S
) ->
262 case S#state
.piece_cache
of
263 {I
, Binary
} when I
== Index
->
264 <<_Skip:Offset
/binary, Data:Len
/binary, _R
/binary>> = Binary
,
265 Msg
= {piece
, Index
, Offset
, Data
},
266 %% Track uploaded size for torrent (for the tracker)
267 ok
= etorrent_torrent:statechange(S#state
.torrent_id
,
269 %% Track the amount uploaded by this peer.
271 send_piece_message(Msg
, S
, 0);
272 %% Update cache and try again...
273 {I
, _Binary
} when I
/= Index
->
274 NS
= load_piece(Index
, S
),
275 send_piece(Index
, Offset
, Len
, NS
);
277 NS
= load_piece(Index
, S
),
278 send_piece(Index
, Offset
, Len
, NS
)
%%--------------------------------------------------------------------
%% Func: load_piece(Index, S) -> NewS
%% Description: Read piece Index through the file system process stored
%%   in S and return S with piece_cache set to {Index, Piece}. Crashes
%%   with a badmatch if etorrent_fs:read_piece/2 does not return
%%   {ok, Piece}.
%%--------------------------------------------------------------------
load_piece(Index, S) ->
    FsPid = S#state.file_system_pid,
    {ok, Piece} = etorrent_fs:read_piece(FsPid, Index),
    Cache = {Index, Piece},
    S#state{piece_cache = Cache}.
%%--------------------------------------------------------------------
%% Func: send_message(Msg, S)
%% Description: Shorthand for send_message/3 with a timeout of 0, which
%%   send_message/3 hands back as the gen_server timeout on success.
%%--------------------------------------------------------------------
send_message(Msg, S) ->
    Timeout = 0,
    send_message(Msg, S, Timeout).
288 send_message(Msg
, S
, Timeout
) ->
289 case etorrent_peer_communication:send_message(S#state
.rate
, S#state
.socket
, Msg
) of
290 {ok
, Rate
, Amount
} ->
291 ok
= etorrent_rate_mgr:send_rate(
296 {noreply
, S#state
{ rate
= Rate
}, Timeout
};
297 {{error
, ebadf
}, R
, _Amount
} ->
298 error_logger:info_report([caught_ebadf
, S#state
.socket
]),
299 {stop
, normal
, S#state
{ rate
= R
}};
300 {{error
, closed
}, R
, _Amount
} ->
301 {stop
, normal
, S#state
{ rate
= R
}}