1 %%%-------------------------------------------------------------------
2 %%% File : etorrent_t_peer_send.erl
3 %%% Author : Jesper Louis Andersen
4 %%% License : See COPYING
5 %%% Description : Send out events to a foreign socket.
7 %%% Created : 27 Jan 2007 by
8 %%% Jesper Louis Andersen <jesper.louis.andersen@gmail.com>
9 %%%-------------------------------------------------------------------
10 -module(etorrent_t_peer_send
).
12 -include("etorrent_mnesia_table.hrl").
13 -include("etorrent_rate.hrl").
15 -behaviour(gen_server
).
18 -export([start_link
/4,
22 local_request
/2, remote_request
/4, cancel
/4,
25 not_interested
/1, interested
/1,
29 %% gen_server callbacks
30 -export([init
/1, handle_info
/2, terminate
/2, code_change
/3,
31 handle_call
/3, handle_cast
/2]).
33 -record(state
, {socket
= none
,
38 interested
= false
, % Are we interested in the peer?
44 file_system_pid
= none
}).
46 -define(DEFAULT_KEEP_ALIVE_INTERVAL
, 120*1000). % From proto. spec.
47 -define(MAX_REQUESTS
, 1024). % Maximal number of requests a peer may make.
48 %%====================================================================
50 %%====================================================================
%% Start a linked sender process for one peer connection.
%% The four arguments are passed, in this order, as the argument list
%% handed to init/1. Returns the result of gen_server:start_link/3.
start_link(Socket, FilesystemPid, TorrentId, RecvPid) ->
    InitArgs = [Socket, FilesystemPid, TorrentId, RecvPid],
    gen_server:start_link(?MODULE, InitArgs, []).
%%--------------------------------------------------------------------
%% Func: remote_request(Pid, Index, Offset, Len)
%% Description: Notify the sender process Pid that the remote end (the
%%   peer) asked for the chunk {Index, Offset, Len}. Asynchronous cast;
%%   returns ok.
%%--------------------------------------------------------------------
remote_request(Pid, Index, Offset, Len) ->
    Request = {remote_request, Index, Offset, Len},
    gen_server:cast(Pid, Request).
%%--------------------------------------------------------------------
%% Func: local_request(Pid, {Index, Offset, Size})
%% Description: Ask the sender process Pid to request the chunk
%%   {Index, Offset, Size} from the peer. Asynchronous cast; returns ok.
%%--------------------------------------------------------------------
local_request(Pid, {_Index, _Offset, _Size} = Chunk) ->
    gen_server:cast(Pid, {local_request, Chunk}).
%%--------------------------------------------------------------------
%% Func: cancel(Pid, Index, Offset, Len)
%% Description: Tell the sender process Pid to cancel the chunk
%%   {Index, Offset, Len}. Delivered as a cancel_piece cast; returns ok.
%%--------------------------------------------------------------------
cancel(Pid, Index, Offset, Len) ->
    CancelMsg = {cancel_piece, Index, Offset, Len},
    gen_server:cast(Pid, CancelMsg).
%%--------------------------------------------------------------------
%% Func: choke(Pid)
%% Description: Choke the peer. Sends the atom `choke' to the sender
%%   process Pid as an asynchronous cast; returns ok.
%%--------------------------------------------------------------------
choke(Pid) ->
    gen_server:cast(Pid, choke).
%%--------------------------------------------------------------------
%% Func: unchoke(Pid)
%% Description: Unchoke the peer. Sends the atom `unchoke' to the sender
%%   process Pid as an asynchronous cast; returns ok.
%%--------------------------------------------------------------------
unchoke(Pid) ->
    gen_server:cast(Pid, unchoke).
%% Send the atom `check_choke' to the sender process Pid as an
%% asynchronous cast; returns ok.
check_choke(Pid) ->
    Msg = check_choke,
    gen_server:cast(Pid, Msg).
%%--------------------------------------------------------------------
%% Func: not_interested(Pid)
%% Description: Tell the peer we are no longer interested in it.
%%   Delivered to the sender process Pid as an asynchronous cast.
%%--------------------------------------------------------------------
not_interested(Pid) ->
    Msg = not_interested,
    gen_server:cast(Pid, Msg).
%%--------------------------------------------------------------------
%% Func: interested(Pid)
%% Description: Tell the peer we are interested in it. Delivered to the
%%   sender process Pid as an asynchronous cast; returns ok.
%%--------------------------------------------------------------------
interested(Pid) ->
    gen_server:cast(Pid, interested).
%%--------------------------------------------------------------------
%% Func: send_have_piece(Pid, PieceNumber)
%% Description: Announce to the peer that we now have piece PieceNumber.
%%   Delivered to the sender process Pid as a {have, PieceNumber} cast.
%%--------------------------------------------------------------------
send_have_piece(Pid, PieceNumber) ->
    HaveMsg = {have, PieceNumber},
    gen_server:cast(Pid, HaveMsg).
%% Hand BitField to the sender process Pid as a {bitfield, BitField}
%% asynchronous cast; returns ok.
bitfield(Pid, BitField) ->
    BitfieldMsg = {bitfield, BitField},
    gen_server:cast(Pid, BitfieldMsg).
%%--------------------------------------------------------------------
%% Func: stop(Pid)
%% Description: Tell the send process to stop the communication.
%%   Delivered to Pid as the asynchronous cast `stop'; returns ok.
%%--------------------------------------------------------------------
stop(Pid) ->
    gen_server:cast(Pid, stop).
121 %%====================================================================
122 %% gen_server callbacks
123 %%====================================================================
%% gen_server init/1 callback. Receives [Socket, FilesystemPid, TorrentId, Parent].
%% Visible steps:
%%   * enable trap_exit for this process;
%%   * start two repeating timers that deliver keep_alive_tick (every
%%     ?DEFAULT_KEEP_ALIVE_INTERVAL ms) and rate_update (every ?RATE_UPDATE ms)
%%     to this process;
%%   * build the #state record with the socket, an empty request queue, a
%%     fresh rate record, the torrent id and the file system pid.
%% NOTE(review): the enclosing return tuple and several record fields are not
%% visible in this view of the file -- confirm against the full source.
124 init([Socket
, FilesystemPid
, TorrentId
, Parent
]) ->
125 process_flag(trap_exit
, true
),
126 {ok
, TRef
} = timer:send_interval(?DEFAULT_KEEP_ALIVE_INTERVAL
, self(), keep_alive_tick
),
127 {ok
, Tref2
} = timer:send_interval(?RATE_UPDATE
, self(), rate_update
),
129 #state
{socket
= Socket
,
132 request_queue
= queue:new(),
133 rate
= etorrent_rate:init(?RATE_FUDGE
),
135 torrent_id
= TorrentId
,
136 file_system_pid
= FilesystemPid
},
%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) -> {reply, Reply, State}
%% Description: Handling call messages. No synchronous request is
%%   defined for this server, so every call is answered with `ok' and
%%   the state is returned unchanged.
%%--------------------------------------------------------------------
handle_call(_Request, _From, State) ->
    {reply, ok, State}.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed. The state
%%   needs no conversion, so it is returned as-is.
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%% gen_server handle_info/2 callback: timer ticks, the gen_server `timeout'
%% message and anything unexpected.
%% NOTE(review): several clause bodies and the end of the `case' are not
%% visible in this view of the file; only the visible parts are described.

%% keep_alive_tick: send a keep_alive message to the peer (timeout 0).
159 handle_info(keep_alive_tick
, S
) ->
160 send_message(keep_alive
, S
, 0);
%% rate_update: update the rate record with 0 bytes, report through
%% etorrent_rate_mgr:send_rate (remaining arguments not visible) and store
%% the new rate in the state.
161 handle_info(rate_update
, S
) ->
162 Rate
= etorrent_rate:update(S#state
.rate
, 0),
163 ok
= etorrent_rate_mgr:send_rate(S#state
.torrent_id
,
167 {noreply
, S#state
{ rate
= Rate
}};
%% timeout while choked (with and without a cached piece): bodies not visible.
168 handle_info(timeout
, S
)
169 when S#state
.choke
=:= true andalso S#state
.piece_cache
=:= none
->
172 handle_info(timeout
, S
) when S#state
.choke
=:= true
->
%% timeout while unchoked: take the next {Index, Offset, Len} from the
%% request queue and serve it with send_piece/4.
174 handle_info(timeout
, S
) when S#state
.choke
=:= false
->
175 case queue:out(S#state
.request_queue
) of
178 {{value
, {Index
, Offset
, Len
}}, NewQ
} ->
179 send_piece(Index
, Offset
, Len
, S#state
{ request_queue
= NewQ
} )
%% Any other message: log it and stop.
%% NOTE(review): the value returned here is the 2-tuple {stop, Reason};
%% gen_server documents {stop, Reason, NewState} for handle_info/2 --
%% confirm whether the state element is missing.
181 handle_info(Msg
, S
) ->
182 error_logger:info_report([got_unknown_message
, Msg
, S
]),
183 {stop
, {unknown_msg
, Msg
}}.
%% gen_server handle_cast/2 callback: one clause group per request cast by
%% the API functions of this module.
%% NOTE(review): several clause bodies (the "nothing to do" guards, the
%% check_choke results, the `case' patterns and the stop clause) are not
%% visible in this view of the file; only the visible parts are described.

%% choke: when not yet choked, notify etorrent_rate_mgr, send `choke' to the
%% peer, mark the state as choked and drop the piece cache.
185 handle_cast(choke
, S
) when S#state
.choke
== true
->
187 handle_cast(choke
, S
) when S#state
.choke
== false
->
188 ok
= etorrent_rate_mgr:local_choke(S#state
.torrent_id
, S#state
.parent
),
189 send_message(choke
, S#state
{choke
= true
, piece_cache
= none
});
%% unchoke: when currently choked, notify etorrent_rate_mgr, send `unchoke'
%% to the peer, mark the state as unchoked and start with an empty request
%% queue.
190 handle_cast(unchoke
, S
) when S#state
.choke
== false
->
192 handle_cast(unchoke
, S
) when S#state
.choke
== true
->
193 ok
= etorrent_rate_mgr:local_unchoke(S#state
.torrent_id
, S#state
.parent
),
194 send_message(unchoke
, S#state
{choke
= false
,
195 request_queue
= queue:new()});
%% check_choke: when unchoked, ask etorrent_choker to perform a rechoke.
196 handle_cast(check_choke
, S
) when S#state
.choke
=:= true
->
198 handle_cast(check_choke
, S
) when S#state
.choke
=:= false
->
199 ok
= etorrent_choker:perform_rechoke(),
%% bitfield: forward the bitfield to the peer.
201 handle_cast({bitfield
, BF
}, S
) ->
202 send_message({bitfield
, BF
}, S
);
%% not_interested / interested: only when the recorded interest differs is a
%% message sent and the `interested' flag flipped.
203 handle_cast(not_interested
, S
) when S#state
.interested
=:= false
->
205 handle_cast(not_interested
, S
) when S#state
.interested
=:= true
->
206 send_message(not_interested
, S#state
{ interested
= false
});
207 handle_cast(interested
, S
) when S#state
.interested
=:= true
->
209 handle_cast(interested
, S
) when S#state
.interested
=:= false
->
210 send_message(interested
, S#state
{ interested
= true
});
%% have: announce piece Pn to the peer.
211 handle_cast({have
, Pn
}, S
) ->
212 send_message({have
, Pn
}, S
);
%% local_request: send a `request' message for the chunk to the peer.
213 handle_cast({local_request
, {Index
, Offset
, Size
}}, S
) ->
214 send_message({request
, Index
, Offset
, Size
}, S
);
%% remote_request while choked: body not visible.
215 handle_cast({remote_request
, _Index
, _Offset
, _Len
}, S
)
216 when S#state
.choke
== true
->
%% remote_request while unchoked: compare the queue length against
%% ?MAX_REQUESTS; one branch stops the server with max_queue_len_exceeded,
%% the other enqueues the request and returns with timeout 0.
218 handle_cast({remote_request
, Index
, Offset
, Len
}, S
)
219 when S#state
.choke
== false
->
220 Requests
= queue:len(S#state
.request_queue
),
221 case Requests
> ?MAX_REQUESTS
of
223 {stop
, max_queue_len_exceeded
, S
};
225 NQ
= queue:in({Index
, Offset
, Len
}, S#state
.request_queue
),
226 {noreply
, S#state
{request_queue
= NQ
}, 0}
%% cancel_piece: remove the chunk from the request queue via
%% etorrent_utils:queue_remove and return with timeout 0.
228 handle_cast({cancel_piece
, Index
, OffSet
, Len
}, S
) ->
229 NQ
= etorrent_utils:queue_remove({Index
, OffSet
, Len
}, S#state
.request_queue
),
230 {noreply
, S#state
{request_queue
= NQ
}, 0};
%% stop: body not visible.
231 handle_cast(stop
, S
) ->
235 %% Terminating normally means we should inform our recv pair
%% gen_server terminate/2 callback. Cancels the two timers stored in the
%% state fields `timer' and `rate_timer'; the termination reason is ignored.
%% NOTE(review): the remainder of the body is not visible in this view of
%% the file.
236 terminate(_Reason
, S
) ->
237 timer:cancel(S#state
.timer
),
238 timer:cancel(S#state
.rate_timer
),
241 %%--------------------------------------------------------------------
242 %%% Internal functions
243 %%--------------------------------------------------------------------
245 %%--------------------------------------------------------------------
%% Function: send_piece_message/3
%% Description: Send the message Msg and handle an eventual connection
%%   close gracefully.
249 %%--------------------------------------------------------------------
%% send_piece_message(Msg, S, Timeout): send Msg on the peer socket through
%% etorrent_peer_communication:send_message/3, passing the current rate.
%% Visible outcomes:
%%   * success path: report via etorrent_rate_mgr:send_rate, then return
%%     {noreply, State, Timeout} with the updated rate R;
%%   * {{error, closed}, R, _}: stop the server with reason `normal'.
%% NOTE(review): the success pattern, the remaining send_rate arguments and
%% the closing `end' are not visible in this view of the file.
250 send_piece_message(Msg
, S
, Timeout
) ->
251 case etorrent_peer_communication:send_message(S#state
.rate
, S#state
.socket
, Msg
) of
253 ok
= etorrent_rate_mgr:send_rate(S#state
.torrent_id
,
257 {noreply
, S#state
{ rate
= R
}, Timeout
};
258 {{error
, closed
}, R
, _Amount
} ->
259 {stop
, normal
, S#state
{ rate
= R
}}
%% send_piece(Index, Offset, Len, S): serve the chunk {Index, Offset, Len}
%% to the peer from the one-piece cache kept in S#state.piece_cache.
%%   * Cache holds piece Index: slice Len bytes starting at byte Offset out
%%     of the cached binary, record the upload via etorrent_torrent:statechange,
%%     and send a {piece, Index, Offset, Data} message with timeout 0.
%%   * Cache holds another piece, or the final branch: load piece Index with
%%     load_piece/2 and retry with the updated state.
%% The binary match crashes with badmatch if the cached piece is shorter
%% than Offset + Len bytes.
%% NOTE(review): the statechange arguments, the per-peer accounting call,
%% the last `case' pattern and the closing `end' are not visible in this
%% view of the file.
262 send_piece(Index
, Offset
, Len
, S
) ->
263 case S#state
.piece_cache
of
264 {I
, Binary
} when I
== Index
->
265 <<_Skip:Offset
/binary, Data:Len
/binary, _R
/binary>> = Binary
,
266 Msg
= {piece
, Index
, Offset
, Data
},
267 %% Track uploaded size for torrent (for the tracker)
268 ok
= etorrent_torrent:statechange(S#state
.torrent_id
,
270 %% Track the amount uploaded by this peer.
272 send_piece_message(Msg
, S
, 0);
273 %% Update cache and try again...
274 {I
, _Binary
} when I
/= Index
->
275 NS
= load_piece(Index
, S
),
276 send_piece(Index
, Offset
, Len
, NS
);
278 NS
= load_piece(Index
, S
),
279 send_piece(Index
, Offset
, Len
, NS
)
%% load_piece(Index, S): read piece Index through the file system process
%% stored in the state and return the state with piece_cache set to
%% {Index, Data}. Crashes with badmatch unless etorrent_fs:read_piece/2
%% returns {ok, Data}.
load_piece(Index, S) ->
    FsPid = S#state.file_system_pid,
    {ok, Data} = etorrent_fs:read_piece(FsPid, Index),
    Cached = {Index, Data},
    S#state{piece_cache = Cached}.
%% send_message(Msg, S): convenience form of send_message/3 that uses a
%% timeout of 0.
send_message(Msg, S) ->
    Timeout = 0,
    send_message(Msg, S, Timeout).
289 send_message(Msg
, S
, Timeout
) ->
290 case etorrent_peer_communication:send_message(S#state
.rate
, S#state
.socket
, Msg
) of
291 {ok
, Rate
, Amount
} ->
292 ok
= etorrent_rate_mgr:send_rate(
297 {noreply
, S#state
{ rate
= Rate
}, Timeout
};
298 {{error
, ebadf
}, R
, _Amount
} ->
299 error_logger:info_report([caught_ebadf
, S#state
.socket
]),
300 {stop
, normal
, S#state
{ rate
= R
}};
301 {{error
, closed
}, R
, _Amount
} ->
302 {stop
, normal
, S#state
{ rate
= R
}}