3 Copyright (c) 2003, Arvid Norberg
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #include "libtorrent/pch.hpp"
39 #include <boost/bind.hpp>
41 #include "libtorrent/peer_connection.hpp"
42 #include "libtorrent/identify_client.hpp"
43 #include "libtorrent/entry.hpp"
44 #include "libtorrent/bencode.hpp"
45 #include "libtorrent/alert_types.hpp"
46 #include "libtorrent/invariant_check.hpp"
47 #include "libtorrent/io.hpp"
48 #include "libtorrent/file.hpp"
49 #include "libtorrent/version.hpp"
50 #include "libtorrent/extensions.hpp"
51 #include "libtorrent/aux_/session_impl.hpp"
52 #include "libtorrent/policy.hpp"
53 #include "libtorrent/socket_type.hpp"
54 #include "libtorrent/assert.hpp"
56 //#define TORRENT_CORRUPT_DATA
59 using boost::shared_ptr
;
60 using libtorrent::aux::session_impl
;
64 // outbound connection
65 peer_connection::peer_connection(
67 , boost::weak_ptr
<torrent
> tor
68 , shared_ptr
<socket_type
> s
69 , tcp::endpoint
const& endp
70 , policy::peer
* peerinfo
)
73 m_last_choke(time_now() - hours(1))
77 , m_max_out_request_queue(m_ses
.settings().max_out_request_queue
)
78 , m_last_piece(time_now())
79 , m_last_request(time_now())
80 , m_last_incoming_request(min_time())
81 , m_last_unchoke(min_time())
82 , m_last_receive(time_now())
83 , m_last_sent(time_now())
84 , m_requested(min_time())
86 , m_remote_dl_update(time_now())
87 , m_connect(time_now())
88 , m_became_uninterested(time_now())
89 , m_became_uninteresting(time_now())
91 , m_downloaded_at_last_unchoke(0)
92 , m_disk_recv_buffer(ses
, 0)
97 , m_timeout(m_ses
.settings().peer_timeout
)
100 , m_disk_recv_buffer_size(0)
102 , m_num_invalid_requests(0)
104 , m_upload_limit(bandwidth_limit::inf
)
105 , m_download_limit(bandwidth_limit::inf
)
106 , m_peer_info(peerinfo
)
108 , m_connection_ticket(-1)
109 , m_remote_bytes_dled(0)
110 , m_remote_dl_rate(0)
111 , m_outstanding_writing_bytes(0)
112 , m_download_rate_peak(0)
113 , m_upload_rate_peak(0)
115 , m_prefer_whole_pieces(0)
116 , m_desired_queue_size(2)
117 , m_fast_reconnect(false)
119 , m_peer_interested(false)
120 , m_peer_choked(true)
121 , m_interesting(false)
124 , m_ignore_bandwidth_limits(false)
126 , m_disconnecting(false)
129 , m_request_large_blocks(false)
130 , m_upload_only(false)
132 , m_bitfield_received(false)
134 , m_in_constructor(true)
135 , m_disconnect_started(false)
136 , m_initialized(false)
139 m_channel_state
[upload_channel
] = peer_info::bw_idle
;
140 m_channel_state
[download_channel
] = peer_info::bw_idle
;
142 TORRENT_ASSERT(peerinfo
== 0 || peerinfo
->banned
== false);
143 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
144 std::fill(m_country
, m_country
+ 2, 0);
145 #ifndef TORRENT_DISABLE_GEO_IP
146 if (m_ses
.has_country_db())
148 char const *country
= m_ses
.country_for_ip(m_remote
.address());
151 m_country
[0] = country
[0];
152 m_country
[1] = country
[1];
157 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
158 m_logger
= m_ses
.create_log(m_remote
.address().to_string() + "_"
159 + boost::lexical_cast
<std::string
>(m_remote
.port()), m_ses
.listen_port());
160 (*m_logger
) << "*** OUTGOING CONNECTION\n";
163 piece_failed
= false;
165 #ifndef TORRENT_DISABLE_GEO_IP
166 m_inet_as_name
= m_ses
.as_name_for_ip(m_remote
.address());
169 std::fill(m_peer_id
.begin(), m_peer_id
.end(), 0);
172 // incoming connection
173 peer_connection::peer_connection(
175 , shared_ptr
<socket_type
> s
176 , tcp::endpoint
const& endp
177 , policy::peer
* peerinfo
)
180 m_last_choke(time_now() - hours(1))
184 , m_max_out_request_queue(m_ses
.settings().max_out_request_queue
)
185 , m_last_piece(time_now())
186 , m_last_request(time_now())
187 , m_last_incoming_request(min_time())
188 , m_last_unchoke(min_time())
189 , m_last_receive(time_now())
190 , m_last_sent(time_now())
191 , m_requested(min_time())
192 , m_timeout_extend(0)
193 , m_remote_dl_update(time_now())
194 , m_connect(time_now())
195 , m_became_uninterested(time_now())
196 , m_became_uninteresting(time_now())
198 , m_downloaded_at_last_unchoke(0)
199 , m_disk_recv_buffer(ses
, 0)
203 , m_timeout(m_ses
.settings().peer_timeout
)
206 , m_disk_recv_buffer_size(0)
208 , m_num_invalid_requests(0)
210 , m_upload_limit(bandwidth_limit::inf
)
211 , m_download_limit(bandwidth_limit::inf
)
212 , m_peer_info(peerinfo
)
214 , m_connection_ticket(-1)
215 , m_remote_bytes_dled(0)
216 , m_remote_dl_rate(0)
217 , m_outstanding_writing_bytes(0)
218 , m_download_rate_peak(0)
219 , m_upload_rate_peak(0)
221 , m_prefer_whole_pieces(0)
222 , m_desired_queue_size(2)
223 , m_fast_reconnect(false)
225 , m_peer_interested(false)
226 , m_peer_choked(true)
227 , m_interesting(false)
230 , m_ignore_bandwidth_limits(false)
232 , m_disconnecting(false)
233 , m_connecting(false)
235 , m_request_large_blocks(false)
236 , m_upload_only(false)
238 , m_bitfield_received(false)
240 , m_in_constructor(true)
241 , m_disconnect_started(false)
242 , m_initialized(false)
245 m_channel_state
[upload_channel
] = peer_info::bw_idle
;
246 m_channel_state
[download_channel
] = peer_info::bw_idle
;
248 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
249 std::fill(m_country
, m_country
+ 2, 0);
250 #ifndef TORRENT_DISABLE_GEO_IP
251 if (m_ses
.has_country_db())
253 char const *country
= m_ses
.country_for_ip(m_remote
.address());
256 m_country
[0] = country
[0];
257 m_country
[1] = country
[1];
263 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
265 TORRENT_ASSERT(m_socket
->remote_endpoint(ec
) == m_remote
|| ec
);
266 m_logger
= m_ses
.create_log(remote().address().to_string(ec
) + "_"
267 + boost::lexical_cast
<std::string
>(remote().port()), m_ses
.listen_port());
268 (*m_logger
) << "*** INCOMING CONNECTION\n";
271 #ifndef TORRENT_DISABLE_GEO_IP
272 m_inet_as_name
= m_ses
.as_name_for_ip(m_remote
.address());
275 piece_failed
= false;
277 std::fill(m_peer_id
.begin(), m_peer_id
.end(), 0);
280 bool peer_connection::unchoke_compare(boost::intrusive_ptr
<peer_connection
const> const& p
) const
283 peer_connection
const& rhs
= *p
;
285 // first compare how many bytes they've sent us
286 size_type c1
= m_statistics
.total_payload_download() - m_downloaded_at_last_unchoke
;
287 size_type c2
= rhs
.m_statistics
.total_payload_download() - rhs
.m_downloaded_at_last_unchoke
;
288 if (c1
> c2
) return true;
289 if (c1
< c2
) return false;
291 // if they are equal, compare how much we have uploaded
292 if (m_peer_info
) c1
= m_peer_info
->total_upload();
293 else c1
= m_statistics
.total_payload_upload();
294 if (rhs
.m_peer_info
) c2
= rhs
.m_peer_info
->total_upload();
295 else c2
= rhs
.m_statistics
.total_payload_upload();
300 void peer_connection::reset_choke_counters()
302 m_downloaded_at_last_unchoke
= m_statistics
.total_payload_download();
305 void peer_connection::start()
307 TORRENT_ASSERT(m_peer_info
== 0 || m_peer_info
->connection
== this);
308 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
312 tcp::socket::non_blocking_io
ioc(true);
314 m_socket
->io_control(ioc
, ec
);
317 disconnect(ec
.message().c_str());
320 m_remote
= m_socket
->remote_endpoint(ec
);
323 disconnect(ec
.message().c_str());
326 if (m_remote
.address().is_v4())
327 m_socket
->set_option(type_of_service(m_ses
.settings().peer_tos
), ec
);
329 else if (t
->ready_for_connections())
335 void peer_connection::update_interest()
337 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
340 // if m_have_piece is 0, it means the connections
341 // have not been initialized yet. The interested
342 // flag will be updated once they are.
343 if (m_have_piece
.size() == 0) return;
344 if (!t
->ready_for_connections()) return;
346 bool interested
= false;
347 if (!t
->is_finished())
349 piece_picker
const& p
= t
->picker();
350 int num_pieces
= p
.num_pieces();
351 for (int j
= 0; j
!= num_pieces
; ++j
)
354 && t
->piece_priority(j
) > 0
364 if (!interested
) send_not_interested();
365 else t
->get_policy().peer_is_interesting(*this);
367 // may throw an asio error if socket has disconnected
368 catch (std::exception
&) {}
370 TORRENT_ASSERT(in_handshake() || is_interesting() == interested
);
373 #ifndef TORRENT_DISABLE_EXTENSIONS
374 void peer_connection::add_extension(boost::shared_ptr
<peer_plugin
> ext
)
376 m_extensions
.push_back(ext
);
380 void peer_connection::send_allowed_set()
384 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
387 int num_allowed_pieces
= m_ses
.settings().allowed_fast_set_size
;
388 int num_pieces
= t
->torrent_file().num_pieces();
390 if (num_allowed_pieces
>= num_pieces
)
392 for (int i
= 0; i
< num_pieces
; ++i
)
394 #ifdef TORRENT_VERBOSE_LOGGING
395 (*m_logger
) << time_now_string()
396 << " ==> ALLOWED_FAST [ " << i
<< " ]\n";
399 m_accept_fast
.insert(i
);
405 address
const& addr
= m_remote
.address();
408 address_v4::bytes_type bytes
= addr
.to_v4().to_bytes();
409 x
.assign((char*)&bytes
[0], bytes
.size());
413 address_v6::bytes_type bytes
= addr
.to_v6().to_bytes();
414 x
.assign((char*)&bytes
[0], bytes
.size());
416 x
.append((char*)&t
->torrent_file().info_hash()[0], 20);
418 sha1_hash hash
= hasher(&x
[0], x
.size()).final();
421 char* p
= (char*)&hash
[0];
422 for (int i
= 0; i
< 5; ++i
)
424 int piece
= detail::read_uint32(p
) % num_pieces
;
425 if (m_accept_fast
.find(piece
) == m_accept_fast
.end())
427 #ifdef TORRENT_VERBOSE_LOGGING
428 (*m_logger
) << time_now_string()
429 << " ==> ALLOWED_FAST [ " << piece
<< " ]\n";
431 write_allow_fast(piece
);
432 m_accept_fast
.insert(piece
);
433 if (int(m_accept_fast
.size()) >= num_allowed_pieces
434 || int(m_accept_fast
.size()) == num_pieces
) return;
437 hash
= hasher((char*)&hash
[0], 20).final();
441 void peer_connection::on_metadata_impl()
443 boost::shared_ptr
<torrent
> t
= associated_torrent().lock();
444 m_have_piece
.resize(t
->torrent_file().num_pieces(), m_have_all
);
445 m_num_pieces
= m_have_piece
.count();
446 if (m_num_pieces
== int(m_have_piece
.size()))
448 #ifdef TORRENT_VERBOSE_LOGGING
449 (*m_logger
) << time_now_string()
450 << " *** on_metadata(): THIS IS A SEED ***\n";
452 // if this is a web seed. we don't have a peer_info struct
453 if (m_peer_info
) m_peer_info
->seed
= true;
454 m_upload_only
= true;
457 disconnect_if_redundant();
458 if (m_disconnecting
) return;
461 if (m_disconnecting
) return;
463 if (!t
->is_finished())
464 t
->get_policy().peer_is_interesting(*this);
468 TORRENT_ASSERT(!m_have_all
);
471 if (m_disconnecting
) return;
473 // let the torrent know which pieces the
475 // if we're a seed, we don't keep track of piece availability
476 bool interesting
= false;
479 t
->peer_has(m_have_piece
);
481 for (int i
= 0; i
< (int)m_have_piece
.size(); ++i
)
485 if (!t
->have_piece(i
) && t
->picker().piece_priority(i
) != 0)
491 if (interesting
) t
->get_policy().peer_is_interesting(*this);
492 else if (upload_only()) disconnect("upload to upload connections");
495 void peer_connection::init()
499 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
501 TORRENT_ASSERT(t
->valid_metadata());
502 TORRENT_ASSERT(t
->ready_for_connections());
504 m_have_piece
.resize(t
->torrent_file().num_pieces(), m_have_all
);
506 if (m_have_all
) m_num_pieces
= t
->torrent_file().num_pieces();
508 m_initialized
= true;
510 // now that we have a piece_picker,
511 // update it with this peer's pieces
513 TORRENT_ASSERT(m_num_pieces
== m_have_piece
.count());
515 if (m_num_pieces
== int(m_have_piece
.size()))
517 #ifdef TORRENT_VERBOSE_LOGGING
518 (*m_logger
) << " *** THIS IS A SEED ***\n";
520 // if this is a web seed. we don't have a peer_info struct
521 if (m_peer_info
) m_peer_info
->seed
= true;
522 m_upload_only
= true;
525 if (t
->is_finished()) send_not_interested();
526 else t
->get_policy().peer_is_interesting(*this);
530 // if we're a seed, we don't keep track of piece availability
533 t
->peer_has(m_have_piece
);
534 bool interesting
= false;
535 for (int i
= 0; i
< int(m_have_piece
.size()); ++i
)
539 // if the peer has a piece and we don't, the peer is interesting
540 if (!t
->have_piece(i
)
541 && t
->picker().piece_priority(i
) != 0)
545 if (interesting
) t
->get_policy().peer_is_interesting(*this);
546 else send_not_interested();
554 peer_connection::~peer_connection()
557 TORRENT_ASSERT(!m_in_constructor
);
558 TORRENT_ASSERT(m_disconnecting
);
559 TORRENT_ASSERT(m_disconnect_started
);
561 m_disk_recv_buffer_size
= 0;
563 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
566 (*m_logger
) << time_now_string()
567 << " *** CONNECTION CLOSED\n";
570 TORRENT_ASSERT(!m_ses
.has_peer(this));
572 for (aux::session_impl::torrent_map::const_iterator i
= m_ses
.m_torrents
.begin()
573 , end(m_ses
.m_torrents
.end()); i
!= end
; ++i
)
574 TORRENT_ASSERT(!i
->second
->has_peer(this));
576 TORRENT_ASSERT(m_peer_info
->connection
== 0);
578 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
582 int peer_connection::picker_options() const
585 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
589 if (t
->is_sequential_download())
591 ret
|= piece_picker::sequential
;
593 else if (t
->num_have() < t
->settings().initial_picker_threshold
)
595 // if we have fewer pieces than a certain threshols
596 // don't pick rare pieces, just pick random ones,
597 // and prioritize finishing them
598 ret
|= piece_picker::prioritize_partials
;
602 ret
|= piece_picker::rarest_first
;
607 // snubbed peers should request
608 // the common pieces first, just to make
609 // it more likely for all snubbed peers to
610 // request blocks from the same piece
611 ret
|= piece_picker::reverse
;
614 if (t
->settings().prioritize_partial_pieces
)
615 ret
|= piece_picker::prioritize_partials
;
617 if (on_parole()) ret
|= piece_picker::on_parole
618 | piece_picker::prioritize_partials
;
620 // only one of rarest_first, common_first and sequential can be set.
621 TORRENT_ASSERT(bool(ret
& piece_picker::rarest_first
)
622 + bool(ret
& piece_picker::sequential
) <= 1);
626 void peer_connection::fast_reconnect(bool r
)
628 if (!peer_info_struct() || peer_info_struct()->fast_reconnects
> 1)
630 m_fast_reconnect
= r
;
631 peer_info_struct()->connected
= time_now()
632 - seconds(m_ses
.settings().min_reconnect_time
633 * m_ses
.settings().max_failcount
);
634 ++peer_info_struct()->fast_reconnects
;
637 void peer_connection::announce_piece(int index
)
639 // dont announce during handshake
640 if (in_handshake()) return;
642 // remove suggested pieces that we have
643 std::vector
<int>::iterator i
= std::find(
644 m_suggested_pieces
.begin(), m_suggested_pieces
.end(), index
);
645 if (i
!= m_suggested_pieces
.end()) m_suggested_pieces
.erase(i
);
647 if (has_piece(index
))
649 // if we got a piece that this peer has
650 // it might have been the last interesting
651 // piece this peer had. We might not be
652 // interested anymore
654 if (is_disconnecting()) return;
656 // optimization, don't send have messages
657 // to peers that already have the piece
658 if (!m_ses
.settings().send_redundant_have
)
660 #ifdef TORRENT_VERBOSE_LOGGING
661 (*m_logger
) << time_now_string()
662 << " ==> HAVE [ piece: " << index
<< " ] SUPRESSED\n";
668 #ifdef TORRENT_VERBOSE_LOGGING
669 (*m_logger
) << time_now_string()
670 << " ==> HAVE [ piece: " << index
<< "]\n";
674 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
676 TORRENT_ASSERT(t
->have_piece(index
));
680 bool peer_connection::has_piece(int i
) const
682 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
684 TORRENT_ASSERT(t
->valid_metadata());
685 TORRENT_ASSERT(i
>= 0);
686 TORRENT_ASSERT(i
< t
->torrent_file().num_pieces());
687 return m_have_piece
[i
];
690 std::deque
<piece_block
> const& peer_connection::request_queue() const
692 return m_request_queue
;
695 std::deque
<pending_block
> const& peer_connection::download_queue() const
697 return m_download_queue
;
700 std::deque
<peer_request
> const& peer_connection::upload_queue() const
705 void peer_connection::add_stat(size_type downloaded
, size_type uploaded
)
707 m_statistics
.add_stat(downloaded
, uploaded
);
710 bitfield
const& peer_connection::get_bitfield() const
715 void peer_connection::received_valid_data(int index
)
719 #ifndef TORRENT_DISABLE_EXTENSIONS
720 for (extension_list_t::iterator i
= m_extensions
.begin()
721 , end(m_extensions
.end()); i
!= end
; ++i
)
723 #ifdef BOOST_NO_EXCEPTIONS
724 (*i
)->on_piece_pass(index
);
726 try { (*i
)->on_piece_pass(index
); } catch (std::exception
&) {}
732 void peer_connection::received_invalid_data(int index
)
736 #ifndef TORRENT_DISABLE_EXTENSIONS
737 for (extension_list_t::iterator i
= m_extensions
.begin()
738 , end(m_extensions
.end()); i
!= end
; ++i
)
740 #ifdef BOOST_NO_EXCEPTIONS
741 (*i
)->on_piece_failed(index
);
743 try { (*i
)->on_piece_failed(index
); } catch (std::exception
&) {}
747 if (is_disconnecting()) return;
749 if (peer_info_struct())
751 if (m_ses
.settings().use_parole_mode
)
752 peer_info_struct()->on_parole
= true;
754 ++peer_info_struct()->hashfails
;
755 boost::int8_t& trust_points
= peer_info_struct()->trust_points
;
757 // we decrease more than we increase, to keep the
758 // allowed failed/passed ratio low.
759 // TODO: make this limit user settable
761 if (trust_points
< -7) trust_points
= -7;
765 size_type
peer_connection::total_free_upload() const
767 return m_free_upload
;
770 void peer_connection::add_free_upload(size_type free_upload
)
774 m_free_upload
+= free_upload
;
777 // verifies a piece to see if it is valid (is within a valid range)
778 // and if it can correspond to a request generated by libtorrent.
779 bool peer_connection::verify_piece(const peer_request
& p
) const
783 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
786 TORRENT_ASSERT(t
->valid_metadata());
787 torrent_info
const& ti
= t
->torrent_file();
790 && p
.piece
< t
->torrent_file().num_pieces()
793 && (p
.length
== t
->block_size()
794 || (p
.length
< t
->block_size()
795 && p
.piece
== ti
.num_pieces()-1
796 && p
.start
+ p
.length
== ti
.piece_size(p
.piece
))
797 || (m_request_large_blocks
798 && p
.length
<= ti
.piece_length() * m_prefer_whole_pieces
== 0 ?
799 1 : m_prefer_whole_pieces
))
800 && p
.piece
* size_type(ti
.piece_length()) + p
.start
+ p
.length
802 && (p
.start
% t
->block_size() == 0);
805 void peer_connection::attach_to_torrent(sha1_hash
const& ih
)
809 TORRENT_ASSERT(!m_disconnecting
);
810 TORRENT_ASSERT(m_torrent
.expired());
811 boost::weak_ptr
<torrent
> wpt
= m_ses
.find_torrent(ih
);
812 boost::shared_ptr
<torrent
> t
= wpt
.lock();
814 if (t
&& t
->is_aborted())
816 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
817 (*m_logger
) << " *** the torrent has been aborted\n";
824 // we couldn't find the torrent!
825 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
826 (*m_logger
) << " *** couldn't find a torrent with the given info_hash: " << ih
<< "\n";
827 (*m_logger
) << " torrents:\n";
828 session_impl::torrent_map
const& torrents
= m_ses
.m_torrents
;
829 for (session_impl::torrent_map::const_iterator i
= torrents
.begin()
830 , end(torrents
.end()); i
!= end
; ++i
)
832 (*m_logger
) << " " << i
->second
->torrent_file().info_hash() << "\n";
835 disconnect("got invalid info-hash", 2);
841 // paused torrents will not accept
842 // incoming connections
843 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
844 (*m_logger
) << " rejected connection to paused torrent\n";
846 disconnect("connection rejected bacause torrent is paused");
850 TORRENT_ASSERT(m_torrent
.expired());
851 // check to make sure we don't have another connection with the same
852 // info_hash and peer_id. If we do. close this connection.
857 t
->attach_peer(this);
860 catch (std::exception
& e
)
862 std::cout
<< e
.what() << std::endl
;
863 TORRENT_ASSERT(false);
866 if (m_disconnecting
) return;
869 TORRENT_ASSERT(!m_torrent
.expired());
871 // if the torrent isn't ready to accept
872 // connections yet, we'll have to wait with
873 // our initialization
874 if (t
->ready_for_connections()) init();
876 TORRENT_ASSERT(!m_torrent
.expired());
878 // assume the other end has no pieces
879 // if we don't have valid metadata yet,
880 // leave the vector unallocated
881 TORRENT_ASSERT(m_num_pieces
== 0);
882 m_have_piece
.clear_all();
883 TORRENT_ASSERT(!m_torrent
.expired());
888 // -----------------------------
889 // --------- KEEPALIVE ---------
890 // -----------------------------
892 void peer_connection::incoming_keepalive()
896 #ifdef TORRENT_VERBOSE_LOGGING
897 (*m_logger
) << time_now_string() << " <== KEEPALIVE\n";
901 // -----------------------------
902 // ----------- CHOKE -----------
903 // -----------------------------
905 void peer_connection::incoming_choke()
909 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
912 #ifndef TORRENT_DISABLE_EXTENSIONS
913 for (extension_list_t::iterator i
= m_extensions
.begin()
914 , end(m_extensions
.end()); i
!= end
; ++i
)
916 if ((*i
)->on_choke()) return;
919 if (is_disconnecting()) return;
921 #ifdef TORRENT_VERBOSE_LOGGING
922 (*m_logger
) << time_now_string() << " <== CHOKE\n";
924 m_peer_choked
= true;
926 if (peer_info_struct() == 0 || !peer_info_struct()->on_parole
)
928 // if the peer is not in parole mode, clear the queued
932 piece_picker
& p
= t
->picker();
933 for (std::deque
<piece_block
>::const_iterator i
= m_request_queue
.begin()
934 , end(m_request_queue
.end()); i
!= end
; ++i
)
936 // since this piece was skipped, clear it and allow it to
937 // be requested from other peers
938 p
.abort_download(*i
);
941 m_request_queue
.clear();
945 bool match_request(peer_request
const& r
, piece_block
const& b
, int block_size
)
947 if (b
.piece_index
!= r
.piece
) return false;
948 if (b
.block_index
!= r
.start
/ block_size
) return false;
949 if (r
.start
% block_size
!= 0) return false;
953 // -----------------------------
954 // -------- REJECT PIECE -------
955 // -----------------------------
957 void peer_connection::incoming_reject_request(peer_request
const& r
)
961 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
964 #ifndef TORRENT_DISABLE_EXTENSIONS
965 for (extension_list_t::iterator i
= m_extensions
.begin()
966 , end(m_extensions
.end()); i
!= end
; ++i
)
968 if ((*i
)->on_reject(r
)) return;
972 if (is_disconnecting()) return;
974 std::deque
<pending_block
>::iterator i
= std::find_if(
975 m_download_queue
.begin(), m_download_queue
.end()
976 , bind(match_request
, boost::cref(r
), bind(&pending_block::block
, _1
)
979 #ifdef TORRENT_VERBOSE_LOGGING
980 (*m_logger
) << time_now_string()
981 << " <== REJECT_PIECE [ piece: " << r
.piece
<< " | s: " << r
.start
<< " | l: " << r
.length
<< " ]\n";
984 piece_block
b(-1, 0);
985 if (i
!= m_download_queue
.end())
988 m_download_queue
.erase(i
);
990 // if the peer is in parole mode, keep the request
991 if (peer_info_struct() && peer_info_struct()->on_parole
)
993 m_request_queue
.push_front(b
);
995 else if (!t
->is_seed())
997 piece_picker
& p
= t
->picker();
1001 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1004 (*m_logger
) << time_now_string()
1005 << " *** PIECE NOT IN REQUEST QUEUE\n";
1008 if (has_peer_choked())
1010 // if we're choked and we got a rejection of
1011 // a piece in the allowed fast set, remove it
1012 // from the allow fast set.
1013 std::vector
<int>::iterator i
= std::find(
1014 m_allowed_fast
.begin(), m_allowed_fast
.end(), r
.piece
);
1015 if (i
!= m_allowed_fast
.end()) m_allowed_fast
.erase(i
);
1019 std::vector
<int>::iterator i
= std::find(m_suggested_pieces
.begin()
1020 , m_suggested_pieces
.end(), r
.piece
);
1021 if (i
!= m_suggested_pieces
.end())
1022 m_suggested_pieces
.erase(i
);
1025 if (m_request_queue
.empty() && m_download_queue
.size() < 2)
1027 request_a_block(*t
, *this);
1028 send_block_requests();
1032 // -----------------------------
1033 // ------- SUGGEST PIECE -------
1034 // -----------------------------
1036 void peer_connection::incoming_suggest(int index
)
1040 #ifdef TORRENT_VERBOSE_LOGGING
1041 (*m_logger
) << time_now_string()
1042 << " <== SUGGEST_PIECE [ piece: " << index
<< " ]\n";
1044 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1047 #ifndef TORRENT_DISABLE_EXTENSIONS
1048 for (extension_list_t::iterator i
= m_extensions
.begin()
1049 , end(m_extensions
.end()); i
!= end
; ++i
)
1051 if ((*i
)->on_suggest(index
)) return;
1055 if (is_disconnecting()) return;
1056 if (t
->have_piece(index
)) return;
1058 if (m_suggested_pieces
.size() > 9)
1059 m_suggested_pieces
.erase(m_suggested_pieces
.begin());
1060 m_suggested_pieces
.push_back(index
);
1062 #ifdef TORRENT_VERBOSE_LOGGING
1063 (*m_logger
) << time_now_string()
1064 << " ** SUGGEST_PIECE [ piece: " << index
<< " added to set: " << m_suggested_pieces
.size() << " ]\n";
1068 // -----------------------------
1069 // ---------- UNCHOKE ----------
1070 // -----------------------------
1072 void peer_connection::incoming_unchoke()
1076 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1079 #ifndef TORRENT_DISABLE_EXTENSIONS
1080 for (extension_list_t::iterator i
= m_extensions
.begin()
1081 , end(m_extensions
.end()); i
!= end
; ++i
)
1083 if ((*i
)->on_unchoke()) return;
1087 #ifdef TORRENT_VERBOSE_LOGGING
1088 (*m_logger
) << time_now_string() << " <== UNCHOKE\n";
1090 m_peer_choked
= false;
1091 if (is_disconnecting()) return;
1093 t
->get_policy().unchoked(*this);
1096 // -----------------------------
1097 // -------- INTERESTED ---------
1098 // -----------------------------
1100 void peer_connection::incoming_interested()
1104 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1107 #ifndef TORRENT_DISABLE_EXTENSIONS
1108 for (extension_list_t::iterator i
= m_extensions
.begin()
1109 , end(m_extensions
.end()); i
!= end
; ++i
)
1111 if ((*i
)->on_interested()) return;
1115 #ifdef TORRENT_VERBOSE_LOGGING
1116 (*m_logger
) << time_now_string() << " <== INTERESTED\n";
1118 m_peer_interested
= true;
1119 if (is_disconnecting()) return;
1120 t
->get_policy().interested(*this);
1123 // -----------------------------
1124 // ------ NOT INTERESTED -------
1125 // -----------------------------
1127 void peer_connection::incoming_not_interested()
1131 #ifndef TORRENT_DISABLE_EXTENSIONS
1132 for (extension_list_t::iterator i
= m_extensions
.begin()
1133 , end(m_extensions
.end()); i
!= end
; ++i
)
1135 if ((*i
)->on_not_interested()) return;
1139 m_became_uninterested
= time_now();
1141 #ifdef TORRENT_VERBOSE_LOGGING
1142 (*m_logger
) << time_now_string() << " <== NOT_INTERESTED\n";
1144 m_peer_interested
= false;
1145 if (is_disconnecting()) return;
1147 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1150 t
->get_policy().not_interested(*this);
1153 // -----------------------------
1154 // ----------- HAVE ------------
1155 // -----------------------------
1157 void peer_connection::incoming_have(int index
)
1161 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1164 #ifndef TORRENT_DISABLE_EXTENSIONS
1165 for (extension_list_t::iterator i
= m_extensions
.begin()
1166 , end(m_extensions
.end()); i
!= end
; ++i
)
1168 if ((*i
)->on_have(index
)) return;
1172 if (is_disconnecting()) return;
1174 // if we haven't received a bitfield, it was
1175 // probably omitted, which is the same as 'have_none'
1176 if (!m_bitfield_received
) incoming_have_none();
1178 #ifdef TORRENT_VERBOSE_LOGGING
1179 (*m_logger
) << time_now_string()
1180 << " <== HAVE [ piece: " << index
<< "]\n";
1183 if (is_disconnecting()) return;
1185 if (!t
->valid_metadata() && index
> int(m_have_piece
.size()))
1189 // if we don't have metadata
1190 // and we might not have received a bitfield
1191 // extend the bitmask to fit the new
1193 m_have_piece
.resize(index
+ 1, false);
1197 // unless the index > 64k, in which case
1198 // we just ignore it
1203 // if we got an invalid message, abort
1204 if (index
>= int(m_have_piece
.size()) || index
< 0)
1206 disconnect("got 'have'-message with higher index than the number of pieces", 2);
1210 if (m_have_piece
[index
])
1212 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1213 (*m_logger
) << " got redundant HAVE message for index: " << index
<< "\n";
1218 m_have_piece
.set_bit(index
);
1221 // only update the piece_picker if
1222 // we have the metadata and if
1223 // we're not a seed (in which case
1224 // we won't have a piece picker)
1225 if (t
->valid_metadata())
1229 if (!t
->have_piece(index
)
1231 && !is_interesting()
1232 && t
->picker().piece_priority(index
) != 0)
1233 t
->get_policy().peer_is_interesting(*this);
1235 // this will disregard all have messages we get within
1236 // the first two seconds. Since some clients implements
1237 // lazy bitfields, these will not be reliable to use
1238 // for an estimated peer download rate.
1239 if (!peer_info_struct() || time_now() - peer_info_struct()->connected
> seconds(2))
1241 // update bytes downloaded since last timer
1242 m_remote_bytes_dled
+= t
->torrent_file().piece_size(index
);
1248 m_peer_info
->seed
= true;
1249 m_upload_only
= true;
1250 disconnect_if_redundant();
1251 if (is_disconnecting()) return;
1256 // -----------------------------
1257 // --------- BITFIELD ----------
1258 // -----------------------------
1260 void peer_connection::incoming_bitfield(bitfield
const& bits
)
1264 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1267 #ifndef TORRENT_DISABLE_EXTENSIONS
1268 for (extension_list_t::iterator i
= m_extensions
.begin()
1269 , end(m_extensions
.end()); i
!= end
; ++i
)
1271 if ((*i
)->on_bitfield(bits
)) return;
1275 if (is_disconnecting()) return;
1277 #ifdef TORRENT_VERBOSE_LOGGING
1278 (*m_logger
) << time_now_string() << " <== BITFIELD ";
1280 for (int i
= 0; i
< int(bits
.size()); ++i
)
1282 if (bits
[i
]) (*m_logger
) << "1";
1283 else (*m_logger
) << "0";
1285 (*m_logger
) << "\n";
1288 // if we don't have the metedata, we cannot
1289 // verify the bitfield size
1290 if (t
->valid_metadata()
1291 && (bits
.size() + 7) / 8 != (m_have_piece
.size() + 7) / 8)
1293 std::stringstream msg
;
1294 msg
<< "got bitfield with invalid size: " << ((bits
.size() + 7) / 8)
1295 << "bytes. expected: " << ((m_have_piece
.size() + 7) / 8)
1297 disconnect(msg
.str().c_str(), 2);
1301 m_bitfield_received
= true;
1303 // if we don't have metadata yet
1304 // just remember the bitmask
1305 // don't update the piecepicker
1306 // (since it doesn't exist yet)
1307 if (!t
->ready_for_connections())
1309 m_have_piece
= bits
;
1310 m_num_pieces
= bits
.count();
1311 if (m_peer_info
) m_peer_info
->seed
= (m_num_pieces
== int(bits
.size()));
1315 TORRENT_ASSERT(t
->valid_metadata());
1317 int num_pieces
= bits
.count();
1318 if (num_pieces
== int(m_have_piece
.size()))
1320 #ifdef TORRENT_VERBOSE_LOGGING
1321 (*m_logger
) << " *** THIS IS A SEED ***\n";
1323 // if this is a web seed. we don't have a peer_info struct
1324 if (m_peer_info
) m_peer_info
->seed
= true;
1325 m_upload_only
= true;
1327 m_have_piece
.set_all();
1328 m_num_pieces
= num_pieces
;
1330 if (!t
->is_finished())
1331 t
->get_policy().peer_is_interesting(*this);
1333 disconnect_if_redundant();
1338 // let the torrent know which pieces the
1340 // if we're a seed, we don't keep track of piece availability
1341 bool interesting
= false;
1346 for (int i
= 0; i
< (int)m_have_piece
.size(); ++i
)
1348 bool have
= bits
[i
];
1349 if (have
&& !m_have_piece
[i
])
1351 if (!t
->have_piece(i
) && t
->picker().piece_priority(i
) != 0)
1354 else if (!have
&& m_have_piece
[i
])
1356 // this should probably not be allowed
1362 m_have_piece
= bits
;
1363 m_num_pieces
= num_pieces
;
1365 if (interesting
) t
->get_policy().peer_is_interesting(*this);
1366 else if (upload_only()) disconnect("upload to upload connections");
1369 void peer_connection::disconnect_if_redundant()
1371 if (!m_ses
.settings().close_redundant_connections
) return;
1373 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1375 if (m_upload_only
&& t
->is_finished())
1376 disconnect("seed to seed");
1380 && m_bitfield_received
1381 && t
->are_files_checked())
1382 disconnect("uninteresting upload-only peer");
1385 // -----------------------------
1386 // ---------- REQUEST ----------
1387 // -----------------------------
1389 void peer_connection::incoming_request(peer_request
const& r
)
1393 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1396 // if we haven't received a bitfield, it was
1397 // probably omitted, which is the same as 'have_none'
1398 if (!m_bitfield_received
) incoming_have_none();
1399 if (is_disconnecting()) return;
1401 #ifndef TORRENT_DISABLE_EXTENSIONS
1402 for (extension_list_t::iterator i
= m_extensions
.begin()
1403 , end(m_extensions
.end()); i
!= end
; ++i
)
1405 if ((*i
)->on_request(r
)) return;
1408 if (is_disconnecting()) return;
1410 if (!t
->valid_metadata())
1412 // if we don't have valid metadata yet,
1413 // we shouldn't get a request
1414 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1415 (*m_logger
) << time_now_string()
1416 << " <== UNEXPECTED_REQUEST [ "
1417 "piece: " << r
.piece
<< " | "
1418 "s: " << r
.start
<< " | "
1419 "l: " << r
.length
<< " | "
1420 "i: " << m_peer_interested
<< " | "
1421 "t: " << t
->torrent_file().piece_size(r
.piece
) << " | "
1422 "n: " << t
->torrent_file().num_pieces() << " ]\n";
1424 (*m_logger
) << time_now_string()
1425 << " ==> REJECT_PIECE [ "
1426 "piece: " << r
.piece
<< " | "
1427 "s: " << r
.start
<< " | "
1428 "l: " << r
.length
<< " ]\n";
1430 write_reject_request(r
);
1434 if (int(m_requests
.size()) > m_ses
.settings().max_allowed_in_request_queue
)
1436 // don't allow clients to abuse our
1437 // memory consumption.
1438 // ignore requests if the client
1439 // is making too many of them.
1440 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1441 (*m_logger
) << time_now_string()
1442 << " <== TOO MANY REQUESTS [ "
1443 "piece: " << r
.piece
<< " | "
1444 "s: " << r
.start
<< " | "
1445 "l: " << r
.length
<< " | "
1446 "i: " << m_peer_interested
<< " | "
1447 "t: " << t
->torrent_file().piece_size(r
.piece
) << " | "
1448 "n: " << t
->torrent_file().num_pieces() << " ]\n";
1450 (*m_logger
) << time_now_string()
1451 << " ==> REJECT_PIECE [ "
1452 "piece: " << r
.piece
<< " | "
1453 "s: " << r
.start
<< " | "
1454 "l: " << r
.length
<< " ]\n";
1456 write_reject_request(r
);
1460 // make sure this request
1461 // is legal and that the peer
1464 && r
.piece
< t
->torrent_file().num_pieces()
1465 && t
->have_piece(r
.piece
)
1467 && r
.start
< t
->torrent_file().piece_size(r
.piece
)
1469 && r
.length
+ r
.start
<= t
->torrent_file().piece_size(r
.piece
)
1470 && m_peer_interested
1471 && r
.length
<= t
->block_size())
1473 #ifdef TORRENT_VERBOSE_LOGGING
1474 (*m_logger
) << time_now_string()
1475 << " <== REQUEST [ piece: " << r
.piece
<< " | s: " << r
.start
<< " | l: " << r
.length
<< " ]\n";
1477 // if we have choked the client
1478 // ignore the request
1479 if (m_choked
&& m_accept_fast
.find(r
.piece
) == m_accept_fast
.end())
1481 write_reject_request(r
);
1482 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1483 (*m_logger
) << time_now_string()
1484 << " *** REJECTING REQUEST [ peer choked and piece not in allowed fast set ]\n";
1485 (*m_logger
) << time_now_string()
1486 << " ==> REJECT_PIECE [ "
1487 "piece: " << r
.piece
<< " | "
1488 "s: " << r
.start
<< " | "
1489 "l: " << r
.length
<< " ]\n";
1494 m_requests
.push_back(r
);
1495 m_last_incoming_request
= time_now();
1501 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1502 (*m_logger
) << time_now_string()
1503 << " <== INVALID_REQUEST [ "
1504 "piece: " << r
.piece
<< " | "
1505 "s: " << r
.start
<< " | "
1506 "l: " << r
.length
<< " | "
1507 "i: " << m_peer_interested
<< " | "
1508 "t: " << t
->torrent_file().piece_size(r
.piece
) << " | "
1509 "n: " << t
->torrent_file().num_pieces() << " | "
1510 "h: " << t
->have_piece(r
.piece
) << " | "
1511 "block_limit: " << t
->block_size() << " ]\n";
1513 (*m_logger
) << time_now_string()
1514 << " ==> REJECT_PIECE [ "
1515 "piece: " << r
.piece
<< " | "
1516 "s: " << r
.start
<< " | "
1517 "l: " << r
.length
<< " ]\n";
1520 write_reject_request(r
);
1521 ++m_num_invalid_requests
;
1523 if (t
->alerts().should_post
<invalid_request_alert
>())
1525 t
->alerts().post_alert(invalid_request_alert(
1526 t
->get_handle(), m_remote
, m_peer_id
, r
));
1531 void peer_connection::incoming_piece_fragment()
1533 m_last_piece
= time_now();
1537 struct check_postcondition
1539 check_postcondition(boost::shared_ptr
<torrent
> const& t_
1540 , bool init_check
= true): t(t_
) { if (init_check
) check(); }
1542 ~check_postcondition() { check(); }
1548 const int blocks_per_piece
= static_cast<int>(
1549 t
->torrent_file().piece_length() / t
->block_size());
1551 std::vector
<piece_picker::downloading_piece
> const& dl_queue
1552 = t
->picker().get_download_queue();
1554 for (std::vector
<piece_picker::downloading_piece
>::const_iterator i
=
1555 dl_queue
.begin(); i
!= dl_queue
.end(); ++i
)
1557 TORRENT_ASSERT(i
->finished
<= blocks_per_piece
);
1562 shared_ptr
<torrent
> t
;
1567 // -----------------------------
1568 // ----------- PIECE -----------
1569 // -----------------------------
1571 void peer_connection::incoming_piece(peer_request
const& p
, char const* data
)
1573 char* buffer
= m_ses
.allocate_disk_buffer();
1576 disconnect("out of memory");
1579 disk_buffer_holder
holder(m_ses
, buffer
);
1580 std::memcpy(buffer
, data
, p
.length
);
1581 incoming_piece(p
, holder
);
1584 void peer_connection::incoming_piece(peer_request
const& p
, disk_buffer_holder
& data
)
1588 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1591 TORRENT_ASSERT(!m_disk_recv_buffer
);
1592 TORRENT_ASSERT(m_disk_recv_buffer_size
== 0);
1594 #ifdef TORRENT_CORRUPT_DATA
1595 // corrupt all pieces from certain peers
1596 if (m_remote
.address().is_v4()
1597 && (m_remote
.address().to_v4().to_ulong() & 0xf) == 0)
1599 data
.get()[0] = ~data
.get()[0];
1603 // if we haven't received a bitfield, it was
1604 // probably omitted, which is the same as 'have_none'
1605 if (!m_bitfield_received
) incoming_have_none();
1606 if (is_disconnecting()) return;
1608 #ifndef TORRENT_DISABLE_EXTENSIONS
1609 for (extension_list_t::iterator i
= m_extensions
.begin()
1610 , end(m_extensions
.end()); i
!= end
; ++i
)
1612 if ((*i
)->on_piece(p
, data
)) return;
1615 if (is_disconnecting()) return;
1618 check_postcondition
post_checker_(t
);
1619 #if !defined TORRENT_DISABLE_INVARIANT_CHECKS
1620 t
->check_invariant();
1624 #ifdef TORRENT_VERBOSE_LOGGING
1625 (*m_logger
) << time_now_string()
1626 << " <== PIECE [ piece: " << p
.piece
<< " | "
1627 "s: " << p
.start
<< " | "
1628 "l: " << p
.length
<< " | "
1629 "ds: " << statistics().download_rate() << " | "
1630 "qs: " << int(m_desired_queue_size
) << " ]\n";
1635 if (t
->alerts().should_post
<peer_error_alert
>())
1637 t
->alerts().post_alert(peer_error_alert(t
->get_handle(), m_remote
1638 , m_peer_id
, "peer sent 0 length piece"));
1643 if (!verify_piece(p
))
1645 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1646 (*m_logger
) << time_now_string()
1647 << " <== INVALID_PIECE [ piece: " << p
.piece
<< " | "
1648 "start: " << p
.start
<< " | "
1649 "length: " << p
.length
<< " ]\n";
1651 disconnect("got invalid piece packet", 2);
1655 // if we're already seeding, don't bother,
1659 t
->add_redundant_bytes(p
.length
);
1663 ptime now
= time_now();
1665 piece_picker
& picker
= t
->picker();
1666 piece_manager
& fs
= t
->filesystem();
1668 std::vector
<piece_block
> finished_blocks
;
1669 piece_block
block_finished(p
.piece
, p
.start
/ t
->block_size());
1670 TORRENT_ASSERT(p
.start
% t
->block_size() == 0);
1671 TORRENT_ASSERT(p
.length
== t
->block_size()
1672 || p
.length
== t
->torrent_file().total_size() % t
->block_size());
1674 std::deque
<pending_block
>::iterator b
1676 m_download_queue
.begin()
1677 , m_download_queue
.end()
1678 , has_block(block_finished
));
1680 if (b
== m_download_queue
.end())
1682 if (t
->alerts().should_post
<unwanted_block_alert
>())
1684 t
->alerts().post_alert(unwanted_block_alert(t
->get_handle(), m_remote
1685 , m_peer_id
, block_finished
.block_index
, block_finished
.piece_index
));
1687 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1688 (*m_logger
) << " *** The block we just got was not in the "
1689 "request queue ***\n";
1691 t
->add_redundant_bytes(p
.length
);
1692 request_a_block(*t
, *this);
1693 send_block_requests();
1697 pending_block pending_b
= *b
;
1700 int block_index
= b
- m_download_queue
.begin() - 1;
1701 for (int i
= 0; i
< block_index
; ++i
)
1703 pending_block
& qe
= m_download_queue
[i
];
1705 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1706 (*m_logger
) << time_now_string()
1707 << " *** SKIPPED_PIECE [ piece: " << qe
.block
.piece_index
<< " | "
1708 "b: " << qe
.block
.block_index
<< " ] ***\n";
1712 // if the number of times a block is skipped by out of order
1713 // blocks exceeds the size of the outstanding queue, assume that
1714 // the other end dropped the request.
1715 if (qe
.skipped
> m_desired_queue_size
)
1717 if (m_ses
.m_alerts
.should_post
<request_dropped_alert
>())
1718 m_ses
.m_alerts
.post_alert(request_dropped_alert(t
->get_handle()
1719 , remote(), pid(), qe
.block
.block_index
, qe
.block
.piece_index
));
1720 picker
.abort_download(qe
.block
);
1721 TORRENT_ASSERT(m_download_queue
.begin() + i
!= b
);
1722 m_download_queue
.erase(m_download_queue
.begin() + i
);
1727 TORRENT_ASSERT(int(m_download_queue
.size()) > block_index
+ 1);
1728 b
= m_download_queue
.begin() + (block_index
+ 1);
1729 TORRENT_ASSERT(b
->block
== pending_b
.block
);
1731 // if the block we got is already finished, then ignore it
1732 if (picker
.is_downloaded(block_finished
))
1734 t
->add_redundant_bytes(p
.length
);
1736 m_download_queue
.erase(b
);
1737 m_timeout_extend
= 0;
1739 if (!m_download_queue
.empty())
1742 request_a_block(*t
, *this);
1743 send_block_requests();
1747 if (total_seconds(now
- m_requested
)
1748 < m_ses
.settings().request_timeout
1752 if (m_ses
.m_alerts
.should_post
<peer_unsnubbed_alert
>())
1754 m_ses
.m_alerts
.post_alert(peer_unsnubbed_alert(t
->get_handle()
1755 , m_remote
, m_peer_id
));
1759 fs
.async_write(p
, data
, bind(&peer_connection::on_disk_write_complete
1760 , self(), _1
, _2
, p
, t
));
1761 m_outstanding_writing_bytes
+= p
.length
;
1762 TORRENT_ASSERT(m_channel_state
[download_channel
] == peer_info::bw_idle
);
1763 m_download_queue
.erase(b
);
1765 if (m_outstanding_writing_bytes
>= m_ses
.settings().max_outstanding_disk_bytes_per_connection
1766 && t
->alerts().should_post
<performance_alert
>())
1768 t
->alerts().post_alert(performance_alert(t
->get_handle()
1769 , performance_alert::outstanding_disk_buffer_limit_reached
));
1772 if (!m_download_queue
.empty())
1774 m_timeout_extend
= (std::max
)(m_timeout_extend
1775 - m_ses
.settings().request_timeout
, 0);
1776 m_requested
+= seconds(m_ses
.settings().request_timeout
);
1777 if (m_requested
> now
) m_requested
= now
;
1781 m_timeout_extend
= 0;
1784 // did we request this block from any other peers?
1785 bool multi
= picker
.num_peers(block_finished
) > 1;
1786 picker
.mark_as_writing(block_finished
, peer_info_struct());
1788 TORRENT_ASSERT(picker
.num_peers(block_finished
) == 0);
1789 // if we requested this block from other peers, cancel it now
1790 if (multi
) t
->cancel_block(block_finished
);
1792 TORRENT_ASSERT(picker
.num_peers(block_finished
) == 0);
1794 #if !defined NDEBUG && !defined TORRENT_DISABLE_INVARIANT_CHECKS
1795 t
->check_invariant();
1797 request_a_block(*t
, *this);
1798 send_block_requests();
1801 void peer_connection::on_disk_write_complete(int ret
, disk_io_job
const& j
1802 , peer_request p
, boost::shared_ptr
<torrent
> t
)
1804 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
1808 m_outstanding_writing_bytes
-= p
.length
;
1809 TORRENT_ASSERT(m_outstanding_writing_bytes
>= 0);
1811 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
1812 // (*m_ses.m_logger) << time_now_string() << " *** DISK_WRITE_COMPLETE [ p: "
1813 // << p.piece << " o: " << p.start << " ]\n";
1815 // in case the outstanding bytes just dropped down
1816 // to allow to receive more data
1819 piece_block
block_finished(p
.piece
, p
.start
/ t
->block_size());
1821 if (ret
== -1 || !t
)
1823 if (t
->has_picker()) t
->picker().write_failed(block_finished
);
1827 disconnect(j
.str
.c_str());
1831 if (t
->alerts().should_post
<file_error_alert
>())
1832 t
->alerts().post_alert(file_error_alert(j
.error_file
, t
->get_handle(), j
.str
));
1833 t
->set_error(j
.str
);
1838 if (t
->is_seed()) return;
1840 piece_picker
& picker
= t
->picker();
1842 TORRENT_ASSERT(p
.piece
== j
.piece
);
1843 TORRENT_ASSERT(p
.start
== j
.offset
);
1844 TORRENT_ASSERT(picker
.num_peers(block_finished
) == 0);
1845 picker
.mark_as_finished(block_finished
, peer_info_struct());
1846 if (t
->alerts().should_post
<block_finished_alert
>())
1848 t
->alerts().post_alert(block_finished_alert(t
->get_handle(),
1849 remote(), pid(), block_finished
.block_index
, block_finished
.piece_index
));
1852 // did we just finish the piece?
1853 if (picker
.is_piece_finished(p
.piece
))
1856 check_postcondition
post_checker2_(t
, false);
1858 t
->async_verify_piece(p
.piece
, bind(&torrent::piece_finished
, t
1862 if (!t
->is_seed() && !m_torrent
.expired())
1864 // this is a free function defined in policy.cpp
1865 request_a_block(*t
, *this);
1866 send_block_requests();
1871 // -----------------------------
1872 // ---------- CANCEL -----------
1873 // -----------------------------
1875 void peer_connection::incoming_cancel(peer_request
const& r
)
1879 #ifndef TORRENT_DISABLE_EXTENSIONS
1880 for (extension_list_t::iterator i
= m_extensions
.begin()
1881 , end(m_extensions
.end()); i
!= end
; ++i
)
1883 if ((*i
)->on_cancel(r
)) return;
1886 if (is_disconnecting()) return;
1888 #ifdef TORRENT_VERBOSE_LOGGING
1889 (*m_logger
) << time_now_string()
1890 << " <== CANCEL [ piece: " << r
.piece
<< " | s: " << r
.start
<< " | l: " << r
.length
<< " ]\n";
1893 std::deque
<peer_request
>::iterator i
1894 = std::find(m_requests
.begin(), m_requests
.end(), r
);
1896 if (i
!= m_requests
.end())
1898 m_requests
.erase(i
);
1899 #ifdef TORRENT_VERBOSE_LOGGING
1900 (*m_logger
) << time_now_string()
1901 << " ==> REJECT_PIECE [ "
1902 "piece: " << r
.piece
<< " | "
1903 "s: " << r
.start
<< " | "
1904 "l: " << r
.length
<< " ]\n";
1906 write_reject_request(r
);
1910 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1911 (*m_logger
) << time_now_string() << " *** GOT CANCEL NOT IN THE QUEUE\n";
1916 // -----------------------------
1917 // --------- DHT PORT ----------
1918 // -----------------------------
1920 void peer_connection::incoming_dht_port(int listen_port
)
1924 #ifdef TORRENT_VERBOSE_LOGGING
1925 (*m_logger
) << time_now_string()
1926 << " <== DHT_PORT [ p: " << listen_port
<< " ]\n";
1928 #ifndef TORRENT_DISABLE_DHT
1929 m_ses
.add_dht_node(udp::endpoint(
1930 m_remote
.address(), listen_port
));
1934 // -----------------------------
1935 // --------- HAVE ALL ----------
1936 // -----------------------------
1938 void peer_connection::incoming_have_all()
1942 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1945 #ifdef TORRENT_VERBOSE_LOGGING
1946 (*m_logger
) << time_now_string() << " <== HAVE_ALL\n";
1949 #ifndef TORRENT_DISABLE_EXTENSIONS
1950 for (extension_list_t::iterator i
= m_extensions
.begin()
1951 , end(m_extensions
.end()); i
!= end
; ++i
)
1953 if ((*i
)->on_have_all()) return;
1956 if (is_disconnecting()) return;
1960 if (m_peer_info
) m_peer_info
->seed
= true;
1961 m_upload_only
= true;
1962 m_bitfield_received
= true;
1964 #ifdef TORRENT_VERBOSE_LOGGING
1965 (*m_logger
) << " *** THIS IS A SEED ***\n";
1968 // if we don't have metadata yet
1969 // just remember the bitmask
1970 // don't update the piecepicker
1971 // (since it doesn't exist yet)
1972 if (!t
->ready_for_connections())
1974 // assume seeds are interesting when we
1975 // don't even have the metadata
1976 t
->get_policy().peer_is_interesting(*this);
1978 disconnect_if_redundant();
1979 // TODO: this might need something more
1980 // so that once we have the metadata
1981 // we can construct a full bitfield
1985 TORRENT_ASSERT(!m_have_piece
.empty());
1986 m_have_piece
.set_all();
1987 m_num_pieces
= m_have_piece
.size();
1991 // if we're finished, we're not interested
1992 if (t
->is_finished()) send_not_interested();
1993 else t
->get_policy().peer_is_interesting(*this);
1995 disconnect_if_redundant();
1998 // -----------------------------
1999 // --------- HAVE NONE ---------
2000 // -----------------------------
2002 void peer_connection::incoming_have_none()
2006 #ifdef TORRENT_VERBOSE_LOGGING
2007 (*m_logger
) << time_now_string() << " <== HAVE_NONE\n";
2010 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2013 #ifndef TORRENT_DISABLE_EXTENSIONS
2014 for (extension_list_t::iterator i
= m_extensions
.begin()
2015 , end(m_extensions
.end()); i
!= end
; ++i
)
2017 if ((*i
)->on_have_none()) return;
2020 if (is_disconnecting()) return;
2021 if (m_peer_info
) m_peer_info
->seed
= false;
2022 m_bitfield_received
= true;
2024 // we're never interested in a peer that doesn't have anything
2025 send_not_interested();
2027 TORRENT_ASSERT(!m_have_piece
.empty() || !t
->ready_for_connections());
2028 disconnect_if_redundant();
2031 // -----------------------------
2032 // ------- ALLOWED FAST --------
2033 // -----------------------------
2035 void peer_connection::incoming_allowed_fast(int index
)
2039 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2042 #ifdef TORRENT_VERBOSE_LOGGING
2043 (*m_logger
) << time_now_string() << " <== ALLOWED_FAST [ " << index
<< " ]\n";
2046 #ifndef TORRENT_DISABLE_EXTENSIONS
2047 for (extension_list_t::iterator i
= m_extensions
.begin()
2048 , end(m_extensions
.end()); i
!= end
; ++i
)
2050 if ((*i
)->on_allowed_fast(index
)) return;
2053 if (is_disconnecting()) return;
2055 if (t
->valid_metadata())
2057 if (index
< 0 || index
>= int(m_have_piece
.size()))
2059 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2060 (*m_logger
) << time_now_string() << " <== INVALID_ALLOWED_FAST [ " << index
<< " | s: "
2061 << int(m_have_piece
.size()) << " ]\n";
2066 // if we already have the piece, we can
2067 // ignore this message
2068 if (t
->have_piece(index
))
2072 m_allowed_fast
.push_back(index
);
2074 // if the peer has the piece and we want
2075 // to download it, request it
2076 if (int(m_have_piece
.size()) > index
2077 && m_have_piece
[index
]
2078 && t
->valid_metadata()
2080 && t
->picker().piece_priority(index
) > 0)
2082 t
->get_policy().peer_is_interesting(*this);
2086 std::vector
<int> const& peer_connection::allowed_fast()
2088 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2091 m_allowed_fast
.erase(std::remove_if(m_allowed_fast
.begin()
2092 , m_allowed_fast
.end(), bind(&torrent::have_piece
, t
, _1
))
2093 , m_allowed_fast
.end());
2095 // TODO: sort the allowed fast set in priority order
2096 return m_allowed_fast
;
2099 void peer_connection::add_request(piece_block
const& block
)
2103 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2106 TORRENT_ASSERT(t
->valid_metadata());
2107 TORRENT_ASSERT(block
.piece_index
>= 0);
2108 TORRENT_ASSERT(block
.piece_index
< t
->torrent_file().num_pieces());
2109 TORRENT_ASSERT(block
.block_index
>= 0);
2110 TORRENT_ASSERT(block
.block_index
< t
->torrent_file().piece_size(block
.piece_index
));
2111 TORRENT_ASSERT(!t
->picker().is_requested(block
) || (t
->picker().num_peers(block
) > 0));
2112 TORRENT_ASSERT(!t
->have_piece(block
.piece_index
));
2113 TORRENT_ASSERT(std::find_if(m_download_queue
.begin(), m_download_queue
.end()
2114 , has_block(block
)) == m_download_queue
.end());
2115 TORRENT_ASSERT(std::find(m_request_queue
.begin(), m_request_queue
.end()
2116 , block
) == m_request_queue
.end());
2118 piece_picker::piece_state_t state
;
2119 peer_speed_t speed
= peer_speed();
2120 char const* speedmsg
= 0;
2124 state
= piece_picker::fast
;
2126 else if (speed
== medium
)
2128 speedmsg
= "medium";
2129 state
= piece_picker::medium
;
2134 state
= piece_picker::slow
;
2137 if (!t
->picker().mark_as_downloading(block
, peer_info_struct(), state
))
2140 if (t
->alerts().should_post
<block_downloading_alert
>())
2142 t
->alerts().post_alert(block_downloading_alert(t
->get_handle(),
2143 remote(), pid(), speedmsg
, block
.block_index
, block
.piece_index
));
2146 m_request_queue
.push_back(block
);
2149 void peer_connection::cancel_request(piece_block
const& block
)
2153 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2154 // this peer might be disconnecting
2157 TORRENT_ASSERT(t
->valid_metadata());
2159 TORRENT_ASSERT(block
.piece_index
>= 0);
2160 TORRENT_ASSERT(block
.piece_index
< t
->torrent_file().num_pieces());
2161 TORRENT_ASSERT(block
.block_index
>= 0);
2162 TORRENT_ASSERT(block
.block_index
< t
->torrent_file().piece_size(block
.piece_index
));
2164 // if all the peers that requested this block has been
2165 // cancelled, then just ignore the cancel.
2166 if (!t
->picker().is_requested(block
)) return;
2168 std::deque
<pending_block
>::iterator it
2169 = std::find_if(m_download_queue
.begin(), m_download_queue
.end(), has_block(block
));
2170 if (it
== m_download_queue
.end())
2172 std::deque
<piece_block
>::iterator rit
= std::find(m_request_queue
.begin()
2173 , m_request_queue
.end(), block
);
2175 // when a multi block is received, it is cancelled
2176 // from all peers, so if this one hasn't requested
2177 // the block, just ignore to cancel it.
2178 if (rit
== m_request_queue
.end()) return;
2180 t
->picker().abort_download(block
);
2181 m_request_queue
.erase(rit
);
2182 // since we found it in the request queue, it means it hasn't been
2183 // sent yet, so we don't have to send a cancel.
2187 int block_offset
= block
.block_index
* t
->block_size();
2189 = (std::min
)(t
->torrent_file().piece_size(block
.piece_index
)-block_offset
,
2191 TORRENT_ASSERT(block_size
> 0);
2192 TORRENT_ASSERT(block_size
<= t
->block_size());
2195 r
.piece
= block
.piece_index
;
2196 r
.start
= block_offset
;
2197 r
.length
= block_size
;
2199 #ifdef TORRENT_VERBOSE_LOGGING
2200 (*m_logger
) << time_now_string()
2201 << " ==> CANCEL [ piece: " << block
.piece_index
<< " | s: "
2202 << block_offset
<< " | l: " << block_size
<< " | " << block
.block_index
<< " ]\n";
2207 void peer_connection::send_choke()
2211 TORRENT_ASSERT(!m_peer_info
|| !m_peer_info
->optimistically_unchoked
);
2213 if (m_choked
) return;
2217 #ifdef TORRENT_VERBOSE_LOGGING
2218 (*m_logger
) << time_now_string() << " ==> CHOKE\n";
2221 m_last_choke
= time_now();
2223 m_num_invalid_requests
= 0;
2225 // reject the requests we have in the queue
2226 // except the allowed fast pieces
2227 for (std::deque
<peer_request
>::iterator i
= m_requests
.begin();
2228 i
!= m_requests
.end();)
2230 if (m_accept_fast
.count(i
->piece
))
2236 peer_request
const& r
= *i
;
2237 write_reject_request(r
);
2239 #ifdef TORRENT_VERBOSE_LOGGING
2240 (*m_logger
) << time_now_string()
2241 << " ==> REJECT_PIECE [ "
2242 "piece: " << r
.piece
<< " | "
2243 "s: " << r
.start
<< " | "
2244 "l: " << r
.length
<< " ]\n";
2246 i
= m_requests
.erase(i
);
2250 bool peer_connection::send_unchoke()
2254 if (!m_choked
) return false;
2255 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2256 if (!t
->ready_for_connections()) return false;
2257 m_last_unchoke
= time_now();
2261 #ifdef TORRENT_VERBOSE_LOGGING
2262 (*m_logger
) << time_now_string() << " ==> UNCHOKE\n";
2267 void peer_connection::send_interested()
2269 if (m_interesting
) return;
2270 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2271 if (!t
->ready_for_connections()) return;
2272 m_interesting
= true;
2275 #ifdef TORRENT_VERBOSE_LOGGING
2276 (*m_logger
) << time_now_string() << " ==> INTERESTED\n";
2280 void peer_connection::send_not_interested()
2282 if (!m_interesting
) return;
2283 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2284 if (!t
->ready_for_connections()) return;
2285 m_interesting
= false;
2286 write_not_interested();
2288 m_became_uninteresting
= time_now();
2290 #ifdef TORRENT_VERBOSE_LOGGING
2291 (*m_logger
) << time_now_string() << " ==> NOT_INTERESTED\n";
2293 disconnect_if_redundant();
2296 void peer_connection::send_block_requests()
2300 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2303 if ((int)m_download_queue
.size() >= m_desired_queue_size
) return;
2305 bool empty_download_queue
= m_download_queue
.empty();
2307 while (!m_request_queue
.empty()
2308 && (int)m_download_queue
.size() < m_desired_queue_size
)
2310 piece_block block
= m_request_queue
.front();
2312 int block_offset
= block
.block_index
* t
->block_size();
2313 int block_size
= (std::min
)(t
->torrent_file().piece_size(
2314 block
.piece_index
) - block_offset
, t
->block_size());
2315 TORRENT_ASSERT(block_size
> 0);
2316 TORRENT_ASSERT(block_size
<= t
->block_size());
2319 r
.piece
= block
.piece_index
;
2320 r
.start
= block_offset
;
2321 r
.length
= block_size
;
2323 m_request_queue
.pop_front();
2324 if (t
->is_seed()) continue;
2325 // this can happen if a block times out, is re-requested and
2326 // then arrives "unexpectedly"
2327 if (t
->picker().is_finished(block
) || t
->picker().is_downloaded(block
))
2330 m_download_queue
.push_back(block
);
2332 #ifdef TORRENT_VERBOSE_LOGGING
2333 (*m_logger) << time_now_string()
2334 << " *** REQUEST-QUEUE** [ "
2335 "piece: " << block.piece_index << " | "
2336 "block: " << block.block_index << " ]\n";
2339 // if we are requesting large blocks, merge the smaller
2340 // blocks that are in the same piece into larger requests
2341 if (m_request_large_blocks
)
2343 int blocks_per_piece
= t
->torrent_file().piece_length() / t
->block_size();
2345 while (!m_request_queue
.empty())
2347 // check to see if this block is connected to the previous one
2348 // if it is, merge them, otherwise, break this merge loop
2349 piece_block
const& front
= m_request_queue
.front();
2350 if (front
.piece_index
* blocks_per_piece
+ front
.block_index
2351 != block
.piece_index
* blocks_per_piece
+ block
.block_index
+ 1)
2353 block
= m_request_queue
.front();
2354 m_request_queue
.pop_front();
2355 m_download_queue
.push_back(block
);
2357 #ifdef TORRENT_VERBOSE_LOGGING
2358 (*m_logger
) << time_now_string()
2359 << " *** MERGING REQUEST ** [ "
2360 "piece: " << block
.piece_index
<< " | "
2361 "block: " << block
.block_index
<< " ]\n";
2364 block_offset
= block
.block_index
* t
->block_size();
2365 block_size
= (std::min
)(t
->torrent_file().piece_size(
2366 block
.piece_index
) - block_offset
, t
->block_size());
2367 TORRENT_ASSERT(block_size
> 0);
2368 TORRENT_ASSERT(block_size
<= t
->block_size());
2370 r
.length
+= block_size
;
2374 TORRENT_ASSERT(verify_piece(r
));
2376 #ifndef TORRENT_DISABLE_EXTENSIONS
2377 bool handled
= false;
2378 for (extension_list_t::iterator i
= m_extensions
.begin()
2379 , end(m_extensions
.end()); i
!= end
; ++i
)
2381 if (handled
= (*i
)->write_request(r
)) break;
2383 if (is_disconnecting()) return;
2387 m_last_request
= time_now();
2391 m_last_request
= time_now();
2394 #ifdef TORRENT_VERBOSE_LOGGING
2395 (*m_logger
) << time_now_string()
2396 << " ==> REQUEST [ "
2397 "piece: " << r
.piece
<< " | "
2398 "s: " << r
.start
<< " | "
2399 "l: " << r
.length
<< " | "
2400 "ds: " << statistics().download_rate() << " B/s | "
2401 "qs: " << int(m_desired_queue_size
) << " "
2402 "blk: " << (m_request_large_blocks
?"large":"single") << " ]\n";
2405 m_last_piece
= time_now();
2407 if (!m_download_queue
.empty()
2408 && empty_download_queue
)
2410 // This means we just added a request to this connection
2411 m_requested
= time_now();
2415 void peer_connection::timed_out()
2417 TORRENT_ASSERT(m_connecting
);
2418 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
2419 (*m_ses
.m_logger
) << time_now_string() << " CONNECTION TIMED OUT: " << m_remote
.address().to_string()
2422 disconnect("timed out: connect", 1);
2425 // the error argument defaults to 0, which means deliberate disconnect
2426 // 1 means unexpected disconnect/error
2427 // 2 protocol error (client sent something invalid)
2428 void peer_connection::disconnect(char const* message
, int error
)
2430 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
2433 m_disconnect_started
= true;
2436 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2440 (*m_logger
) << "*** CONNECTION CLOSED " << message
<< "\n";
2443 (*m_logger
) << "*** CONNECTION FAILED " << message
<< "\n";
2446 (*m_logger
) << "*** PEER ERROR " << message
<< "\n";
2450 // we cannot do this in a constructor
2451 TORRENT_ASSERT(m_in_constructor
== false);
2452 if (error
> 0) m_failed
= true;
2453 if (m_disconnecting
) return;
2454 boost::intrusive_ptr
<peer_connection
> me(this);
2458 if (m_connecting
&& m_connection_ticket
>= 0)
2460 m_ses
.m_half_open
.done(m_connection_ticket
);
2461 m_connection_ticket
= -1;
2464 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2465 torrent_handle handle
;
2466 if (t
) handle
= t
->get_handle();
2470 if (error
> 1 && m_ses
.m_alerts
.should_post
<peer_error_alert
>())
2472 m_ses
.m_alerts
.post_alert(
2473 peer_error_alert(handle
, remote(), pid(), message
));
2475 else if (error
<= 1 && m_ses
.m_alerts
.should_post
<peer_disconnected_alert
>())
2477 m_ses
.m_alerts
.post_alert(
2478 peer_disconnected_alert(handle
, remote(), pid(), message
));
2484 // make sure we keep all the stats!
2486 t
->add_stats(statistics());
2488 if (t
->has_picker())
2490 piece_picker
& picker
= t
->picker();
2492 while (!m_download_queue
.empty())
2494 picker
.abort_download(m_download_queue
.back().block
);
2495 m_download_queue
.pop_back();
2497 while (!m_request_queue
.empty())
2499 picker
.abort_download(m_request_queue
.back());
2500 m_request_queue
.pop_back();
2504 t
->remove_peer(this);
2509 // since this connection doesn't have a torrent reference
2510 // no torrent should have a reference to this connection either
2511 for (aux::session_impl::torrent_map::const_iterator i
= m_ses
.m_torrents
.begin()
2512 , end(m_ses
.m_torrents
.end()); i
!= end
; ++i
)
2513 TORRENT_ASSERT(!i
->second
->has_peer(this));
2516 m_disconnecting
= true;
2518 m_socket
->close(ec
);
2519 m_ses
.close_connection(this, message
);
2522 void peer_connection::set_upload_limit(int limit
)
2524 TORRENT_ASSERT(limit
>= -1);
2525 if (limit
== -1) limit
= (std::numeric_limits
<int>::max
)();
2526 if (limit
< 10) limit
= 10;
2527 m_upload_limit
= limit
;
2528 m_bandwidth_limit
[upload_channel
].throttle(m_upload_limit
);
2531 void peer_connection::set_download_limit(int limit
)
2533 TORRENT_ASSERT(limit
>= -1);
2534 if (limit
== -1) limit
= (std::numeric_limits
<int>::max
)();
2535 if (limit
< 10) limit
= 10;
2536 m_download_limit
= limit
;
2537 m_bandwidth_limit
[download_channel
].throttle(m_download_limit
);
2540 size_type
peer_connection::share_diff() const
2544 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2547 float ratio
= t
->ratio();
2549 // if we have an infinite ratio, just say we have downloaded
2550 // much more than we have uploaded. And we'll keep uploading.
2552 return (std::numeric_limits
<size_type
>::max
)();
2554 return m_free_upload
2555 + static_cast<size_type
>(m_statistics
.total_payload_download() * ratio
)
2556 - m_statistics
.total_payload_upload();
2559 // defined in upnp.cpp
2560 bool is_local(address
const& a
);
2562 bool peer_connection::on_local_network() const
2564 if (libtorrent::is_local(m_remote
.address())
2565 || is_loopback(m_remote
.address())) return true;
2569 void peer_connection::get_peer_info(peer_info
& p
) const
2571 TORRENT_ASSERT(!associated_torrent().expired());
2573 ptime now
= time_now();
2575 p
.download_rate_peak
= m_download_rate_peak
;
2576 p
.upload_rate_peak
= m_upload_rate_peak
;
2578 p
.down_speed
= statistics().download_rate();
2579 p
.up_speed
= statistics().upload_rate();
2580 p
.payload_down_speed
= statistics().download_payload_rate();
2581 p
.payload_up_speed
= statistics().upload_payload_rate();
2584 p
.pending_disk_bytes
= m_outstanding_writing_bytes
;
2585 p
.send_quota
= m_bandwidth_limit
[upload_channel
].quota_left();
2586 p
.receive_quota
= m_bandwidth_limit
[download_channel
].quota_left();
2587 if (m_download_queue
.empty()) p
.request_timeout
= -1;
2588 else p
.request_timeout
= total_seconds(m_requested
- now
) + m_ses
.settings().request_timeout
2590 #ifndef TORRENT_DISABLE_GEO_IP
2591 p
.inet_as_name
= m_inet_as_name
;
2594 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
2595 p
.country
[0] = m_country
[0];
2596 p
.country
[1] = m_country
[1];
2599 p
.total_download
= statistics().total_payload_download();
2600 p
.total_upload
= statistics().total_payload_upload();
2602 if (m_bandwidth_limit
[upload_channel
].throttle() == bandwidth_limit::inf
)
2603 p
.upload_limit
= -1;
2605 p
.upload_limit
= m_bandwidth_limit
[upload_channel
].throttle();
2607 if (m_bandwidth_limit
[download_channel
].throttle() == bandwidth_limit::inf
)
2608 p
.download_limit
= -1;
2610 p
.download_limit
= m_bandwidth_limit
[download_channel
].throttle();
2612 p
.load_balancing
= total_free_upload();
2614 p
.download_queue_length
= int(download_queue().size() + m_request_queue
.size());
2615 p
.requests_in_buffer
= int(m_requests_in_buffer
.size());
2616 p
.target_dl_queue_length
= int(desired_queue_size());
2617 p
.upload_queue_length
= int(upload_queue().size());
2619 if (boost::optional
<piece_block_progress
> ret
= downloading_piece_progress())
2621 p
.downloading_piece_index
= ret
->piece_index
;
2622 p
.downloading_block_index
= ret
->block_index
;
2623 p
.downloading_progress
= ret
->bytes_downloaded
;
2624 p
.downloading_total
= ret
->full_block_bytes
;
2628 p
.downloading_piece_index
= -1;
2629 p
.downloading_block_index
= -1;
2630 p
.downloading_progress
= 0;
2631 p
.downloading_total
= 0;
2634 p
.pieces
= get_bitfield();
2635 p
.last_request
= now
- m_last_request
;
2636 p
.last_active
= now
- (std::max
)(m_last_sent
, m_last_receive
);
2638 // this will set the flags so that we can update them later
2640 get_specific_peer_info(p
);
2642 p
.flags
|= is_seed() ? peer_info::seed
: 0;
2643 p
.flags
|= m_snubbed
? peer_info::snubbed
: 0;
2644 p
.flags
|= m_upload_only
? peer_info::upload_only
: 0;
2645 if (peer_info_struct())
2647 policy::peer
* pi
= peer_info_struct();
2648 p
.source
= pi
->source
;
2649 p
.failcount
= pi
->failcount
;
2650 p
.num_hashfails
= pi
->hashfails
;
2651 p
.flags
|= pi
->on_parole
? peer_info::on_parole
: 0;
2652 p
.flags
|= pi
->optimistically_unchoked
? peer_info::optimistic_unchoke
: 0;
2653 #ifndef TORRENT_DISABLE_GEO_IP
2654 p
.inet_as
= pi
->inet_as
->first
;
2661 p
.num_hashfails
= 0;
2662 p
.remote_dl_rate
= 0;
2663 #ifndef TORRENT_DISABLE_GEO_IP
2668 p
.remote_dl_rate
= m_remote_dl_rate
;
2669 p
.send_buffer_size
= m_send_buffer
.capacity();
2670 p
.used_send_buffer
= m_send_buffer
.size();
2671 p
.receive_buffer_size
= m_recv_buffer
.capacity() + m_disk_recv_buffer_size
;
2672 p
.used_receive_buffer
= m_recv_pos
;
2673 p
.write_state
= m_channel_state
[upload_channel
];
2674 p
.read_state
= m_channel_state
[download_channel
];
2676 p
.progress
= (float)p
.pieces
.count() / (float)p
.pieces
.size();
2679 // allocates a disk buffer of size 'disk_buffer_size' and replaces the
2680 // end of the current receive buffer with it. i.e. the receive pos
2681 // must be <= packet_size - disk_buffer_size
2682 // the disk buffer can be accessed through release_disk_receive_buffer()
2683 // when it is queried, the responsibility to free it is transferred
2685 bool peer_connection::allocate_disk_receive_buffer(int disk_buffer_size
)
2689 TORRENT_ASSERT(m_packet_size
> 0);
2690 TORRENT_ASSERT(m_recv_pos
<= m_packet_size
- disk_buffer_size
);
2691 TORRENT_ASSERT(!m_disk_recv_buffer
);
2692 TORRENT_ASSERT(disk_buffer_size
<= 16 * 1024);
2694 if (disk_buffer_size
> 16 * 1024)
2696 disconnect("invalid piece size", 2);
2700 m_disk_recv_buffer
.reset(m_ses
.allocate_disk_buffer());
2701 if (!m_disk_recv_buffer
)
2703 disconnect("out of memory");
2706 m_disk_recv_buffer_size
= disk_buffer_size
;
2710 char* peer_connection::release_disk_receive_buffer()
2712 m_disk_recv_buffer_size
= 0;
2713 return m_disk_recv_buffer
.release();
2716 void peer_connection::cut_receive_buffer(int size
, int packet_size
)
2720 TORRENT_ASSERT(packet_size
> 0);
2721 TORRENT_ASSERT(int(m_recv_buffer
.size()) >= size
);
2722 TORRENT_ASSERT(int(m_recv_buffer
.size()) >= m_recv_pos
);
2723 TORRENT_ASSERT(m_recv_pos
>= size
);
2726 std::memmove(&m_recv_buffer
[0], &m_recv_buffer
[0] + size
, m_recv_pos
- size
);
2731 std::fill(m_recv_buffer
.begin() + m_recv_pos
, m_recv_buffer
.end(), 0);
2734 m_packet_size
= packet_size
;
2737 void peer_connection::calc_ip_overhead()
2739 m_statistics
.calc_ip_overhead();
2742 void peer_connection::second_tick(float tick_interval
)
2744 ptime
now(time_now());
2745 boost::intrusive_ptr
<peer_connection
> me(self());
2747 // the invariant check must be run before me is destructed
2748 // in case the peer got disconnected
2751 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2752 if (!t
|| m_disconnecting
)
2754 m_ses
.m_half_open
.done(m_connection_ticket
);
2755 m_connecting
= false;
2756 disconnect("torrent aborted");
2762 #ifndef TORRENT_DISABLE_EXTENSIONS
2763 for (extension_list_t::iterator i
= m_extensions
.begin()
2764 , end(m_extensions
.end()); i
!= end
; ++i
)
2768 if (is_disconnecting()) return;
2771 // if the peer hasn't said a thing for a certain
2772 // time, it is considered to have timed out
2774 d
= now
- m_last_receive
;
2775 if (d
> seconds(m_timeout
) && !m_connecting
)
2777 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2778 (*m_logger
) << time_now_string() << " *** LAST ACTIVITY [ "
2779 << total_seconds(d
) << " seconds ago ] ***\n";
2781 disconnect("timed out: inactivity");
2785 // do not stall waiting for a handshake
2788 && d
> seconds(m_ses
.settings().handshake_timeout
))
2790 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2791 (*m_logger
) << time_now_string() << " *** NO HANDSHAKE [ waited "
2792 << total_seconds(d
) << " seconds ] ***\n";
2794 disconnect("timed out: no handshake");
2798 // disconnect peers that we unchoked, but
2799 // they didn't send a request within 20 seconds.
2800 // but only if we're a seed
2801 d
= now
- (std::max
)(m_last_unchoke
, m_last_incoming_request
);
2803 && m_requests
.empty()
2805 && m_peer_interested
2806 && t
&& t
->is_finished()
2809 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2810 (*m_logger
) << time_now_string() << " *** NO REQUEST [ t: "
2811 << total_seconds(d
) << " ] ***\n";
2813 disconnect("timed out: no request when unchoked");
2817 // if the peer hasn't become interested and we haven't
2818 // become interested in the peer for 10 minutes, it
2819 // has also timed out.
2822 d1
= now
- m_became_uninterested
;
2823 d2
= now
- m_became_uninteresting
;
2824 time_duration time_limit
= seconds(
2825 m_ses
.settings().inactivity_timeout
);
2827 // don't bother disconnect peers we haven't been interested
2828 // in (and that hasn't been interested in us) for a while
2829 // unless we have used up all our connection slots
2831 && !m_peer_interested
2834 && (m_ses
.num_connections() >= m_ses
.max_connections()
2835 || (t
&& t
->num_peers() >= t
->max_connections())))
2837 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2838 (*m_logger
) << time_now_string() << " *** MUTUAL NO INTEREST [ "
2839 "t1: " << total_seconds(d1
) << " | "
2840 "t2: " << total_seconds(d2
) << " ] ***\n";
2842 disconnect("timed out: no interest");
2846 if (!m_download_queue
.empty()
2847 && now
> m_requested
+ seconds(m_ses
.settings().request_timeout
2848 + m_timeout_extend
))
2853 // if we haven't sent something in too long, send a keep-alive
2856 m_ignore_bandwidth_limits
= m_ses
.settings().ignore_limits_on_local_network
2857 && on_local_network();
2859 m_statistics
.second_tick(tick_interval
);
2861 if (m_statistics
.upload_payload_rate() > m_upload_rate_peak
)
2863 m_upload_rate_peak
= m_statistics
.upload_payload_rate();
2865 if (m_statistics
.download_payload_rate() > m_download_rate_peak
)
2867 m_download_rate_peak
= m_statistics
.download_payload_rate();
2868 #ifndef TORRENT_DISABLE_GEO_IP
2869 if (peer_info_struct())
2871 std::pair
<const int, int>* as_stats
= peer_info_struct()->inet_as
;
2872 if (as_stats
&& as_stats
->second
< m_download_rate_peak
)
2873 as_stats
->second
= m_download_rate_peak
;
2877 if (is_disconnecting()) return;
2879 if (!t
->ready_for_connections()) return;
2881 // calculate the desired download queue size
2882 const float queue_time
= m_ses
.settings().request_queue_time
;
2883 // (if the latency is more than this, the download will stall)
2884 // so, the queue size is queue_time * down_rate / 16 kiB
2885 // (16 kB is the size of each request)
2886 // the minimum number of requests is 2 and the maximum is 48
2887 // the block size doesn't have to be 16. So we first query the
2889 const int block_size
= m_request_large_blocks
2890 ? t
->torrent_file().piece_length() : t
->block_size();
2891 TORRENT_ASSERT(block_size
> 0);
2895 m_desired_queue_size
= 1;
2899 m_desired_queue_size
= static_cast<int>(queue_time
2900 * statistics().download_rate() / block_size
);
2901 if (m_desired_queue_size
> m_max_out_request_queue
)
2902 m_desired_queue_size
= m_max_out_request_queue
;
2903 if (m_desired_queue_size
< min_request_queue
)
2904 m_desired_queue_size
= min_request_queue
;
2906 if (m_desired_queue_size
== m_max_out_request_queue
2907 && t
->alerts().should_post
<performance_alert
>())
2909 t
->alerts().post_alert(performance_alert(t
->get_handle()
2910 , performance_alert::outstanding_request_limit_reached
));
2914 if (!m_download_queue
.empty()
2915 && now
- m_last_piece
> seconds(m_ses
.settings().piece_timeout
2916 + m_timeout_extend
))
2918 // this peer isn't sending the pieces we've
2919 // requested (this has been observed by BitComet)
2920 // in this case we'll clear our download queue and
2921 // re-request the blocks.
2922 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2923 (*m_logger
) << time_now_string()
2924 << " *** PIECE_REQUESTS TIMED OUT [ " << (int)m_download_queue
.size()
2925 << " " << total_seconds(now
- m_last_piece
) << "] ***\n";
2931 // If the client sends more data
2932 // we send it data faster, otherwise, slower.
2933 // It will also depend on how much data the
2934 // client has sent us. This is the mean to
2935 // maintain the share ratio given by m_ratio
2938 if (t
->is_finished() || is_choked() || t
->ratio() == 0.0f
)
2940 // if we have downloaded more than one piece more
2941 // than we have uploaded OR if we are a seed
2942 // have an unlimited upload rate
2943 m_bandwidth_limit
[upload_channel
].throttle(m_upload_limit
);
2947 size_type bias
= 0x10000 + 2 * t
->block_size() + m_free_upload
;
2949 double break_even_time
= 15; // seconds.
2950 size_type have_uploaded
= m_statistics
.total_payload_upload();
2951 size_type have_downloaded
= m_statistics
.total_payload_download();
2952 double download_speed
= m_statistics
.download_rate();
2954 size_type soon_downloaded
=
2955 have_downloaded
+ (size_type
)(download_speed
* break_even_time
*1.5);
2957 if (t
->ratio() != 1.f
)
2958 soon_downloaded
= (size_type
)(soon_downloaded
*(double)t
->ratio());
2960 double upload_speed_limit
= (std::min
)((soon_downloaded
- have_uploaded
2961 + bias
) / break_even_time
, double(m_upload_limit
));
2963 upload_speed_limit
= (std::min
)(upload_speed_limit
,
2964 (double)(std::numeric_limits
<int>::max
)());
2966 m_bandwidth_limit
[upload_channel
].throttle(
2967 (std::min
)((std::max
)((int)upload_speed_limit
, 20)
2971 // update once every minute
2972 if (now
- m_remote_dl_update
>= seconds(60))
2974 float factor
= 0.6666666666667f
;
2976 if (m_remote_dl_rate
== 0) factor
= 0.0f
;
2978 m_remote_dl_rate
= int((m_remote_dl_rate
* factor
) +
2979 ((m_remote_bytes_dled
* (1.0f
-factor
)) / 60.f
));
2981 m_remote_bytes_dled
= 0;
2982 m_remote_dl_update
= now
;
2988 void peer_connection::snub_peer()
2992 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2998 if (m_ses
.m_alerts
.should_post
<peer_snubbed_alert
>())
3000 m_ses
.m_alerts
.post_alert(peer_snubbed_alert(t
->get_handle()
3001 , m_remote
, m_peer_id
));
3004 m_desired_queue_size
= 1;
3008 m_timeout_extend
+= m_ses
.settings().request_timeout
;
3011 if (!t
->has_picker()) return;
3012 piece_picker
& picker
= t
->picker();
3014 piece_block
r(-1, -1);
3015 // time out the last request in the queue
3016 if (!m_request_queue
.empty())
3018 r
= m_request_queue
.back();
3019 m_request_queue
.pop_back();
3023 TORRENT_ASSERT(!m_download_queue
.empty());
3024 r
= m_download_queue
.back().block
;
3026 // only time out a request if it blocks the piece
3027 // from being completed (i.e. no free blocks to
3029 piece_picker::downloading_piece p
;
3030 picker
.piece_info(r
.piece_index
, p
);
3031 int free_blocks
= picker
.blocks_in_piece(r
.piece_index
)
3032 - p
.finished
- p
.writing
- p
.requested
;
3033 if (free_blocks
> 0)
3035 m_timeout_extend
+= m_ses
.settings().request_timeout
;
3039 if (m_ses
.m_alerts
.should_post
<block_timeout_alert
>())
3041 m_ses
.m_alerts
.post_alert(block_timeout_alert(t
->get_handle()
3042 , remote(), pid(), r
.block_index
, r
.piece_index
));
3044 m_download_queue
.pop_back();
3046 if (!m_download_queue
.empty() || !m_request_queue
.empty())
3047 m_timeout_extend
+= m_ses
.settings().request_timeout
;
3049 m_desired_queue_size
= 2;
3050 request_a_block(*t
, *this);
3051 m_desired_queue_size
= 1;
3053 // abort the block after the new one has
3054 // been requested in order to prevent it from
3055 // picking the same block again, stalling the
3056 // same piece indefinitely.
3057 if (r
!= piece_block(-1, -1))
3058 picker
.abort_download(r
);
3060 send_block_requests();
3063 void peer_connection::fill_send_buffer()
3067 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
3070 // only add new piece-chunks if the send buffer is small enough
3071 // otherwise there will be no end to how large it will be!
3073 int buffer_size_watermark
= int(m_statistics
.upload_rate()) / 2;
3074 if (buffer_size_watermark
< 512) buffer_size_watermark
= 512;
3075 else if (buffer_size_watermark
> m_ses
.settings().send_buffer_watermark
)
3076 buffer_size_watermark
= m_ses
.settings().send_buffer_watermark
;
3078 while (!m_requests
.empty()
3079 && (send_buffer_size() + m_reading_bytes
< buffer_size_watermark
))
3081 TORRENT_ASSERT(t
->ready_for_connections());
3082 peer_request
& r
= m_requests
.front();
3084 TORRENT_ASSERT(r
.piece
>= 0);
3085 TORRENT_ASSERT(r
.piece
< (int)m_have_piece
.size());
3086 TORRENT_ASSERT(t
->have_piece(r
.piece
));
3087 TORRENT_ASSERT(r
.start
+ r
.length
<= t
->torrent_file().piece_size(r
.piece
));
3088 TORRENT_ASSERT(r
.length
> 0 && r
.start
>= 0);
3090 t
->filesystem().async_read(r
, bind(&peer_connection::on_disk_read_complete
3091 , self(), _1
, _2
, r
));
3092 m_reading_bytes
+= r
.length
;
3094 m_requests
.erase(m_requests
.begin());
3098 void peer_connection::on_disk_read_complete(int ret
, disk_io_job
const& j
, peer_request r
)
3100 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
3102 m_reading_bytes
-= r
.length
;
3104 disk_buffer_holder
buffer(m_ses
, j
.buffer
);
3106 if (ret
!= r
.length
|| m_torrent
.expired())
3108 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
3111 disconnect(j
.str
.c_str());
3115 if (t
->alerts().should_post
<file_error_alert
>())
3116 t
->alerts().post_alert(file_error_alert(j
.error_file
, t
->get_handle(), j
.str
));
3117 t
->set_error(j
.str
);
3122 #ifdef TORRENT_VERBOSE_LOGGING
3123 (*m_logger
) << time_now_string()
3124 << " ==> PIECE [ piece: " << r
.piece
<< " | s: " << r
.start
3125 << " | l: " << r
.length
<< " ]\n";
3128 write_piece(r
, buffer
);
3132 void peer_connection::assign_bandwidth(int channel
, int amount
)
3134 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
3136 #ifdef TORRENT_VERBOSE_LOGGING
3137 (*m_logger
) << "bandwidth [ " << channel
<< " ] + " << amount
<< "\n";
3140 m_bandwidth_limit
[channel
].assign(amount
);
3141 TORRENT_ASSERT(m_channel_state
[channel
] == peer_info::bw_global
);
3142 m_channel_state
[channel
] = peer_info::bw_idle
;
3143 if (channel
== upload_channel
)
3147 else if (channel
== download_channel
)
3153 void peer_connection::expire_bandwidth(int channel
, int amount
)
3155 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
3157 m_bandwidth_limit
[channel
].expire(amount
);
3158 if (channel
== upload_channel
)
3162 else if (channel
== download_channel
)
3168 void peer_connection::setup_send()
3170 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
3172 if (m_channel_state
[upload_channel
] != peer_info::bw_idle
) return;
3174 shared_ptr
<torrent
> t
= m_torrent
.lock();
3176 if (m_bandwidth_limit
[upload_channel
].quota_left() == 0
3177 && !m_send_buffer
.empty()
3180 && !m_ignore_bandwidth_limits
)
3182 // in this case, we have data to send, but no
3183 // bandwidth. So, we simply request bandwidth
3186 if (m_bandwidth_limit
[upload_channel
].max_assignable() > 0)
3188 int priority
= is_interesting() * 2 + m_requests_in_buffer
.size();
3189 // peers that we are not interested in are non-prioritized
3190 m_channel_state
[upload_channel
] = peer_info::bw_torrent
;
3191 t
->request_bandwidth(upload_channel
, self()
3192 , m_send_buffer
.size(), priority
);
3193 #ifdef TORRENT_VERBOSE_LOGGING
3194 (*m_logger
) << time_now_string() << " *** REQUEST_BANDWIDTH [ upload prio: "
3195 << priority
<< "]\n";
3204 #ifdef TORRENT_VERBOSE_LOGGING
3205 (*m_logger
) << time_now_string() << " *** CANNOT WRITE ["
3206 " quota: " << m_bandwidth_limit
[download_channel
].quota_left() <<
3207 " ignore: " << (m_ignore_bandwidth_limits
?"yes":"no") <<
3208 " buf: " << m_send_buffer
.size() <<
3209 " connecting: " << (m_connecting
?"yes":"no") <<
3215 // send the actual buffer
3216 if (!m_send_buffer
.empty())
3218 int amount_to_send
= m_send_buffer
.size();
3219 int quota_left
= m_bandwidth_limit
[upload_channel
].quota_left();
3220 if (!m_ignore_bandwidth_limits
&& amount_to_send
> quota_left
)
3221 amount_to_send
= quota_left
;
3223 TORRENT_ASSERT(amount_to_send
> 0);
3225 #ifdef TORRENT_VERBOSE_LOGGING
3226 (*m_logger
) << time_now_string() << " *** ASYNC_WRITE [ bytes: " << amount_to_send
<< " ]\n";
3228 std::list
<asio::const_buffer
> const& vec
= m_send_buffer
.build_iovec(amount_to_send
);
3229 m_socket
->async_write_some(vec
, bind(&peer_connection::on_send_data
, self(), _1
, _2
));
3231 m_channel_state
[upload_channel
] = peer_info::bw_network
;
3235 void peer_connection::setup_receive()
3237 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
3241 if (m_channel_state
[download_channel
] != peer_info::bw_idle
) return;
3243 shared_ptr
<torrent
> t
= m_torrent
.lock();
3245 if (m_bandwidth_limit
[download_channel
].quota_left() == 0
3248 && !m_ignore_bandwidth_limits
)
3250 if (m_bandwidth_limit
[download_channel
].max_assignable() > 0)
3252 #ifdef TORRENT_VERBOSE_LOGGING
3253 (*m_logger
) << time_now_string() << " *** REQUEST_BANDWIDTH [ download ]\n";
3255 TORRENT_ASSERT(m_channel_state
[download_channel
] == peer_info::bw_idle
);
3256 m_channel_state
[download_channel
] = peer_info::bw_torrent
;
3257 t
->request_bandwidth(download_channel
, self()
3258 , m_download_queue
.size() * 16 * 1024 + 30, m_priority
);
3265 #ifdef TORRENT_VERBOSE_LOGGING
3266 (*m_logger
) << time_now_string() << " *** CANNOT READ ["
3267 " quota: " << m_bandwidth_limit
[download_channel
].quota_left() <<
3268 " ignore: " << (m_ignore_bandwidth_limits
?"yes":"no") <<
3269 " outstanding: " << m_outstanding_writing_bytes
<<
3270 " outstanding-limit: " << m_ses
.settings().max_outstanding_disk_bytes_per_connection
<<
3276 TORRENT_ASSERT(m_packet_size
> 0);
3277 int max_receive
= m_packet_size
- m_recv_pos
;
3278 int quota_left
= m_bandwidth_limit
[download_channel
].quota_left();
3279 if (!m_ignore_bandwidth_limits
&& max_receive
> quota_left
)
3280 max_receive
= quota_left
;
3282 if (max_receive
== 0) return;
3284 TORRENT_ASSERT(m_recv_pos
>= 0);
3285 TORRENT_ASSERT(m_packet_size
> 0);
3286 TORRENT_ASSERT(can_read());
3287 #ifdef TORRENT_VERBOSE_LOGGING
3288 (*m_logger
) << time_now_string() << " *** ASYNC_READ [ max: " << max_receive
<< " bytes ]\n";
3291 int regular_buffer_size
= m_packet_size
- m_disk_recv_buffer_size
;
3293 if (int(m_recv_buffer
.size()) < regular_buffer_size
)
3294 m_recv_buffer
.resize(regular_buffer_size
);
3296 if (!m_disk_recv_buffer
|| regular_buffer_size
>= m_recv_pos
+ max_receive
)
3298 // only receive into regular buffer
3299 TORRENT_ASSERT(m_recv_pos
+ max_receive
<= int(m_recv_buffer
.size()));
3300 m_socket
->async_read_some(asio::buffer(&m_recv_buffer
[m_recv_pos
]
3301 , max_receive
), bind(&peer_connection::on_receive_data
, self(), _1
, _2
));
3303 else if (m_recv_pos
>= regular_buffer_size
)
3305 // only receive into disk buffer
3306 TORRENT_ASSERT(m_recv_pos
- regular_buffer_size
>= 0);
3307 TORRENT_ASSERT(m_recv_pos
- regular_buffer_size
+ max_receive
<= m_disk_recv_buffer_size
);
3308 m_socket
->async_read_some(asio::buffer(m_disk_recv_buffer
.get() + m_recv_pos
- regular_buffer_size
3310 , bind(&peer_connection::on_receive_data
, self(), _1
, _2
));
3314 // receive into both regular and disk buffer
3315 TORRENT_ASSERT(max_receive
+ m_recv_pos
> regular_buffer_size
);
3316 TORRENT_ASSERT(m_recv_pos
< regular_buffer_size
);
3317 TORRENT_ASSERT(max_receive
- regular_buffer_size
3318 + m_recv_pos
<= m_disk_recv_buffer_size
);
3320 boost::array
<asio::mutable_buffer
, 2> vec
;
3321 vec
[0] = asio::buffer(&m_recv_buffer
[m_recv_pos
]
3322 , regular_buffer_size
- m_recv_pos
);
3323 vec
[1] = asio::buffer(m_disk_recv_buffer
.get()
3324 , max_receive
- regular_buffer_size
+ m_recv_pos
);
3325 m_socket
->async_read_some(vec
, bind(&peer_connection::on_receive_data
3328 m_channel_state
[download_channel
] = peer_info::bw_network
;
3331 #ifndef TORRENT_DISABLE_ENCRYPTION
3333 // returns the last 'bytes' from the receive buffer
3334 std::pair
<buffer::interval
, buffer::interval
> peer_connection::wr_recv_buffers(int bytes
)
3336 TORRENT_ASSERT(bytes
<= m_recv_pos
);
3338 std::pair
<buffer::interval
, buffer::interval
> vec
;
3339 int regular_buffer_size
= m_packet_size
- m_disk_recv_buffer_size
;
3340 TORRENT_ASSERT(regular_buffer_size
>= 0);
3341 if (!m_disk_recv_buffer
|| regular_buffer_size
>= m_recv_pos
)
3343 vec
.first
= buffer::interval(&m_recv_buffer
[0]
3344 + m_recv_pos
- bytes
, &m_recv_buffer
[0] + m_recv_pos
);
3345 vec
.second
= buffer::interval(0,0);
3347 else if (m_recv_pos
- bytes
>= regular_buffer_size
)
3349 vec
.first
= buffer::interval(m_disk_recv_buffer
.get() + m_recv_pos
3350 - regular_buffer_size
- bytes
, m_disk_recv_buffer
.get() + m_recv_pos
3351 - regular_buffer_size
);
3352 vec
.second
= buffer::interval(0,0);
3356 TORRENT_ASSERT(m_recv_pos
- bytes
< regular_buffer_size
);
3357 TORRENT_ASSERT(m_recv_pos
> regular_buffer_size
);
3358 vec
.first
= buffer::interval(&m_recv_buffer
[0] + m_recv_pos
- bytes
3359 , &m_recv_buffer
[0] + regular_buffer_size
);
3360 vec
.second
= buffer::interval(m_disk_recv_buffer
.get()
3361 , m_disk_recv_buffer
.get() + m_recv_pos
- regular_buffer_size
);
3363 TORRENT_ASSERT(vec
.first
.left() + vec
.second
.left() == bytes
);
3368 void peer_connection::reset_recv_buffer(int packet_size
)
3370 TORRENT_ASSERT(packet_size
> 0);
3371 if (m_recv_pos
> m_packet_size
)
3373 cut_receive_buffer(m_packet_size
, packet_size
);
3377 m_packet_size
= packet_size
;
3380 void peer_connection::send_buffer(char const* buf
, int size
, int flags
)
3382 if (flags
== message_type_request
)
3383 m_requests_in_buffer
.push_back(m_send_buffer
.size() + size
);
3385 int free_space
= m_send_buffer
.space_in_last_buffer();
3386 if (free_space
> size
) free_space
= size
;
3389 m_send_buffer
.append(buf
, free_space
);
3392 #ifdef TORRENT_STATS
3393 m_ses
.m_buffer_usage_logger
<< log_time() << " send_buffer: "
3394 << free_space
<< std::endl
;
3395 m_ses
.log_buffer_usage();
3398 if (size
<= 0) return;
3400 std::pair
<char*, int> buffer
= m_ses
.allocate_buffer(size
);
3401 if (buffer
.first
== 0)
3403 disconnect("out of memory");
3406 TORRENT_ASSERT(buffer
.second
>= size
);
3407 std::memcpy(buffer
.first
, buf
, size
);
3408 m_send_buffer
.append_buffer(buffer
.first
, buffer
.second
, size
3409 , bind(&session_impl::free_buffer
, boost::ref(m_ses
), _1
, buffer
.second
));
3410 #ifdef TORRENT_STATS
3411 m_ses
.m_buffer_usage_logger
<< log_time() << " send_buffer_alloc: " << size
<< std::endl
;
3412 m_ses
.log_buffer_usage();
3417 // TODO: change this interface to automatically call setup_send() when the
3418 // return value is destructed
3419 buffer::interval
peer_connection::allocate_send_buffer(int size
)
3421 TORRENT_ASSERT(size
> 0);
3422 char* insert
= m_send_buffer
.allocate_appendix(size
);
3425 std::pair
<char*, int> buffer
= m_ses
.allocate_buffer(size
);
3426 if (buffer
.first
== 0)
3428 disconnect("out of memory");
3429 return buffer::interval(0, 0);
3431 TORRENT_ASSERT(buffer
.second
>= size
);
3432 m_send_buffer
.append_buffer(buffer
.first
, buffer
.second
, size
3433 , bind(&session_impl::free_buffer
, boost::ref(m_ses
), _1
, buffer
.second
));
3434 buffer::interval
ret(buffer
.first
, buffer
.first
+ size
);
3435 #ifdef TORRENT_STATS
3436 m_ses
.m_buffer_usage_logger
<< log_time() << " allocate_buffer_alloc: " << size
<< std::endl
;
3437 m_ses
.log_buffer_usage();
3443 #ifdef TORRENT_STATS
3444 m_ses
.m_buffer_usage_logger
<< log_time() << " allocate_buffer: " << size
<< std::endl
;
3445 m_ses
.log_buffer_usage();
3447 buffer::interval
ret(insert
, insert
+ size
);
3455 set_to_zero(T
& v
, bool cond
): m_val(v
), m_cond(cond
) {}
3456 void fire() { if (!m_cond
) return; m_cond
= false; m_val
= 0; }
3457 ~set_to_zero() { if (m_cond
) m_val
= 0; }
3463 // --------------------------
3465 // --------------------------
3467 // throws exception when the client should be disconnected
3468 void peer_connection::on_receive_data(const error_code
& error
3469 , std::size_t bytes_transferred
)
3471 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
3475 TORRENT_ASSERT(m_channel_state
[download_channel
] == peer_info::bw_network
);
3476 m_channel_state
[download_channel
] = peer_info::bw_idle
;
3480 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
3481 (*m_logger
) << time_now_string() << " **ERROR**: "
3482 << error
.message() << "[in peer_connection::on_receive_data]\n";
3484 on_receive(error
, bytes_transferred
);
3485 disconnect(error
.message().c_str());
3489 int max_receive
= 0;
3492 #ifdef TORRENT_VERBOSE_LOGGING
3493 (*m_logger
) << "read " << bytes_transferred
<< " bytes\n";
3495 // correct the dl quota usage, if not all of the buffer was actually read
3496 if (!m_ignore_bandwidth_limits
)
3497 m_bandwidth_limit
[download_channel
].use_quota(bytes_transferred
);
3499 if (m_disconnecting
) return;
3501 TORRENT_ASSERT(m_packet_size
> 0);
3502 TORRENT_ASSERT(bytes_transferred
> 0);
3504 m_last_receive
= time_now();
3505 m_recv_pos
+= bytes_transferred
;
3506 TORRENT_ASSERT(m_recv_pos
<= int(m_recv_buffer
.size()
3507 + m_disk_recv_buffer_size
));
3510 size_type cur_payload_dl
= m_statistics
.last_payload_downloaded();
3511 size_type cur_protocol_dl
= m_statistics
.last_protocol_downloaded();
3513 on_receive(error
, bytes_transferred
);
3515 TORRENT_ASSERT(m_statistics
.last_payload_downloaded() - cur_payload_dl
>= 0);
3516 TORRENT_ASSERT(m_statistics
.last_protocol_downloaded() - cur_protocol_dl
>= 0);
3517 size_type stats_diff
= m_statistics
.last_payload_downloaded() - cur_payload_dl
+
3518 m_statistics
.last_protocol_downloaded() - cur_protocol_dl
;
3519 TORRENT_ASSERT(stats_diff
== bytes_transferred
);
3522 TORRENT_ASSERT(m_packet_size
> 0);
3526 && (m_recv_buffer
.capacity() - m_packet_size
) > 128)
3528 buffer(m_packet_size
).swap(m_recv_buffer
);
3531 max_receive
= m_packet_size
- m_recv_pos
;
3532 int quota_left
= m_bandwidth_limit
[download_channel
].quota_left();
3533 if (!m_ignore_bandwidth_limits
&& max_receive
> quota_left
)
3534 max_receive
= quota_left
;
3536 if (max_receive
== 0) break;
3538 int regular_buffer_size
= m_packet_size
- m_disk_recv_buffer_size
;
3540 if (int(m_recv_buffer
.size()) < regular_buffer_size
)
3541 m_recv_buffer
.resize(regular_buffer_size
);
3544 if (!m_disk_recv_buffer
|| regular_buffer_size
>= m_recv_pos
+ max_receive
)
3546 // only receive into regular buffer
3547 TORRENT_ASSERT(m_recv_pos
+ max_receive
<= int(m_recv_buffer
.size()));
3548 bytes_transferred
= m_socket
->read_some(asio::buffer(&m_recv_buffer
[m_recv_pos
]
3549 , max_receive
), ec
);
3551 else if (m_recv_pos
>= regular_buffer_size
)
3553 // only receive into disk buffer
3554 TORRENT_ASSERT(m_recv_pos
- regular_buffer_size
>= 0);
3555 TORRENT_ASSERT(m_recv_pos
- regular_buffer_size
+ max_receive
<= m_disk_recv_buffer_size
);
3556 bytes_transferred
= m_socket
->read_some(asio::buffer(m_disk_recv_buffer
.get()
3557 + m_recv_pos
- regular_buffer_size
, (std::min
)(m_packet_size
3558 - m_recv_pos
, max_receive
)), ec
);
3562 // receive into both regular and disk buffer
3563 TORRENT_ASSERT(max_receive
+ m_recv_pos
> regular_buffer_size
);
3564 TORRENT_ASSERT(m_recv_pos
< regular_buffer_size
);
3565 TORRENT_ASSERT(max_receive
- regular_buffer_size
3566 + m_recv_pos
<= m_disk_recv_buffer_size
);
3568 boost::array
<asio::mutable_buffer
, 2> vec
;
3569 vec
[0] = asio::buffer(&m_recv_buffer
[m_recv_pos
]
3570 , regular_buffer_size
- m_recv_pos
);
3571 vec
[1] = asio::buffer(m_disk_recv_buffer
.get()
3572 , (std::min
)(m_disk_recv_buffer_size
3573 , max_receive
- regular_buffer_size
+ m_recv_pos
));
3574 bytes_transferred
= m_socket
->read_some(vec
, ec
);
3576 if (ec
&& ec
!= asio::error::would_block
)
3578 disconnect(ec
.message().c_str());
3581 if (ec
== asio::error::would_block
) break;
3583 while (bytes_transferred
> 0);
3588 bool peer_connection::can_write() const
3590 // if we have requests or pending data to be sent or announcements to be made
3591 // we want to send data
3592 return !m_send_buffer
.empty()
3593 && (m_bandwidth_limit
[upload_channel
].quota_left() > 0
3594 || m_ignore_bandwidth_limits
)
3598 bool peer_connection::can_read() const
3600 bool ret
= (m_bandwidth_limit
[download_channel
].quota_left() > 0
3601 || m_ignore_bandwidth_limits
)
3603 && m_outstanding_writing_bytes
<
3604 m_ses
.settings().max_outstanding_disk_bytes_per_connection
;
3609 void peer_connection::connect(int ticket
)
3614 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
3615 (*m_ses
.m_logger
) << time_now_string() << " CONNECTING: " << m_remote
.address().to_string(ec
)
3616 << ":" << m_remote
.port() << "\n";
3619 m_connection_ticket
= ticket
;
3620 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
3623 TORRENT_ASSERT(m_connecting
);
3627 disconnect("torrent aborted");
3631 m_socket
->open(t
->get_interface().protocol(), ec
);
3634 disconnect(ec
.message().c_str());
3638 // set the socket to non-blocking, so that we can
3639 // read the entire buffer on each read event we get
3640 tcp::socket::non_blocking_io
ioc(true);
3641 m_socket
->io_control(ioc
, ec
);
3644 disconnect(ec
.message().c_str());
3648 tcp::endpoint bind_interface
= t
->get_interface();
3650 std::pair
<int, int> const& out_ports
= m_ses
.settings().outgoing_ports
;
3651 if (out_ports
.first
> 0 && out_ports
.second
>= out_ports
.first
)
3653 m_socket
->set_option(socket_acceptor::reuse_address(true), ec
);
3656 disconnect(ec
.message().c_str());
3659 bind_interface
.port(m_ses
.next_port());
3662 m_socket
->bind(bind_interface
, ec
);
3665 disconnect(ec
.message().c_str());
3668 m_socket
->async_connect(m_remote
3669 , bind(&peer_connection::on_connection_complete
, self(), _1
));
3670 m_connect
= time_now();
3671 m_statistics
.sent_syn();
3673 if (t
->alerts().should_post
<peer_connect_alert
>())
3675 t
->alerts().post_alert(peer_connect_alert(
3676 t
->get_handle(), remote(), pid()));
3680 void peer_connection::on_connection_complete(error_code
const& e
)
3682 ptime completed
= time_now();
3684 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
3688 m_rtt
= total_milliseconds(completed
- m_connect
);
3690 if (m_disconnecting
) return;
3692 m_connecting
= false;
3693 m_ses
.m_half_open
.done(m_connection_ticket
);
3697 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
3698 (*m_ses
.m_logger
) << time_now_string() << " CONNECTION FAILED: " << m_remote
.address().to_string()
3699 << ": " << e
.message() << "\n";
3701 disconnect(e
.message().c_str(), 1);
3705 if (m_disconnecting
) return;
3706 m_last_receive
= time_now();
3708 // this means the connection just succeeded
3710 m_statistics
.received_synack();
3712 TORRENT_ASSERT(m_socket
);
3713 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING
3714 (*m_ses
.m_logger
) << time_now_string() << " COMPLETED: " << m_remote
.address().to_string()
3715 << " rtt = " << m_rtt
<< "\n";
3719 if (m_remote
== m_socket
->local_endpoint(ec
))
3721 // if the remote endpoint is the same as the local endpoint, we're connected
3723 disconnect("connected to ourselves", 1);
3727 if (m_remote
.address().is_v4())
3730 m_socket
->set_option(type_of_service(m_ses
.settings().peer_tos
), ec
);
3738 // --------------------------
3740 // --------------------------
3742 // throws exception when the client should be disconnected
3743 void peer_connection::on_send_data(error_code
const& error
3744 , std::size_t bytes_transferred
)
3746 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
3750 TORRENT_ASSERT(m_channel_state
[upload_channel
] == peer_info::bw_network
);
3752 m_send_buffer
.pop_front(bytes_transferred
);
3754 for (std::vector
<int>::iterator i
= m_requests_in_buffer
.begin()
3755 , end(m_requests_in_buffer
.end()); i
!= end
; ++i
)
3756 *i
-= bytes_transferred
;
3758 while (!m_requests_in_buffer
.empty()
3759 && m_requests_in_buffer
.front() <= 0)
3760 m_requests_in_buffer
.erase(m_requests_in_buffer
.begin());
3762 m_channel_state
[upload_channel
] = peer_info::bw_idle
;
3764 if (!m_ignore_bandwidth_limits
)
3765 m_bandwidth_limit
[upload_channel
].use_quota(bytes_transferred
);
3767 #ifdef TORRENT_VERBOSE_LOGGING
3768 (*m_logger
) << "wrote " << bytes_transferred
<< " bytes\n";
3773 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
3774 (*m_logger
) << "**ERROR**: " << error
.message() << " [in peer_connection::on_send_data]\n";
3776 disconnect(error
.message().c_str());
3779 if (m_disconnecting
) return;
3781 TORRENT_ASSERT(!m_connecting
);
3782 TORRENT_ASSERT(bytes_transferred
> 0);
3784 m_last_sent
= time_now();
3787 size_type cur_payload_ul
= m_statistics
.last_payload_uploaded();
3788 size_type cur_protocol_ul
= m_statistics
.last_protocol_uploaded();
3790 on_sent(error
, bytes_transferred
);
3792 TORRENT_ASSERT(m_statistics
.last_payload_uploaded() - cur_payload_ul
>= 0);
3793 TORRENT_ASSERT(m_statistics
.last_protocol_uploaded() - cur_protocol_ul
>= 0);
3794 size_type stats_diff
= m_statistics
.last_payload_uploaded() - cur_payload_ul
3795 + m_statistics
.last_protocol_uploaded() - cur_protocol_ul
;
3796 TORRENT_ASSERT(stats_diff
== bytes_transferred
);
3805 void peer_connection::check_invariant() const
3807 TORRENT_ASSERT(bool(m_disk_recv_buffer
) == (m_disk_recv_buffer_size
> 0));
3809 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
3810 if (m_disconnecting
)
3813 TORRENT_ASSERT(m_disconnect_started
);
3815 else if (!m_in_constructor
)
3817 TORRENT_ASSERT(m_ses
.has_peer((peer_connection
*)this));
3821 // this assertion correct most of the time, but sometimes right when the
3822 // limit is changed it might break
3823 for (int i = 0; i < 2; ++i)
3825 // this peer is in the bandwidth history iff max_assignable < limit
3826 TORRENT_ASSERT((m_bandwidth_limit[i].max_assignable() < m_bandwidth_limit[i].throttle())
3827 == m_ses.m_bandwidth_manager[i]->is_in_history(this)
3828 || m_bandwidth_limit[i].throttle() == bandwidth_limit::inf);
3832 if (m_channel_state
[download_channel
] == peer_info::bw_torrent
3833 || m_channel_state
[download_channel
] == peer_info::bw_global
)
3834 TORRENT_ASSERT(m_bandwidth_limit
[download_channel
].quota_left() == 0);
3835 if (m_channel_state
[upload_channel
] == peer_info::bw_torrent
3836 || m_channel_state
[upload_channel
] == peer_info::bw_global
)
3837 TORRENT_ASSERT(m_bandwidth_limit
[upload_channel
].quota_left() == 0);
3839 std::set
<piece_block
> unique
;
3840 std::transform(m_download_queue
.begin(), m_download_queue
.end()
3841 , std::inserter(unique
, unique
.begin()), boost::bind(&pending_block::block
, _1
));
3842 std::copy(m_request_queue
.begin(), m_request_queue
.end(), std::inserter(unique
, unique
.begin()));
3843 TORRENT_ASSERT(unique
.size() == m_download_queue
.size() + m_request_queue
.size());
3846 TORRENT_ASSERT(m_peer_info
->prev_amount_upload
== 0);
3847 TORRENT_ASSERT(m_peer_info
->prev_amount_download
== 0);
3848 TORRENT_ASSERT(m_peer_info
->connection
== this
3849 || m_peer_info
->connection
== 0);
3851 if (m_peer_info
->optimistically_unchoked
)
3852 TORRENT_ASSERT(!is_choked());
3855 TORRENT_ASSERT(m_have_piece
.count() == m_num_pieces
);
3859 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3860 // since this connection doesn't have a torrent reference
3861 // no torrent should have a reference to this connection either
3862 for (aux::session_impl::torrent_map::const_iterator i
= m_ses
.m_torrents
.begin()
3863 , end(m_ses
.m_torrents
.end()); i
!= end
; ++i
)
3864 TORRENT_ASSERT(!i
->second
->has_peer((peer_connection
*)this));
3869 if (t
->ready_for_connections() && m_initialized
)
3870 TORRENT_ASSERT(t
->torrent_file().num_pieces() == int(m_have_piece
.size()));
3872 if (m_ses
.settings().close_redundant_connections
)
3874 // make sure upload only peers are disconnected
3875 if (t
->is_finished() && m_upload_only
)
3876 TORRENT_ASSERT(m_disconnect_started
);
3879 && m_bitfield_received
3880 && t
->are_files_checked())
3881 TORRENT_ASSERT(m_disconnect_started
);
3884 if (!m_disconnect_started
)
3886 // none of this matters if we're disconnecting anyway
3887 if (t
->is_finished())
3888 TORRENT_ASSERT(!m_interesting
);
3890 TORRENT_ASSERT(m_upload_only
);
3893 if (t
->has_picker())
3895 std::map
<piece_block
, int> num_requests
;
3896 for (torrent::const_peer_iterator i
= t
->begin(); i
!= t
->end(); ++i
)
3898 // make sure this peer is not a dangling pointer
3899 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3900 TORRENT_ASSERT(m_ses
.has_peer(*i
));
3902 peer_connection
const& p
= *(*i
);
3903 for (std::deque
<piece_block
>::const_iterator i
= p
.request_queue().begin()
3904 , end(p
.request_queue().end()); i
!= end
; ++i
)
3906 for (std::deque
<pending_block
>::const_iterator i
= p
.download_queue().begin()
3907 , end(p
.download_queue().end()); i
!= end
; ++i
)
3908 ++num_requests
[i
->block
];
3910 for (std::map
<piece_block
, int>::iterator i
= num_requests
.begin()
3911 , end(num_requests
.end()); i
!= end
; ++i
)
3913 if (!t
->picker().is_downloaded(i
->first
))
3914 TORRENT_ASSERT(t
->picker().num_peers(i
->first
) == i
->second
);
3917 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3920 policy::const_iterator i
= t
->get_policy().begin_peer();
3921 policy::const_iterator end
= t
->get_policy().end_peer();
3922 for (; i
!= end
; ++i
)
3924 if (&i
->second
== m_peer_info
) break;
3926 TORRENT_ASSERT(i
!= end
);
3929 if (t
->has_picker() && !t
->is_aborted())
3931 // make sure that pieces that have completed the download
3932 // of all their blocks are in the disk io thread's queue
3934 const std::vector
<piece_picker::downloading_piece
>& dl_queue
3935 = t
->picker().get_download_queue();
3936 for (std::vector
<piece_picker::downloading_piece
>::const_iterator i
=
3937 dl_queue
.begin(); i
!= dl_queue
.end(); ++i
)
3939 const int blocks_per_piece
= t
->picker().blocks_in_piece(i
->index
);
3941 bool complete
= true;
3942 for (int j
= 0; j
< blocks_per_piece
; ++j
)
3944 if (i
->info
[j
].state
== piece_picker::block_info::state_finished
)
3950 // this invariant is not valid anymore since the completion event
3951 // might be queued in the io service
3952 if (complete && !piece_failed)
3954 disk_io_job ret = m_ses.m_disk_thread.find_job(
3955 &t->filesystem(), -1, i->index);
3956 TORRENT_ASSERT(ret.action == disk_io_job::hash || ret.action == disk_io_job::write);
3957 TORRENT_ASSERT(ret.piece == i->index);
3963 // extremely expensive invariant check
3967 piece_picker& p = t->picker();
3968 const std::vector<piece_picker::downloading_piece>& dlq = p.get_download_queue();
3969 const int blocks_per_piece = static_cast<int>(
3970 t->torrent_file().piece_length() / t->block_size());
3972 for (std::vector<piece_picker::downloading_piece>::const_iterator i =
3973 dlq.begin(); i != dlq.end(); ++i)
3975 for (int j = 0; j < blocks_per_piece; ++j)
3977 if (std::find(m_request_queue.begin(), m_request_queue.end()
3978 , piece_block(i->index, j)) != m_request_queue.end()
3980 std::find(m_download_queue.begin(), m_download_queue.end()
3981 , piece_block(i->index, j)) != m_download_queue.end())
3983 TORRENT_ASSERT(i->info[j].peer == m_remote);
3987 TORRENT_ASSERT(i->info[j].peer != m_remote || i->info[j].finished);
3996 peer_connection::peer_speed_t
peer_connection::peer_speed()
3998 shared_ptr
<torrent
> t
= m_torrent
.lock();
4001 int download_rate
= int(statistics().download_payload_rate());
4002 int torrent_download_rate
= int(t
->statistics().download_payload_rate());
4004 if (download_rate
> 512 && download_rate
> torrent_download_rate
/ 16)
4006 else if (download_rate
> 4096 && download_rate
> torrent_download_rate
/ 64)
4008 else if (download_rate
< torrent_download_rate
/ 15 && m_speed
== fast
)
4016 void peer_connection::keep_alive()
4021 d
= time_now() - m_last_sent
;
4022 if (total_seconds(d
) < m_timeout
/ 2) return;
4024 if (m_connecting
) return;
4025 if (in_handshake()) return;
4027 // if the last send has not completed yet, do not send a keep
4029 if (m_channel_state
[upload_channel
] != peer_info::bw_idle
) return;
4031 #ifdef TORRENT_VERBOSE_LOGGING
4032 (*m_logger
) << time_now_string() << " ==> KEEPALIVE\n";
4035 m_last_sent
= time_now();
4039 bool peer_connection::is_seed() const
4041 // if m_num_pieces == 0, we probably don't have the
4043 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
4044 return m_num_pieces
== (int)m_have_piece
.size() && m_num_pieces
> 0 && t
&& t
->valid_metadata();