3 Copyright (c) 2003, Arvid Norberg
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #include "libtorrent/pch.hpp"
39 #include <boost/bind.hpp>
41 #include "libtorrent/peer_connection.hpp"
42 #include "libtorrent/identify_client.hpp"
43 #include "libtorrent/entry.hpp"
44 #include "libtorrent/bencode.hpp"
45 #include "libtorrent/alert_types.hpp"
46 #include "libtorrent/invariant_check.hpp"
47 #include "libtorrent/io.hpp"
48 #include "libtorrent/file.hpp"
49 #include "libtorrent/version.hpp"
50 #include "libtorrent/extensions.hpp"
51 #include "libtorrent/aux_/session_impl.hpp"
52 #include "libtorrent/policy.hpp"
53 #include "libtorrent/socket_type.hpp"
54 #include "libtorrent/assert.hpp"
56 //#define TORRENT_CORRUPT_DATA
59 using boost::shared_ptr
;
60 using libtorrent::aux::session_impl
;
64 // outbound connection
// Constructor used for outgoing connections (see the log line written
// below). The initializer list stamps most timers with time_now(), sets
// m_last_choke one hour in the past, uses min_time() for the "never
// happened yet" timestamps, and reads the request-queue limit and the peer
// timeout from m_ses.settings(). Both bandwidth limits start at
// bandwidth_limit::inf and all state flags start in their idle values
// (the remote peer is considered choked, m_in_constructor is true).
//
// The body marks both bandwidth channels idle, asserts that a supplied
// policy::peer is not banned, clears m_country and - when the session has a
// country database - fills it from the remote address, opens the per-peer
// log when logging is compiled in, and zeroes m_peer_id.
//
// NOTE(review): the embedded line numbers skip values, so parts of this
// definition (e.g. braces and some initializers) are not visible here.
65 peer_connection::peer_connection(
67 , boost::weak_ptr
<torrent
> tor
68 , shared_ptr
<socket_type
> s
69 , tcp::endpoint
const& endp
70 , policy::peer
* peerinfo
)
73 m_last_choke(time_now() - hours(1))
77 , m_max_out_request_queue(m_ses
.settings().max_out_request_queue
)
78 , m_last_piece(time_now())
79 , m_last_request(time_now())
80 , m_last_incoming_request(min_time())
81 , m_last_unchoke(min_time())
82 , m_last_receive(time_now())
83 , m_last_sent(time_now())
84 , m_requested(min_time())
86 , m_remote_dl_update(time_now())
87 , m_connect(time_now())
88 , m_became_uninterested(time_now())
89 , m_became_uninteresting(time_now())
91 , m_downloaded_at_last_unchoke(0)
92 , m_disk_recv_buffer(ses
, 0)
97 , m_timeout(m_ses
.settings().peer_timeout
)
100 , m_disk_recv_buffer_size(0)
102 , m_num_invalid_requests(0)
104 , m_upload_limit(bandwidth_limit::inf
)
105 , m_download_limit(bandwidth_limit::inf
)
106 , m_peer_info(peerinfo
)
108 , m_connection_ticket(-1)
109 , m_remote_bytes_dled(0)
110 , m_remote_dl_rate(0)
111 , m_outstanding_writing_bytes(0)
112 , m_download_rate_peak(0)
113 , m_upload_rate_peak(0)
115 , m_prefer_whole_pieces(0)
116 , m_desired_queue_size(2)
117 , m_fast_reconnect(false)
119 , m_peer_interested(false)
120 , m_peer_choked(true)
121 , m_interesting(false)
124 , m_ignore_bandwidth_limits(false)
126 , m_disconnecting(false)
129 , m_request_large_blocks(false)
130 , m_upload_only(false)
132 , m_bitfield_received(false)
134 , m_in_constructor(true)
135 , m_disconnect_started(false)
136 , m_initialized(false)
// both bandwidth channels start out idle
139 m_channel_state
[upload_channel
] = peer_info::bw_idle
;
140 m_channel_state
[download_channel
] = peer_info::bw_idle
;
// a banned peer must never be handed to a new connection
142 TORRENT_ASSERT(peerinfo
== 0 || peerinfo
->banned
== false);
143 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
144 std::fill(m_country
, m_country
+ 2, 0);
145 #ifndef TORRENT_DISABLE_GEO_IP
146 if (m_ses
.has_country_db())
148 char const *country
= m_ses
.country_for_ip(m_remote
.address());
// copy the two-character country code
151 m_country
[0] = country
[0];
152 m_country
[1] = country
[1];
157 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
// the log is created from "<remote address>_<remote port>" and the
// session's listen port
158 m_logger
= m_ses
.create_log(m_remote
.address().to_string() + "_"
159 + boost::lexical_cast
<std::string
>(m_remote
.port()), m_ses
.listen_port());
160 (*m_logger
) << "*** OUTGOING CONNECTION\n";
163 piece_failed
= false;
165 #ifndef TORRENT_DISABLE_GEO_IP
166 m_inet_as_name
= m_ses
.as_name_for_ip(m_remote
.address());
// the peer id is unknown at this point; start from all zeroes
169 std::fill(m_peer_id
.begin(), m_peer_id
.end(), 0);
172 // incoming connection
// Constructor used for incoming connections (see the log line written
// below); it takes no torrent argument. The initializer list mirrors the
// outgoing constructor: timers are stamped with time_now() or min_time(),
// limits come from m_ses.settings(), bandwidth limits start at
// bandwidth_limit::inf and the state flags start in their idle values.
// This overload additionally initializes m_timeout_extend and
// m_connecting explicitly.
//
// The body marks both bandwidth channels idle, optionally resolves the
// country code and AS name of the remote address, opens the per-peer log
// when logging is compiled in, and zeroes m_peer_id.
//
// NOTE(review): the embedded line numbers skip values, so parts of this
// definition (e.g. braces and some initializers) are not visible here.
173 peer_connection::peer_connection(
175 , shared_ptr
<socket_type
> s
176 , tcp::endpoint
const& endp
177 , policy::peer
* peerinfo
)
180 m_last_choke(time_now() - hours(1))
184 , m_max_out_request_queue(m_ses
.settings().max_out_request_queue
)
185 , m_last_piece(time_now())
186 , m_last_request(time_now())
187 , m_last_incoming_request(min_time())
188 , m_last_unchoke(min_time())
189 , m_last_receive(time_now())
190 , m_last_sent(time_now())
191 , m_requested(min_time())
192 , m_timeout_extend(0)
193 , m_remote_dl_update(time_now())
194 , m_connect(time_now())
195 , m_became_uninterested(time_now())
196 , m_became_uninteresting(time_now())
198 , m_downloaded_at_last_unchoke(0)
199 , m_disk_recv_buffer(ses
, 0)
203 , m_timeout(m_ses
.settings().peer_timeout
)
206 , m_disk_recv_buffer_size(0)
208 , m_num_invalid_requests(0)
210 , m_upload_limit(bandwidth_limit::inf
)
211 , m_download_limit(bandwidth_limit::inf
)
212 , m_peer_info(peerinfo
)
214 , m_connection_ticket(-1)
215 , m_remote_bytes_dled(0)
216 , m_remote_dl_rate(0)
217 , m_outstanding_writing_bytes(0)
218 , m_download_rate_peak(0)
219 , m_upload_rate_peak(0)
221 , m_prefer_whole_pieces(0)
222 , m_desired_queue_size(2)
223 , m_fast_reconnect(false)
225 , m_peer_interested(false)
226 , m_peer_choked(true)
227 , m_interesting(false)
230 , m_ignore_bandwidth_limits(false)
232 , m_disconnecting(false)
233 , m_connecting(false)
235 , m_request_large_blocks(false)
236 , m_upload_only(false)
238 , m_bitfield_received(false)
240 , m_in_constructor(true)
241 , m_disconnect_started(false)
242 , m_initialized(false)
// both bandwidth channels start out idle
245 m_channel_state
[upload_channel
] = peer_info::bw_idle
;
246 m_channel_state
[download_channel
] = peer_info::bw_idle
;
248 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
249 std::fill(m_country
, m_country
+ 2, 0);
250 #ifndef TORRENT_DISABLE_GEO_IP
251 if (m_ses
.has_country_db())
253 char const *country
= m_ses
.country_for_ip(m_remote
.address());
// copy the two-character country code
256 m_country
[0] = country
[0];
257 m_country
[1] = country
[1];
263 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
// the socket's remote endpoint must agree with m_remote unless the
// lookup itself failed
265 TORRENT_ASSERT(m_socket
->remote_endpoint(ec
) == m_remote
|| ec
);
266 m_logger
= m_ses
.create_log(remote().address().to_string(ec
) + "_"
267 + boost::lexical_cast
<std::string
>(remote().port()), m_ses
.listen_port());
268 (*m_logger
) << "*** INCOMING CONNECTION\n";
271 #ifndef TORRENT_DISABLE_GEO_IP
272 m_inet_as_name
= m_ses
.as_name_for_ip(m_remote
.address());
275 piece_failed
= false;
// the peer id is unknown at this point; start from all zeroes
277 std::fill(m_peer_id
.begin(), m_peer_id
.end(), 0);
// Ordering predicate between this connection and `p`, used to rank peers
// for unchoking. The primary key is the payload downloaded from each peer
// since its counter was last reset (m_downloaded_at_last_unchoke): the
// peer that sent more sorts first. On a tie, the amount uploaded to each
// peer is compared, taken from the policy::peer entry when there is one
// and from the connection statistics otherwise.
// NOTE(review): the final return statement is not visible in this listing.
280 bool peer_connection::unchoke_compare(boost::intrusive_ptr
<peer_connection
const> const& p
) const
283 peer_connection
const& rhs
= *p
;
285 // first compare how many bytes they've sent us
286 size_type c1
= m_statistics
.total_payload_download() - m_downloaded_at_last_unchoke
;
287 size_type c2
= rhs
.m_statistics
.total_payload_download() - rhs
.m_downloaded_at_last_unchoke
;
288 if (c1
> c2
) return true;
289 if (c1
< c2
) return false;
291 // if they are equal, compare how much we have uploaded
292 if (m_peer_info
) c1
= m_peer_info
->total_upload();
293 else c1
= m_statistics
.total_payload_upload();
294 if (rhs
.m_peer_info
) c2
= rhs
.m_peer_info
->total_upload();
295 else c2
= rhs
.m_statistics
.total_payload_upload();
297 // in order to not switch back and forth too often,
298 // unchoked peers must be at least one piece ahead
299 // of a choked peer to be sorted at a lower unchoke-priority
300 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
// both sides use this connection's torrent for the piece length
302 if (!is_choked()) c1
-= t
->torrent_file().piece_length();
303 if (!rhs
.is_choked()) c2
-= t
->torrent_file().piece_length();
// Snapshots the current total payload download so that unchoke_compare()
// measures bytes received since this call.
308 void peer_connection::reset_choke_counters()
310 m_downloaded_at_last_unchoke
= m_statistics
.total_payload_download();
// Prepares the socket of this connection: switches it to non-blocking
// mode, records the remote endpoint in m_remote and, for IPv4 peers,
// applies the type-of-service value from the session settings. A failure
// of the io_control or remote_endpoint call disconnects the peer with the
// error's message.
// NOTE(review): the error checks guarding the disconnect() calls and the
// body of the trailing ready_for_connections() branch are not visible in
// this listing.
313 void peer_connection::start()
// a policy::peer entry, if present, must already point back at us
315 TORRENT_ASSERT(m_peer_info
== 0 || m_peer_info
->connection
== this);
316 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
320 tcp::socket::non_blocking_io
ioc(true);
322 m_socket
->io_control(ioc
, ec
);
325 disconnect(ec
.message().c_str());
328 m_remote
= m_socket
->remote_endpoint(ec
);
331 disconnect(ec
.message().c_str());
// the TOS option is only set on IPv4 sockets
334 if (m_remote
.address().is_v4())
335 m_socket
->set_option(type_of_service(m_ses
.settings().peer_tos
), ec
);
337 else if (t
->ready_for_connections())
// Re-evaluates whether this peer is interesting to us and notifies the
// peer / policy accordingly: send_not_interested() when it is not,
// policy::peer_is_interesting() when it is. Nothing is done while the
// peer's bitfield is still empty or the torrent is not ready for
// connections. Exceptions thrown while notifying are swallowed.
// NOTE(review): the loop body that sets `interested` is only partly
// visible in this listing.
343 void peer_connection::update_interest()
345 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
348 // if m_have_piece is 0, it means the connections
349 // have not been initialized yet. The interested
350 // flag will be updated once they are.
351 if (m_have_piece
.size() == 0) return;
352 if (!t
->ready_for_connections()) return;
354 bool interested
= false;
// a finished torrent is never interested in any peer
355 if (!t
->is_finished())
357 piece_picker
const& p
= t
->picker();
358 int num_pieces
= p
.num_pieces();
359 for (int j
= 0; j
!= num_pieces
; ++j
)
// pieces with priority 0 never make a peer interesting
362 && t
->piece_priority(j
) > 0
372 if (!interested
) send_not_interested();
373 else t
->get_policy().peer_is_interesting(*this);
375 // may throw an asio error if socket has disconnected
376 catch (std::exception
&) {}
378 TORRENT_ASSERT(in_handshake() || is_interesting() == interested
);
381 #ifndef TORRENT_DISABLE_EXTENSIONS
// Appends a peer plugin to m_extensions, the list the message handlers in
// this file iterate over. Only compiled when extensions are not disabled.
382 void peer_connection::add_extension(boost::shared_ptr
<peer_plugin
> ext
)
384 m_extensions
.push_back(ext
);
// Builds the "allowed fast" set for this peer and announces it.
//
// If the configured set size (allowed_fast_set_size) is at least the
// number of pieces, every piece index is inserted into m_accept_fast.
// Otherwise the set is derived deterministically: the remote address bytes
// and the torrent's info-hash are concatenated and hashed; each hash
// yields five 32-bit values, each reduced modulo the number of pieces.
// New indices are sent with write_allow_fast() and recorded, and the hash
// is re-hashed for the next round. The function returns once the set
// holds the configured number of pieces or every piece.
// NOTE(review): the enclosing loop, the IPv4/IPv6 branch conditions and
// the declaration of `x` are not visible in this listing.
388 void peer_connection::send_allowed_set()
392 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
395 int num_allowed_pieces
= m_ses
.settings().allowed_fast_set_size
;
396 int num_pieces
= t
->torrent_file().num_pieces();
// small torrent: every piece goes into the set
398 if (num_allowed_pieces
>= num_pieces
)
400 for (int i
= 0; i
< num_pieces
; ++i
)
402 #ifdef TORRENT_VERBOSE_LOGGING
403 (*m_logger
) << time_now_string()
404 << " ==> ALLOWED_FAST [ " << i
<< " ]\n";
407 m_accept_fast
.insert(i
);
413 address
const& addr
= m_remote
.address();
// seed the hash input with the raw address bytes (v4 or v6)
416 address_v4::bytes_type bytes
= addr
.to_v4().to_bytes();
417 x
.assign((char*)&bytes
[0], bytes
.size());
421 address_v6::bytes_type bytes
= addr
.to_v6().to_bytes();
422 x
.assign((char*)&bytes
[0], bytes
.size());
// followed by the 20 byte info-hash
424 x
.append((char*)&t
->torrent_file().info_hash()[0], 20);
426 sha1_hash hash
= hasher(&x
[0], x
.size()).final();
429 char* p
= (char*)&hash
[0];
// a 20 byte digest provides five 32-bit candidates
430 for (int i
= 0; i
< 5; ++i
)
432 int piece
= detail::read_uint32(p
) % num_pieces
;
433 if (m_accept_fast
.find(piece
) == m_accept_fast
.end())
435 #ifdef TORRENT_VERBOSE_LOGGING
436 (*m_logger
) << time_now_string()
437 << " ==> ALLOWED_FAST [ " << piece
<< " ]\n";
439 write_allow_fast(piece
);
440 m_accept_fast
.insert(piece
);
441 if (int(m_accept_fast
.size()) >= num_allowed_pieces
442 || int(m_accept_fast
.size()) == num_pieces
) return;
// out of candidates: hash the digest again for the next five
445 hash
= hasher((char*)&hash
[0], 20).final();
// Brings the peer's piece bookkeeping in line with the torrent's metadata:
// m_have_piece is resized to the torrent's piece count (new bits take the
// value of m_have_all) and m_num_pieces is recounted.
//
// If the peer turns out to have every piece it is flagged as a seed /
// upload-only, the redundancy check is run, and - unless we are finished -
// the policy is told the peer is interesting. Otherwise the torrent is
// told which pieces the peer has, and the peer is interesting if it has
// any piece we lack with non-zero priority; an upload-only peer that is
// not interesting is disconnected.
// NOTE(review): several statements between the visible lines are missing
// from this listing.
449 void peer_connection::on_metadata_impl()
451 boost::shared_ptr
<torrent
> t
= associated_torrent().lock();
452 m_have_piece
.resize(t
->torrent_file().num_pieces(), m_have_all
);
453 m_num_pieces
= m_have_piece
.count();
// all bits set: the peer is a seed
454 if (m_num_pieces
== int(m_have_piece
.size()))
456 #ifdef TORRENT_VERBOSE_LOGGING
457 (*m_logger
) << time_now_string()
458 << " *** on_metadata(): THIS IS A SEED ***\n";
460 // if this is a web seed. we don't have a peer_info struct
461 if (m_peer_info
) m_peer_info
->seed
= true;
462 m_upload_only
= true;
465 disconnect_if_redundant();
466 if (m_disconnecting
) return;
469 if (m_disconnecting
) return;
471 if (!t
->is_finished())
472 t
->get_policy().peer_is_interesting(*this);
476 TORRENT_ASSERT(!m_have_all
);
479 if (m_disconnecting
) return;
481 // let the torrent know which pieces the
483 // if we're a seed, we don't keep track of piece availability
484 bool interesting
= false;
487 t
->peer_has(m_have_piece
);
489 for (int i
= 0; i
< (int)m_have_piece
.size(); ++i
)
// a piece we lack and have not filtered out makes the peer interesting
493 if (!t
->have_piece(i
) && t
->picker().piece_priority(i
) != 0)
499 if (interesting
) t
->get_policy().peer_is_interesting(*this);
500 else if (upload_only()) disconnect("upload to upload connections");
// Initializes the piece bookkeeping for this peer once the torrent has
// valid metadata and is ready for connections (both asserted).
// m_have_piece is resized to the torrent's piece count and m_initialized
// is set.
//
// A peer that has every piece is flagged as a seed / upload-only; we then
// send not-interested if the torrent is finished and report the peer as
// interesting otherwise. For other peers the torrent is told which pieces
// the peer has, and interest is decided by whether the peer has a piece
// we lack with non-zero priority.
// NOTE(review): several statements between the visible lines are missing
// from this listing.
503 void peer_connection::init()
507 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
509 TORRENT_ASSERT(t
->valid_metadata());
510 TORRENT_ASSERT(t
->ready_for_connections());
512 m_have_piece
.resize(t
->torrent_file().num_pieces(), m_have_all
);
514 if (m_have_all
) m_num_pieces
= t
->torrent_file().num_pieces();
516 m_initialized
= true;
518 // now that we have a piece_picker,
519 // update it with this peer's pieces
521 TORRENT_ASSERT(m_num_pieces
== m_have_piece
.count());
// all bits set: the peer is a seed
523 if (m_num_pieces
== int(m_have_piece
.size()))
525 #ifdef TORRENT_VERBOSE_LOGGING
526 (*m_logger
) << " *** THIS IS A SEED ***\n";
528 // if this is a web seed. we don't have a peer_info struct
529 if (m_peer_info
) m_peer_info
->seed
= true;
530 m_upload_only
= true;
533 if (t
->is_finished()) send_not_interested();
534 else t
->get_policy().peer_is_interesting(*this);
538 // if we're a seed, we don't keep track of piece availability
541 t
->peer_has(m_have_piece
);
542 bool interesting
= false;
543 for (int i
= 0; i
< int(m_have_piece
.size()); ++i
)
547 // if the peer has a piece and we don't, the peer is interesting
548 if (!t
->have_piece(i
)
549 && t
->picker().piece_priority(i
) != 0)
553 if (interesting
) t
->get_policy().peer_is_interesting(*this);
554 else send_not_interested();
// Destructor. Asserts that construction completed and that the disconnect
// sequence was started and finished before destruction, resets
// m_disk_recv_buffer_size, writes a closing log line when logging is
// compiled in, and asserts that neither the session nor any of its
// torrents still references this connection and that the policy::peer
// entry no longer points at it.
// NOTE(review): the guards around the m_peer_info assert and the use of
// `t` at the end are not visible in this listing.
562 peer_connection::~peer_connection()
565 TORRENT_ASSERT(!m_in_constructor
);
566 TORRENT_ASSERT(m_disconnecting
);
567 TORRENT_ASSERT(m_disconnect_started
);
569 m_disk_recv_buffer_size
= 0;
571 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
574 (*m_logger
) << time_now_string()
575 << " *** CONNECTION CLOSED\n";
578 TORRENT_ASSERT(!m_ses
.has_peer(this));
// no torrent in the session may still list this connection
580 for (aux::session_impl::torrent_map::const_iterator i
= m_ses
.m_torrents
.begin()
581 , end(m_ses
.m_torrents
.end()); i
!= end
; ++i
)
582 TORRENT_ASSERT(!i
->second
->has_peer(this));
584 TORRENT_ASSERT(m_peer_info
->connection
== 0);
586 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
// Computes the piece_picker option flags to use when picking blocks for
// this peer. The base strategy is one of: sequential (torrent is in
// sequential-download mode), prioritize_partials (we have fewer pieces
// than initial_picker_threshold) or rarest_first. Further flags are OR-ed
// in: reverse for snubbed peers, prioritize_partials when the setting
// prioritize_partial_pieces is on, and on_parole | prioritize_partials for
// peers on parole.
// NOTE(review): the declaration/return of `ret` and the snubbed-peer
// condition are not visible in this listing.
590 int peer_connection::picker_options() const
593 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
597 if (t
->is_sequential_download())
599 ret
|= piece_picker::sequential
;
601 else if (t
->num_have() < t
->settings().initial_picker_threshold
)
603 // if we have fewer pieces than a certain threshold
604 // don't pick rare pieces, just pick random ones,
605 // and prioritize finishing them
606 ret
|= piece_picker::prioritize_partials
;
610 ret
|= piece_picker::rarest_first
;
615 // snubbed peers should request
616 // the common pieces first, just to make
617 // it more likely for all snubbed peers to
618 // request blocks from the same piece
619 ret
|= piece_picker::reverse
;
622 if (t
->settings().prioritize_partial_pieces
)
623 ret
|= piece_picker::prioritize_partials
;
625 if (on_parole()) ret
|= piece_picker::on_parole
626 | piece_picker::prioritize_partials
;
628 // only one of rarest_first, common_first and sequential can be set.
629 TORRENT_ASSERT(bool(ret
& piece_picker::rarest_first
)
630 + bool(ret
& piece_picker::sequential
) <= 1);
// Sets the fast-reconnect flag to `r` and back-dates the peer's
// `connected` timestamp by min_reconnect_time * max_failcount seconds,
// then counts the fast reconnect on the policy::peer entry.
// The leading condition tests for a missing policy::peer entry or more
// than one previous fast reconnect.
// NOTE(review): the statement controlled by that condition is not visible
// in this listing - presumably an early return; confirm.
634 void peer_connection::fast_reconnect(bool r
)
636 if (!peer_info_struct() || peer_info_struct()->fast_reconnects
> 1)
638 m_fast_reconnect
= r
;
639 peer_info_struct()->connected
= time_now()
640 - seconds(m_ses
.settings().min_reconnect_time
641 * m_ses
.settings().max_failcount
);
642 ++peer_info_struct()->fast_reconnects
;
// Called when we have obtained piece `index`; announces it to this peer.
// Nothing is announced during the handshake. The piece is removed from
// m_suggested_pieces if present. When the peer already has the piece and
// the setting send_redundant_have is off, the HAVE message is suppressed
// (only logged).
// NOTE(review): the calls that update interest and actually write the
// HAVE message are not visible in this listing.
645 void peer_connection::announce_piece(int index
)
647 // don't announce during handshake
648 if (in_handshake()) return;
650 // remove suggested pieces that we have
651 std::vector
<int>::iterator i
= std::find(
652 m_suggested_pieces
.begin(), m_suggested_pieces
.end(), index
);
653 if (i
!= m_suggested_pieces
.end()) m_suggested_pieces
.erase(i
);
655 if (has_piece(index
))
657 // if we got a piece that this peer has
658 // it might have been the last interesting
659 // piece this peer had. We might not be
660 // interested anymore
662 if (is_disconnecting()) return;
664 // optimization, don't send have messages
665 // to peers that already have the piece
666 if (!m_ses
.settings().send_redundant_have
)
668 #ifdef TORRENT_VERBOSE_LOGGING
669 (*m_logger
) << time_now_string()
670 << " ==> HAVE [ piece: " << index
<< " ] SUPRESSED\n";
676 #ifdef TORRENT_VERBOSE_LOGGING
677 (*m_logger
) << time_now_string()
678 << " ==> HAVE [ piece: " << index
<< "]\n";
682 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
// we can only announce pieces we actually have
684 TORRENT_ASSERT(t
->have_piece(index
));
// Returns whether the remote peer has piece `i`, according to the
// m_have_piece bitfield. The torrent must have valid metadata and `i`
// must be a valid piece index; both are only checked by assertions.
688 bool peer_connection::has_piece(int i
) const
690 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
692 TORRENT_ASSERT(t
->valid_metadata());
693 TORRENT_ASSERT(i
>= 0);
694 TORRENT_ASSERT(i
< t
->torrent_file().num_pieces());
695 return m_have_piece
[i
];
// Read-only access to m_request_queue, the queue of piece_block entries
// held by this connection.
698 std::deque
<piece_block
> const& peer_connection::request_queue() const
700 return m_request_queue
;
// Read-only access to m_download_queue, the queue of pending_block
// entries held by this connection.
703 std::deque
<pending_block
> const& peer_connection::download_queue() const
705 return m_download_queue
;
// Read-only access to a queue of peer_request entries.
// NOTE(review): the body is not visible in this listing; which member is
// returned cannot be confirmed from here.
708 std::deque
<peer_request
> const& peer_connection::upload_queue() const
// Forwards the given downloaded / uploaded byte counts to this
// connection's statistics object.
713 void peer_connection::add_stat(size_type downloaded
, size_type uploaded
)
715 m_statistics
.add_stat(downloaded
, uploaded
);
// Read-only access to a bitfield of this connection.
// NOTE(review): the body is not visible in this listing - presumably it
// returns m_have_piece; confirm.
718 bitfield
const& peer_connection::get_bitfield() const
// Called for piece `index`; notifies every registered extension through
// on_piece_pass(). When exceptions are enabled, an exception thrown by a
// plugin is swallowed so the remaining plugins are still notified.
// NOTE(review): statements before and after the plugin loop are not
// visible in this listing.
723 void peer_connection::received_valid_data(int index
)
727 #ifndef TORRENT_DISABLE_EXTENSIONS
728 for (extension_list_t::iterator i
= m_extensions
.begin()
729 , end(m_extensions
.end()); i
!= end
; ++i
)
731 #ifdef BOOST_NO_EXCEPTIONS
732 (*i
)->on_piece_pass(index
);
734 try { (*i
)->on_piece_pass(index
); } catch (std::exception
&) {}
// Called for piece `index`; notifies every registered extension through
// on_piece_failed() (plugin exceptions are swallowed when exceptions are
// enabled). If the connection is still alive and has a policy::peer
// entry, the peer is put on parole when use_parole_mode is set, its
// hash-failure counter is incremented and its trust points are clamped to
// a minimum of -7.
// NOTE(review): the statement that decreases trust_points is not visible
// in this listing.
740 void peer_connection::received_invalid_data(int index
)
744 #ifndef TORRENT_DISABLE_EXTENSIONS
745 for (extension_list_t::iterator i
= m_extensions
.begin()
746 , end(m_extensions
.end()); i
!= end
; ++i
)
748 #ifdef BOOST_NO_EXCEPTIONS
749 (*i
)->on_piece_failed(index
);
751 try { (*i
)->on_piece_failed(index
); } catch (std::exception
&) {}
755 if (is_disconnecting()) return;
757 if (peer_info_struct())
759 if (m_ses
.settings().use_parole_mode
)
760 peer_info_struct()->on_parole
= true;
762 ++peer_info_struct()->hashfails
;
763 boost::int8_t& trust_points
= peer_info_struct()->trust_points
;
765 // we decrease more than we increase, to keep the
766 // allowed failed/passed ratio low.
767 // TODO: make this limit user settable
// lower bound for the trust score
769 if (trust_points
< -7) trust_points
= -7;
// Returns the accumulated free-upload amount, m_free_upload.
773 size_type
peer_connection::total_free_upload() const
775 return m_free_upload
;
// Adds `free_upload` to the accumulated free-upload amount, m_free_upload.
778 void peer_connection::add_free_upload(size_type free_upload
)
782 m_free_upload
+= free_upload
;
785 // verifies a piece to see if it is valid (is within a valid range)
786 // and if it can correspond to a request generated by libtorrent.
787 bool peer_connection::verify_piece(const peer_request
& p
) const
789 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
792 TORRENT_ASSERT(t
->valid_metadata());
793 torrent_info
const& ti
= t
->torrent_file();
796 && p
.piece
< t
->torrent_file().num_pieces()
799 && (p
.length
== t
->block_size()
800 || (p
.length
< t
->block_size()
801 && p
.piece
== ti
.num_pieces()-1
802 && p
.start
+ p
.length
== ti
.piece_size(p
.piece
))
803 || (m_request_large_blocks
804 && p
.length
<= ti
.piece_length() * m_prefer_whole_pieces
== 0 ?
805 1 : m_prefer_whole_pieces
))
806 && p
.piece
* size_type(ti
.piece_length()) + p
.start
+ p
.length
808 && (p
.start
% t
->block_size() == 0);
// Associates this (so far torrent-less) connection with the torrent whose
// info-hash is `ih`, looked up through the session.
//
// The connection is disconnected when the torrent cannot be found
// ("got invalid info-hash", after logging the known info-hashes) or when
// the torrent is paused. Otherwise the connection is attached with
// torrent::attach_peer(); an exception from that call is printed to
// std::cout and triggers an assertion. If the connection survived and the
// torrent is ready for connections, init() is called; afterwards
// m_have_piece is cleared because the peer is assumed to have no pieces.
// NOTE(review): the branch conditions between the visible statements are
// largely missing from this listing.
811 void peer_connection::attach_to_torrent(sha1_hash
const& ih
)
815 TORRENT_ASSERT(!m_disconnecting
);
816 TORRENT_ASSERT(m_torrent
.expired());
817 boost::weak_ptr
<torrent
> wpt
= m_ses
.find_torrent(ih
);
818 boost::shared_ptr
<torrent
> t
= wpt
.lock();
820 if (t
&& t
->is_aborted())
822 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
823 (*m_logger
) << " *** the torrent has been aborted\n";
830 // we couldn't find the torrent!
831 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
832 (*m_logger
) << " *** couldn't find a torrent with the given info_hash: " << ih
<< "\n";
833 (*m_logger
) << " torrents:\n";
834 session_impl::torrent_map
const& torrents
= m_ses
.m_torrents
;
// list every info-hash the session knows about, to aid debugging
835 for (session_impl::torrent_map::const_iterator i
= torrents
.begin()
836 , end(torrents
.end()); i
!= end
; ++i
)
838 (*m_logger
) << " " << i
->second
->torrent_file().info_hash() << "\n";
841 disconnect("got invalid info-hash", 2);
847 // paused torrents will not accept
848 // incoming connections
849 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
850 (*m_logger
) << " rejected connection to paused torrent\n";
852 disconnect("connection rejected bacause torrent is paused");
856 TORRENT_ASSERT(m_torrent
.expired());
857 // check to make sure we don't have another connection with the same
858 // info_hash and peer_id. If we do. close this connection.
863 t
->attach_peer(this);
866 catch (std::exception
& e
)
868 std::cout
<< e
.what() << std::endl
;
869 TORRENT_ASSERT(false);
872 if (m_disconnecting
) return;
875 TORRENT_ASSERT(!m_torrent
.expired());
877 // if the torrent isn't ready to accept
878 // connections yet, we'll have to wait with
879 // our initialization
880 if (t
->ready_for_connections()) init();
882 TORRENT_ASSERT(!m_torrent
.expired());
884 // assume the other end has no pieces
885 // if we don't have valid metadata yet,
886 // leave the vector unallocated
887 TORRENT_ASSERT(m_num_pieces
== 0);
888 m_have_piece
.clear_all();
889 TORRENT_ASSERT(!m_torrent
.expired());
894 // -----------------------------
895 // --------- KEEPALIVE ---------
896 // -----------------------------
// Handler for a keep-alive message from the peer. The visible code only
// writes a log line when verbose logging is compiled in.
898 void peer_connection::incoming_keepalive()
902 #ifdef TORRENT_VERBOSE_LOGGING
903 (*m_logger
) << time_now_string() << " <== KEEPALIVE\n";
907 // -----------------------------
908 // ----------- CHOKE -----------
909 // -----------------------------
// Handler for a CHOKE message. Each extension gets the first chance to
// consume the message via on_choke(). Otherwise the peer is marked as
// choking us and, unless the peer is on parole, every block in
// m_request_queue is handed back to the piece picker with
// abort_download() so that other peers can request it, and the queue is
// cleared.
911 void peer_connection::incoming_choke()
915 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
918 #ifndef TORRENT_DISABLE_EXTENSIONS
919 for (extension_list_t::iterator i
= m_extensions
.begin()
920 , end(m_extensions
.end()); i
!= end
; ++i
)
// a plugin returning true has handled the message
922 if ((*i
)->on_choke()) return;
925 if (is_disconnecting()) return;
927 #ifdef TORRENT_VERBOSE_LOGGING
928 (*m_logger
) << time_now_string() << " <== CHOKE\n";
930 m_peer_choked
= true;
932 if (peer_info_struct() == 0 || !peer_info_struct()->on_parole
)
934 // if the peer is not in parole mode, clear the queued
938 piece_picker
& p
= t
->picker();
939 for (std::deque
<piece_block
>::const_iterator i
= m_request_queue
.begin()
940 , end(m_request_queue
.end()); i
!= end
; ++i
)
942 // since this piece was skipped, clear it and allow it to
943 // be requested from other peers
944 p
.abort_download(*i
);
947 m_request_queue
.clear();
// Predicate: does request `r` refer to block `b`, given the torrent's
// block size? It does not when the piece indices differ, when the block
// index differs from r.start / block_size, or when r.start is not aligned
// to a block boundary.
// NOTE(review): the final return is not visible in this listing -
// presumably `return true;`; confirm.
951 bool match_request(peer_request
const& r
, piece_block
const& b
, int block_size
)
953 if (b
.piece_index
!= r
.piece
) return false;
954 if (b
.block_index
!= r
.start
/ block_size
) return false;
955 if (r
.start
% block_size
!= 0) return false;
959 // -----------------------------
960 // -------- REJECT PIECE -------
961 // -----------------------------
// Handler for a REJECT message for request `r`. Each extension gets the
// first chance to consume the message via on_reject().
//
// The matching entry is looked up in m_download_queue using
// match_request(). When found it is removed; for a peer on parole the
// block is pushed to the front of m_request_queue to be requested again,
// otherwise (and only if we are not a seed) it is returned to the piece
// picker with abort_download(). If the peer has us choked, the piece is
// removed from m_allowed_fast; it is also removed from
// m_suggested_pieces. Finally, if the request queue is empty and fewer
// than two blocks are outstanding, new blocks are picked and requested.
// NOTE(review): the end of the find_if call and the assignment of `b`
// are not visible in this listing.
963 void peer_connection::incoming_reject_request(peer_request
const& r
)
967 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
970 #ifndef TORRENT_DISABLE_EXTENSIONS
971 for (extension_list_t::iterator i
= m_extensions
.begin()
972 , end(m_extensions
.end()); i
!= end
; ++i
)
// a plugin returning true has handled the message
974 if ((*i
)->on_reject(r
)) return;
978 if (is_disconnecting()) return;
980 std::deque
<pending_block
>::iterator i
= std::find_if(
981 m_download_queue
.begin(), m_download_queue
.end()
982 , bind(match_request
, boost::cref(r
), bind(&pending_block::block
, _1
)
985 #ifdef TORRENT_VERBOSE_LOGGING
986 (*m_logger
) << time_now_string()
987 << " <== REJECT_PIECE [ piece: " << r
.piece
<< " | s: " << r
.start
<< " | l: " << r
.length
<< " ]\n";
990 piece_block
b(-1, 0);
991 if (i
!= m_download_queue
.end())
994 m_download_queue
.erase(i
);
996 // if the peer is in parole mode, keep the request
997 if (peer_info_struct() && peer_info_struct()->on_parole
)
999 m_request_queue
.push_front(b
);
1001 else if (!t
->is_seed())
1003 piece_picker
& p
= t
->picker();
1004 p
.abort_download(b
);
1007 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1010 (*m_logger
) << time_now_string()
1011 << " *** PIECE NOT IN REQUEST QUEUE\n";
1014 if (has_peer_choked())
1016 // if we're choked and we got a rejection of
1017 // a piece in the allowed fast set, remove it
1018 // from the allow fast set.
1019 std::vector
<int>::iterator i
= std::find(
1020 m_allowed_fast
.begin(), m_allowed_fast
.end(), r
.piece
);
1021 if (i
!= m_allowed_fast
.end()) m_allowed_fast
.erase(i
);
// a rejected piece is no longer treated as suggested either
1025 std::vector
<int>::iterator i
= std::find(m_suggested_pieces
.begin()
1026 , m_suggested_pieces
.end(), r
.piece
);
1027 if (i
!= m_suggested_pieces
.end())
1028 m_suggested_pieces
.erase(i
);
// keep the request pipeline from running dry
1031 if (m_request_queue
.empty() && m_download_queue
.size() < 2)
1033 request_a_block(*t
, *this);
1034 send_block_requests();
1038 // -----------------------------
1039 // ------- SUGGEST PIECE -------
1040 // -----------------------------
1042 void peer_connection::incoming_suggest(int index
)
1046 #ifdef TORRENT_VERBOSE_LOGGING
1047 (*m_logger
) << time_now_string()
1048 << " <== SUGGEST_PIECE [ piece: " << index
<< " ]\n";
1050 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1053 #ifndef TORRENT_DISABLE_EXTENSIONS
1054 for (extension_list_t::iterator i
= m_extensions
.begin()
1055 , end(m_extensions
.end()); i
!= end
; ++i
)
1057 if ((*i
)->on_suggest(index
)) return;
1061 if (is_disconnecting()) return;
1062 if (t
->have_piece(index
)) return;
1064 if (m_suggested_pieces
.size() > 9)
1065 m_suggested_pieces
.erase(m_suggested_pieces
.begin());
1066 m_suggested_pieces
.push_back(index
);
1068 #ifdef TORRENT_VERBOSE_LOGGING
1069 (*m_logger
) << time_now_string()
1070 << " ** SUGGEST_PIECE [ piece: " << index
<< " added to set: " << m_suggested_pieces
.size() << " ]\n";
1074 // -----------------------------
1075 // ---------- UNCHOKE ----------
1076 // -----------------------------
// Handler for an UNCHOKE message. Each extension gets the first chance to
// consume the message via on_unchoke(). Otherwise the peer is marked as
// no longer choking us and, unless the connection is closing, the
// torrent's policy is notified through unchoked().
1078 void peer_connection::incoming_unchoke()
1082 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1085 #ifndef TORRENT_DISABLE_EXTENSIONS
1086 for (extension_list_t::iterator i
= m_extensions
.begin()
1087 , end(m_extensions
.end()); i
!= end
; ++i
)
// a plugin returning true has handled the message
1089 if ((*i
)->on_unchoke()) return;
1093 #ifdef TORRENT_VERBOSE_LOGGING
1094 (*m_logger
) << time_now_string() << " <== UNCHOKE\n";
1096 m_peer_choked
= false;
1097 if (is_disconnecting()) return;
1099 t
->get_policy().unchoked(*this);
1102 // -----------------------------
1103 // -------- INTERESTED ---------
1104 // -----------------------------
// Handler for an INTERESTED message. Each extension gets the first chance
// to consume the message via on_interested(). Otherwise the peer is
// marked as interested and, unless the connection is closing, the
// torrent's policy is notified through interested().
1106 void peer_connection::incoming_interested()
1110 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1113 #ifndef TORRENT_DISABLE_EXTENSIONS
1114 for (extension_list_t::iterator i
= m_extensions
.begin()
1115 , end(m_extensions
.end()); i
!= end
; ++i
)
// a plugin returning true has handled the message
1117 if ((*i
)->on_interested()) return;
1121 #ifdef TORRENT_VERBOSE_LOGGING
1122 (*m_logger
) << time_now_string() << " <== INTERESTED\n";
1124 m_peer_interested
= true;
1125 if (is_disconnecting()) return;
1126 t
->get_policy().interested(*this);
1129 // -----------------------------
1130 // ------ NOT INTERESTED -------
1131 // -----------------------------
// Handler for a NOT_INTERESTED message. Each extension gets the first
// chance to consume the message via on_not_interested(). Otherwise the
// time of the change is recorded in m_became_uninterested, the peer is
// marked as not interested and, unless the connection is closing, the
// torrent's policy is notified through not_interested().
1133 void peer_connection::incoming_not_interested()
1137 #ifndef TORRENT_DISABLE_EXTENSIONS
1138 for (extension_list_t::iterator i
= m_extensions
.begin()
1139 , end(m_extensions
.end()); i
!= end
; ++i
)
// a plugin returning true has handled the message
1141 if ((*i
)->on_not_interested()) return;
1145 m_became_uninterested
= time_now();
1147 #ifdef TORRENT_VERBOSE_LOGGING
1148 (*m_logger
) << time_now_string() << " <== NOT_INTERESTED\n";
1150 m_peer_interested
= false;
1151 if (is_disconnecting()) return;
1153 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1156 t
->get_policy().not_interested(*this);
1159 // -----------------------------
1160 // ----------- HAVE ------------
1161 // -----------------------------
1163 void peer_connection::incoming_have(int index
)
1167 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1170 #ifndef TORRENT_DISABLE_EXTENSIONS
1171 for (extension_list_t::iterator i
= m_extensions
.begin()
1172 , end(m_extensions
.end()); i
!= end
; ++i
)
1174 if ((*i
)->on_have(index
)) return;
1178 if (is_disconnecting()) return;
1180 // if we haven't received a bitfield, it was
1181 // probably omitted, which is the same as 'have_none'
1182 if (!m_bitfield_received
) incoming_have_none();
1184 #ifdef TORRENT_VERBOSE_LOGGING
1185 (*m_logger
) << time_now_string()
1186 << " <== HAVE [ piece: " << index
<< "]\n";
1189 if (is_disconnecting()) return;
1191 if (!t
->valid_metadata() && index
> int(m_have_piece
.size()))
1195 // if we don't have metadata
1196 // and we might not have received a bitfield
1197 // extend the bitmask to fit the new
1199 m_have_piece
.resize(index
+ 1, false);
1203 // unless the index > 64k, in which case
1204 // we just ignore it
1209 // if we got an invalid message, abort
1210 if (index
>= int(m_have_piece
.size()) || index
< 0)
1212 disconnect("got 'have'-message with higher index than the number of pieces", 2);
1216 if (m_have_piece
[index
])
1218 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1219 (*m_logger
) << " got redundant HAVE message for index: " << index
<< "\n";
1224 m_have_piece
.set_bit(index
);
1227 // only update the piece_picker if
1228 // we have the metadata and if
1229 // we're not a seed (in which case
1230 // we won't have a piece picker)
1231 if (t
->valid_metadata())
1235 if (!t
->have_piece(index
)
1237 && !is_interesting()
1238 && t
->picker().piece_priority(index
) != 0)
1239 t
->get_policy().peer_is_interesting(*this);
1241 // this will disregard all have messages we get within
1242 // the first two seconds. Since some clients implements
1243 // lazy bitfields, these will not be reliable to use
1244 // for an estimated peer download rate.
1245 if (!peer_info_struct() || time_now() - peer_info_struct()->connected
> seconds(2))
1247 // update bytes downloaded since last timer
1248 m_remote_bytes_dled
+= t
->torrent_file().piece_size(index
);
1254 m_peer_info
->seed
= true;
1255 m_upload_only
= true;
1256 disconnect_if_redundant();
1257 if (is_disconnecting()) return;
1262 // -----------------------------
1263 // --------- BITFIELD ----------
1264 // -----------------------------
// Handler for a BITFIELD message carrying the peer's piece set `bits`.
// Each extension gets the first chance to consume the message via
// on_bitfield().
//
// With valid metadata, a bitfield whose size in bytes differs from the
// expected one disconnects the peer. While the torrent is not ready for
// connections the bitfield is only stored (and the seed flag of the
// policy::peer entry updated), since no piece picker exists yet.
//
// Otherwise: a peer with every piece is flagged as a seed / upload-only,
// reported as interesting unless we are finished, and checked for
// redundancy. For any other peer the bitfield is compared piece by piece
// against what we already knew, the peer becomes interesting if it
// gained a piece we lack with non-zero priority, and an upload-only peer
// that is not interesting is disconnected.
// NOTE(review): several statements between the visible lines are missing
// from this listing.
1266 void peer_connection::incoming_bitfield(bitfield
const& bits
)
1270 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1273 #ifndef TORRENT_DISABLE_EXTENSIONS
1274 for (extension_list_t::iterator i
= m_extensions
.begin()
1275 , end(m_extensions
.end()); i
!= end
; ++i
)
// a plugin returning true has handled the message
1277 if ((*i
)->on_bitfield(bits
)) return;
1281 if (is_disconnecting()) return;
1283 #ifdef TORRENT_VERBOSE_LOGGING
1284 (*m_logger
) << time_now_string() << " <== BITFIELD ";
// dump the bitfield as a string of 0s and 1s
1286 for (int i
= 0; i
< int(bits
.size()); ++i
)
1288 if (bits
[i
]) (*m_logger
) << "1";
1289 else (*m_logger
) << "0";
1291 (*m_logger
) << "\n";
1294 // if we don't have the metadata, we cannot
1295 // verify the bitfield size
1296 if (t
->valid_metadata()
1297 && (bits
.size() + 7) / 8 != (m_have_piece
.size() + 7) / 8)
1299 std::stringstream msg
;
1300 msg
<< "got bitfield with invalid size: " << ((bits
.size() + 7) / 8)
1301 << "bytes. expected: " << ((m_have_piece
.size() + 7) / 8)
1303 disconnect(msg
.str().c_str(), 2);
1307 m_bitfield_received
= true;
1309 // if we don't have metadata yet
1310 // just remember the bitmask
1311 // don't update the piecepicker
1312 // (since it doesn't exist yet)
1313 if (!t
->ready_for_connections())
1315 m_have_piece
= bits
;
1316 m_num_pieces
= bits
.count();
1317 if (m_peer_info
) m_peer_info
->seed
= (m_num_pieces
== int(bits
.size()));
1321 TORRENT_ASSERT(t
->valid_metadata());
1323 int num_pieces
= bits
.count();
// all bits set: the peer is a seed
1324 if (num_pieces
== int(m_have_piece
.size()))
1326 #ifdef TORRENT_VERBOSE_LOGGING
1327 (*m_logger
) << " *** THIS IS A SEED ***\n";
1329 // if this is a web seed. we don't have a peer_info struct
1330 if (m_peer_info
) m_peer_info
->seed
= true;
1331 m_upload_only
= true;
1333 m_have_piece
.set_all();
1334 m_num_pieces
= num_pieces
;
1336 if (!t
->is_finished())
1337 t
->get_policy().peer_is_interesting(*this);
1339 disconnect_if_redundant();
1344 // let the torrent know which pieces the
1346 // if we're a seed, we don't keep track of piece availability
1347 bool interesting
= false;
1352 for (int i
= 0; i
< (int)m_have_piece
.size(); ++i
)
1354 bool have
= bits
[i
];
// piece newly announced by this bitfield
1355 if (have
&& !m_have_piece
[i
])
1357 if (!t
->have_piece(i
) && t
->picker().piece_priority(i
) != 0)
// piece the peer previously claimed but no longer reports
1360 else if (!have
&& m_have_piece
[i
])
1362 // this should probably not be allowed
1368 m_have_piece
= bits
;
1369 m_num_pieces
= num_pieces
;
1371 if (interesting
) t
->get_policy().peer_is_interesting(*this);
1372 else if (upload_only()) disconnect("upload to upload connections");
// Closes this connection when keeping it open serves no purpose. Does
// nothing unless the close_redundant_connections session setting is on.
1375 void peer_connection::disconnect_if_redundant()
1377 if (!m_ses
.settings().close_redundant_connections
) return;
1379 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
// the peer only uploads and our torrent is finished: neither side wants data
1381 if (m_upload_only
&& t
->is_finished())
1382 disconnect("seed to seed");
// the second check additionally requires that the peer's bitfield has been
// received and that the torrent's files have been checked
1386 && m_bitfield_received
1387 && t
->are_files_checked())
1388 disconnect("uninteresting upload-only peer");
1391 // -----------------------------
1392 // ---------- REQUEST ----------
1393 // -----------------------------
// Handles a REQUEST message from the peer. The request is rejected when the
// torrent has no valid metadata, when the peer has too many requests queued,
// when the peer is choked and the piece is not in m_accept_fast, or when the
// request fails the legality checks. A legal request is appended to
// m_requests.
//
// r - the requested block (piece index, start offset, length)
//
// NOTE(review): the log statements on the reject paths call
// piece_size(r.piece) and have_piece(r.piece) with an r.piece that has not
// been range-checked at that point - confirm those calls tolerate an
// out-of-range index.
1395 void peer_connection::incoming_request(peer_request
const& r
)
1399 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1402 // if we haven't received a bitfield, it was
1403 // probably omitted, which is the same as 'have_none'
1404 if (!m_bitfield_received
) incoming_have_none();
1405 if (is_disconnecting()) return;
// an extension that returns true has consumed the message
1407 #ifndef TORRENT_DISABLE_EXTENSIONS
1408 for (extension_list_t::iterator i
= m_extensions
.begin()
1409 , end(m_extensions
.end()); i
!= end
; ++i
)
1411 if ((*i
)->on_request(r
)) return;
1414 if (is_disconnecting()) return;
1416 if (!t
->valid_metadata())
1418 // if we don't have valid metadata yet,
1419 // we shouldn't get a request
1420 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1421 (*m_logger
) << time_now_string()
1422 << " <== UNEXPECTED_REQUEST [ "
1423 "piece: " << r
.piece
<< " | "
1424 "s: " << r
.start
<< " | "
1425 "l: " << r
.length
<< " | "
1426 "i: " << m_peer_interested
<< " | "
1427 "t: " << t
->torrent_file().piece_size(r
.piece
) << " | "
1428 "n: " << t
->torrent_file().num_pieces() << " ]\n";
1430 (*m_logger
) << time_now_string()
1431 << " ==> REJECT_PIECE [ "
1432 "piece: " << r
.piece
<< " | "
1433 "s: " << r
.start
<< " | "
1434 "l: " << r
.length
<< " ]\n";
1436 write_reject_request(r
);
// cap the number of requests a peer may have queued with us
1440 if (int(m_requests
.size()) > m_ses
.settings().max_allowed_in_request_queue
)
1442 // don't allow clients to abuse our
1443 // memory consumption.
1444 // ignore requests if the client
1445 // is making too many of them.
1446 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1447 (*m_logger
) << time_now_string()
1448 << " <== TOO MANY REQUESTS [ "
1449 "piece: " << r
.piece
<< " | "
1450 "s: " << r
.start
<< " | "
1451 "l: " << r
.length
<< " | "
1452 "i: " << m_peer_interested
<< " | "
1453 "t: " << t
->torrent_file().piece_size(r
.piece
) << " | "
1454 "n: " << t
->torrent_file().num_pieces() << " ]\n";
1456 (*m_logger
) << time_now_string()
1457 << " ==> REJECT_PIECE [ "
1458 "piece: " << r
.piece
<< " | "
1459 "s: " << r
.start
<< " | "
1460 "l: " << r
.length
<< " ]\n";
1462 write_reject_request(r
);
1466 // make sure this request
1467 // is legal and that the peer
// visible checks: the piece index is below num_pieces, we have the piece,
// start and start + length stay inside the piece, the peer has declared
// interest, and the length does not exceed one block
1470 && r
.piece
< t
->torrent_file().num_pieces()
1471 && t
->have_piece(r
.piece
)
1473 && r
.start
< t
->torrent_file().piece_size(r
.piece
)
1475 && r
.length
+ r
.start
<= t
->torrent_file().piece_size(r
.piece
)
1476 && m_peer_interested
1477 && r
.length
<= t
->block_size())
1479 #ifdef TORRENT_VERBOSE_LOGGING
1480 (*m_logger
) << time_now_string()
1481 << " <== REQUEST [ piece: " << r
.piece
<< " | s: " << r
.start
<< " | l: " << r
.length
<< " ]\n";
1483 // if we have choked the client
1484 // ignore the request
1485 if (m_choked
&& m_accept_fast
.find(r
.piece
) == m_accept_fast
.end())
1487 write_reject_request(r
);
1488 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1489 (*m_logger
) << time_now_string()
1490 << " *** REJECTING REQUEST [ peer choked and piece not in allowed fast set ]\n";
1491 (*m_logger
) << time_now_string()
1492 << " ==> REJECT_PIECE [ "
1493 "piece: " << r
.piece
<< " | "
1494 "s: " << r
.start
<< " | "
1495 "l: " << r
.length
<< " ]\n";
// accepted: queue the request and remember when it arrived
1500 m_requests
.push_back(r
);
1501 m_last_incoming_request
= time_now();
// the request failed the legality checks: log it, reject it, count it and
// post an invalid_request_alert
1507 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1508 (*m_logger
) << time_now_string()
1509 << " <== INVALID_REQUEST [ "
1510 "piece: " << r
.piece
<< " | "
1511 "s: " << r
.start
<< " | "
1512 "l: " << r
.length
<< " | "
1513 "i: " << m_peer_interested
<< " | "
1514 "t: " << t
->torrent_file().piece_size(r
.piece
) << " | "
1515 "n: " << t
->torrent_file().num_pieces() << " | "
1516 "h: " << t
->have_piece(r
.piece
) << " | "
1517 "block_limit: " << t
->block_size() << " ]\n";
1519 (*m_logger
) << time_now_string()
1520 << " ==> REJECT_PIECE [ "
1521 "piece: " << r
.piece
<< " | "
1522 "s: " << r
.start
<< " | "
1523 "l: " << r
.length
<< " ]\n";
1526 write_reject_request(r
);
1527 ++m_num_invalid_requests
;
1529 if (t
->alerts().should_post
<invalid_request_alert
>())
1531 t
->alerts().post_alert(invalid_request_alert(
1532 t
->get_handle(), m_remote
, m_peer_id
, r
));
1537 void peer_connection::incoming_piece_fragment()
1539 m_last_piece
= time_now();
// Debug helper that verifies, on destruction (and optionally on
// construction), that no piece in the picker's download queue reports more
// finished blocks than a piece can hold.
1543 struct check_postcondition
// t_         - the torrent whose picker is inspected
// init_check - when true the check also runs immediately
1545 check_postcondition(boost::shared_ptr
<torrent
> const& t_
1546 , bool init_check
= true): t(t_
) { if (init_check
) check(); }
1548 ~check_postcondition() { check(); }
// number of blocks in a full-size piece
1554 const int blocks_per_piece
= static_cast<int>(
1555 t
->torrent_file().piece_length() / t
->block_size());
1557 std::vector
<piece_picker::downloading_piece
> const& dl_queue
1558 = t
->picker().get_download_queue();
1560 for (std::vector
<piece_picker::downloading_piece
>::const_iterator i
=
1561 dl_queue
.begin(); i
!= dl_queue
.end(); ++i
)
1563 TORRENT_ASSERT(i
->finished
<= blocks_per_piece
);
// the torrent being checked
1568 shared_ptr
<torrent
> t
;
1573 // -----------------------------
1574 // ----------- PIECE -----------
1575 // -----------------------------
// Convenience overload for callers that hold the payload in ordinary
// memory: copies it into a freshly allocated disk buffer and forwards to
// the disk_buffer_holder overload.
//
// p    - describes the block (piece, start, length)
// data - pointer to p.length bytes of payload
1577 void peer_connection::incoming_piece(peer_request
const& p
, char const* data
)
1579 char* buffer
= m_ses
.allocate_disk_buffer();
1582 disconnect("out of memory");
// the holder takes the buffer, together with the session it came from
1585 disk_buffer_holder
holder(m_ses
, buffer
);
// NOTE(review): copies p.length bytes with no visible bound check here -
// presumably a disk buffer holds at least one block and callers never pass
// a larger length; confirm
1586 std::memcpy(buffer
, data
, p
.length
);
1587 incoming_piece(p
, holder
);
// Handles a received block whose payload already sits in a disk buffer.
// Visible steps: treat a missing bitfield as have_none, let extensions
// consume the message, validate it with verify_piece(), locate the matching
// entry in m_download_queue (blocks nobody asked for are counted as
// redundant), account for requests the peer skipped, queue the payload for
// an asynchronous disk write, update the piece picker and request more
// blocks.
//
// p    - describes the block (piece, start, length)
// data - holder of the disk buffer containing the payload
1590 void peer_connection::incoming_piece(peer_request
const& p
, disk_buffer_holder
& data
)
1594 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1597 TORRENT_ASSERT(!m_disk_recv_buffer
);
1598 TORRENT_ASSERT(m_disk_recv_buffer_size
== 0);
// test hook: flips the first payload byte for IPv4 peers whose address has
// its low four bits clear
1600 #ifdef TORRENT_CORRUPT_DATA
1601 // corrupt all pieces from certain peers
1602 if (m_remote
.address().is_v4()
1603 && (m_remote
.address().to_v4().to_ulong() & 0xf) == 0)
1605 data
.get()[0] = ~data
.get()[0];
1609 // if we haven't received a bitfield, it was
1610 // probably omitted, which is the same as 'have_none'
1611 if (!m_bitfield_received
) incoming_have_none();
1612 if (is_disconnecting()) return;
// an extension that returns true has consumed the message
1614 #ifndef TORRENT_DISABLE_EXTENSIONS
1615 for (extension_list_t::iterator i
= m_extensions
.begin()
1616 , end(m_extensions
.end()); i
!= end
; ++i
)
1618 if ((*i
)->on_piece(p
, data
)) return;
1621 if (is_disconnecting()) return;
1624 check_postcondition
post_checker_(t
);
1625 #if !defined TORRENT_DISABLE_INVARIANT_CHECKS && defined TORRENT_EXPENSIVE_INVARIANT_CHECKS
1626 t
->check_invariant();
1630 #ifdef TORRENT_VERBOSE_LOGGING
1631 (*m_logger
) << time_now_string()
1632 << " <== PIECE [ piece: " << p
.piece
<< " | "
1633 "s: " << p
.start
<< " | "
1634 "l: " << p
.length
<< " | "
1635 "ds: " << statistics().download_rate() << " | "
1636 "qs: " << int(m_desired_queue_size
) << " ]\n";
// reports a zero-length piece as a peer error (the guarding condition is
// presumably a check on p.length - confirm)
1641 if (t
->alerts().should_post
<peer_error_alert
>())
1643 t
->alerts().post_alert(peer_error_alert(t
->get_handle(), m_remote
1644 , m_peer_id
, "peer sent 0 length piece"));
1649 if (!verify_piece(p
))
1651 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1652 (*m_logger
) << time_now_string()
1653 << " <== INVALID_PIECE [ piece: " << p
.piece
<< " | "
1654 "start: " << p
.start
<< " | "
1655 "length: " << p
.length
<< " ]\n";
// error code 2 marks this as a protocol error
1657 disconnect("got invalid piece packet", 2);
1661 // if we're already seeding, don't bother,
1665 t
->add_redundant_bytes(p
.length
);
1669 ptime now
= time_now();
1671 piece_picker
& picker
= t
->picker();
1672 piece_manager
& fs
= t
->filesystem();
// identify the block by piece index and block index within that piece
1674 std::vector
<piece_block
> finished_blocks
;
1675 piece_block
block_finished(p
.piece
, p
.start
/ t
->block_size());
1676 TORRENT_ASSERT(p
.start
% t
->block_size() == 0);
1677 TORRENT_ASSERT(p
.length
== t
->block_size()
1678 || p
.length
== t
->torrent_file().total_size() % t
->block_size());
// look the block up among the requests we have outstanding with this peer
1680 std::deque
<pending_block
>::iterator b
1682 m_download_queue
.begin()
1683 , m_download_queue
.end()
1684 , has_block(block_finished
));
// not found: the block is unwanted, so count its bytes as redundant and
// carry on requesting
1686 if (b
== m_download_queue
.end())
1688 if (t
->alerts().should_post
<unwanted_block_alert
>())
1690 t
->alerts().post_alert(unwanted_block_alert(t
->get_handle(), m_remote
1691 , m_peer_id
, block_finished
.block_index
, block_finished
.piece_index
));
1693 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1694 (*m_logger
) << " *** The block we just got was not in the "
1695 "request queue ***\n";
1697 t
->add_redundant_bytes(p
.length
);
1698 request_a_block(*t
, *this);
1699 send_block_requests();
// keep a copy of the queue entry; the iterator is re-derived further down
// after entries ahead of it may have been erased
1703 pending_block pending_b
= *b
;
// walk the requests queued ahead of the block that just arrived; the peer
// answered out of order, so each of them was skipped
1706 int block_index
= b
- m_download_queue
.begin() - 1;
1707 for (int i
= 0; i
< block_index
; ++i
)
1709 pending_block
& qe
= m_download_queue
[i
];
1711 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1712 (*m_logger
) << time_now_string()
1713 << " *** SKIPPED_PIECE [ piece: " << qe
.block
.piece_index
<< " | "
1714 "b: " << qe
.block
.block_index
<< " ] ***\n";
1718 // if the number of times a block is skipped by out of order
1719 // blocks exceeds the size of the outstanding queue, assume that
1720 // the other end dropped the request.
1721 if (qe
.skipped
> m_desired_queue_size
)
1723 if (m_ses
.m_alerts
.should_post
<request_dropped_alert
>())
1724 m_ses
.m_alerts
.post_alert(request_dropped_alert(t
->get_handle()
1725 , remote(), pid(), qe
.block
.block_index
, qe
.block
.piece_index
));
1726 picker
.abort_download(qe
.block
);
1727 TORRENT_ASSERT(m_download_queue
.begin() + i
!= b
);
1728 m_download_queue
.erase(m_download_queue
.begin() + i
);
// recompute the iterator to the received block from its adjusted index
1733 TORRENT_ASSERT(int(m_download_queue
.size()) > block_index
+ 1);
1734 b
= m_download_queue
.begin() + (block_index
+ 1);
1735 TORRENT_ASSERT(b
->block
== pending_b
.block
);
1737 // if the block we got is already finished, then ignore it
1738 if (picker
.is_downloaded(block_finished
))
1740 t
->add_redundant_bytes(p
.length
);
1742 m_download_queue
.erase(b
);
1743 m_timeout_extend
= 0;
1745 if (!m_download_queue
.empty())
1748 request_a_block(*t
, *this);
1749 send_block_requests();
// the block arrived within the request timeout; the visible body posts a
// peer_unsnubbed_alert
1753 if (total_seconds(now
- m_requested
)
1754 < m_ses
.settings().request_timeout
1758 if (m_ses
.m_alerts
.should_post
<peer_unsnubbed_alert
>())
1760 m_ses
.m_alerts
.post_alert(peer_unsnubbed_alert(t
->get_handle()
1761 , m_remote
, m_peer_id
));
// queue the payload for an asynchronous disk write; on_disk_write_complete
// is bound as the completion handler
1765 fs
.async_write(p
, data
, bind(&peer_connection::on_disk_write_complete
1766 , self(), _1
, _2
, p
, t
));
1767 m_outstanding_writing_bytes
+= p
.length
;
1768 TORRENT_ASSERT(m_channel_state
[download_channel
] == peer_info::bw_idle
);
1769 m_download_queue
.erase(b
);
// let the client know when this connection has hit its limit of bytes
// waiting to be written to disk
1771 if (m_outstanding_writing_bytes
>= m_ses
.settings().max_outstanding_disk_bytes_per_connection
1772 && t
->alerts().should_post
<performance_alert
>())
1774 t
->alerts().post_alert(performance_alert(t
->get_handle()
1775 , performance_alert::outstanding_disk_buffer_limit_reached
));
// requests remain outstanding: shrink the timeout extension and advance
// m_requested by one request timeout, clamped to the current time
1778 if (!m_download_queue
.empty())
1780 m_timeout_extend
= (std::max
)(m_timeout_extend
1781 - m_ses
.settings().request_timeout
, 0);
1782 m_requested
+= seconds(m_ses
.settings().request_timeout
);
1783 if (m_requested
> now
) m_requested
= now
;
1787 m_timeout_extend
= 0;
1790 // did we request this block from any other peers?
1791 bool multi
= picker
.num_peers(block_finished
) > 1;
1792 picker
.mark_as_writing(block_finished
, peer_info_struct());
1794 TORRENT_ASSERT(picker
.num_peers(block_finished
) == 0);
1795 // if we requested this block from other peers, cancel it now
1796 if (multi
) t
->cancel_block(block_finished
);
1798 TORRENT_ASSERT(picker
.num_peers(block_finished
) == 0);
1800 #if !defined NDEBUG && !defined TORRENT_DISABLE_INVARIANT_CHECKS \
1801 && defined TORRENT_EXPENSIVE_INVARIANT_CHECKS
1802 t
->check_invariant();
// keep the request pipeline to this peer full
1804 request_a_block(*t
, *this);
1805 send_block_requests();
1808 void peer_connection::on_disk_write_complete(int ret
, disk_io_job
const& j
1809 , peer_request p
, boost::shared_ptr
<torrent
> t
)
1811 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
1815 m_outstanding_writing_bytes
-= p
.length
;
1816 TORRENT_ASSERT(m_outstanding_writing_bytes
>= 0);
1818 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
1819 // (*m_ses.m_logger) << time_now_string() << " *** DISK_WRITE_COMPLETE [ p: "
1820 // << p.piece << " o: " << p.start << " ]\n";
1822 // in case the outstanding bytes just dropped down
1823 // to allow to receive more data
1826 piece_block
block_finished(p
.piece
, p
.start
/ t
->block_size());
1828 if (ret
== -1 || !t
)
1830 if (t
->has_picker()) t
->picker().write_failed(block_finished
);
1834 disconnect(j
.str
.c_str());
1838 if (t
->alerts().should_post
<file_error_alert
>())
1839 t
->alerts().post_alert(file_error_alert(j
.error_file
, t
->get_handle(), j
.str
));
1840 t
->set_error(j
.str
);
1845 if (t
->is_seed()) return;
1847 piece_picker
& picker
= t
->picker();
1849 TORRENT_ASSERT(p
.piece
== j
.piece
);
1850 TORRENT_ASSERT(p
.start
== j
.offset
);
1851 TORRENT_ASSERT(picker
.num_peers(block_finished
) == 0);
1852 picker
.mark_as_finished(block_finished
, peer_info_struct());
1853 if (t
->alerts().should_post
<block_finished_alert
>())
1855 t
->alerts().post_alert(block_finished_alert(t
->get_handle(),
1856 remote(), pid(), block_finished
.block_index
, block_finished
.piece_index
));
1859 // did we just finish the piece?
1860 if (picker
.is_piece_finished(p
.piece
))
1863 check_postcondition
post_checker2_(t
, false);
1865 t
->async_verify_piece(p
.piece
, bind(&torrent::piece_finished
, t
1869 if (!t
->is_seed() && !m_torrent
.expired())
1871 // this is a free function defined in policy.cpp
1872 request_a_block(*t
, *this);
1873 send_block_requests();
1878 // -----------------------------
1879 // ---------- CANCEL -----------
1880 // -----------------------------
1882 void peer_connection::incoming_cancel(peer_request
const& r
)
1886 #ifndef TORRENT_DISABLE_EXTENSIONS
1887 for (extension_list_t::iterator i
= m_extensions
.begin()
1888 , end(m_extensions
.end()); i
!= end
; ++i
)
1890 if ((*i
)->on_cancel(r
)) return;
1893 if (is_disconnecting()) return;
1895 #ifdef TORRENT_VERBOSE_LOGGING
1896 (*m_logger
) << time_now_string()
1897 << " <== CANCEL [ piece: " << r
.piece
<< " | s: " << r
.start
<< " | l: " << r
.length
<< " ]\n";
1900 std::deque
<peer_request
>::iterator i
1901 = std::find(m_requests
.begin(), m_requests
.end(), r
);
1903 if (i
!= m_requests
.end())
1905 m_requests
.erase(i
);
1906 #ifdef TORRENT_VERBOSE_LOGGING
1907 (*m_logger
) << time_now_string()
1908 << " ==> REJECT_PIECE [ "
1909 "piece: " << r
.piece
<< " | "
1910 "s: " << r
.start
<< " | "
1911 "l: " << r
.length
<< " ]\n";
1913 write_reject_request(r
);
1917 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1918 (*m_logger
) << time_now_string() << " *** GOT CANCEL NOT IN THE QUEUE\n";
1923 // -----------------------------
1924 // --------- DHT PORT ----------
1925 // -----------------------------
1927 void peer_connection::incoming_dht_port(int listen_port
)
1931 #ifdef TORRENT_VERBOSE_LOGGING
1932 (*m_logger
) << time_now_string()
1933 << " <== DHT_PORT [ p: " << listen_port
<< " ]\n";
1935 #ifndef TORRENT_DISABLE_DHT
1936 m_ses
.add_dht_node(udp::endpoint(
1937 m_remote
.address(), listen_port
));
1941 // -----------------------------
1942 // --------- HAVE ALL ----------
1943 // -----------------------------
// Handles a HAVE ALL message: the peer is a seed. Marks it as a seed and as
// upload-only, treats the bitfield as received and, once the torrent is
// ready for connections, sets every bit in m_have_piece.
1945 void peer_connection::incoming_have_all()
1949 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1952 #ifdef TORRENT_VERBOSE_LOGGING
1953 (*m_logger
) << time_now_string() << " <== HAVE_ALL\n";
// an extension that returns true has consumed the message
1956 #ifndef TORRENT_DISABLE_EXTENSIONS
1957 for (extension_list_t::iterator i
= m_extensions
.begin()
1958 , end(m_extensions
.end()); i
!= end
; ++i
)
1960 if ((*i
)->on_have_all()) return;
1963 if (is_disconnecting()) return;
// m_peer_info may be null, hence the guard
1967 if (m_peer_info
) m_peer_info
->seed
= true;
1968 m_upload_only
= true;
1969 m_bitfield_received
= true;
1971 #ifdef TORRENT_VERBOSE_LOGGING
1972 (*m_logger
) << " *** THIS IS A SEED ***\n";
1975 // if we don't have metadata yet
1976 // just remember the bitmask
1977 // don't update the piecepicker
1978 // (since it doesn't exist yet)
1979 if (!t
->ready_for_connections())
1981 // assume seeds are interesting when we
1982 // don't even have the metadata
1983 t
->get_policy().peer_is_interesting(*this);
1985 disconnect_if_redundant();
1986 // TODO: this might need something more
1987 // so that once we have the metadata
1988 // we can construct a full bitfield
// the torrent is ready: record that the peer has every piece
1992 TORRENT_ASSERT(!m_have_piece
.empty());
1993 m_have_piece
.set_all();
1994 m_num_pieces
= m_have_piece
.size();
1998 // if we're finished, we're not interested
1999 if (t
->is_finished()) send_not_interested();
2000 else t
->get_policy().peer_is_interesting(*this);
2002 disconnect_if_redundant();
2005 // -----------------------------
2006 // --------- HAVE NONE ---------
2007 // -----------------------------
2009 void peer_connection::incoming_have_none()
2013 #ifdef TORRENT_VERBOSE_LOGGING
2014 (*m_logger
) << time_now_string() << " <== HAVE_NONE\n";
2017 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2020 #ifndef TORRENT_DISABLE_EXTENSIONS
2021 for (extension_list_t::iterator i
= m_extensions
.begin()
2022 , end(m_extensions
.end()); i
!= end
; ++i
)
2024 if ((*i
)->on_have_none()) return;
2027 if (is_disconnecting()) return;
2028 if (m_peer_info
) m_peer_info
->seed
= false;
2029 m_bitfield_received
= true;
2031 // we're never interested in a peer that doesn't have anything
2032 send_not_interested();
2034 TORRENT_ASSERT(!m_have_piece
.empty() || !t
->ready_for_connections());
2035 disconnect_if_redundant();
2038 // -----------------------------
2039 // ------- ALLOWED FAST --------
2040 // -----------------------------
// Handles an ALLOWED FAST message: the peer lets us request the given piece
// even while we are choked. The index is remembered in m_allowed_fast and,
// when the peer has the piece and its priority is above zero, the policy is
// told the peer is interesting.
//
// index - the piece index announced by the peer
2042 void peer_connection::incoming_allowed_fast(int index
)
2046 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2049 #ifdef TORRENT_VERBOSE_LOGGING
2050 (*m_logger
) << time_now_string() << " <== ALLOWED_FAST [ " << index
<< " ]\n";
// an extension that returns true has consumed the message
2053 #ifndef TORRENT_DISABLE_EXTENSIONS
2054 for (extension_list_t::iterator i
= m_extensions
.begin()
2055 , end(m_extensions
.end()); i
!= end
; ++i
)
2057 if ((*i
)->on_allowed_fast(index
)) return;
2060 if (is_disconnecting()) return;
// the index can only be range-checked once the metadata is known
2062 if (t
->valid_metadata())
2064 if (index
< 0 || index
>= int(m_have_piece
.size()))
2066 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2067 (*m_logger
) << time_now_string() << " <== INVALID_ALLOWED_FAST [ " << index
<< " | s: "
2068 << int(m_have_piece
.size()) << " ]\n";
2073 // if we already have the piece, we can
2074 // ignore this message
2075 if (t
->have_piece(index
))
2079 m_allowed_fast
.push_back(index
);
2081 // if the peer has the piece and we want
2082 // to download it, request it
2083 if (int(m_have_piece
.size()) > index
2084 && m_have_piece
[index
]
2085 && t
->valid_metadata()
2087 && t
->picker().piece_priority(index
) > 0)
2089 t
->get_policy().peer_is_interesting(*this);
2093 std::vector
<int> const& peer_connection::allowed_fast()
2095 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2098 m_allowed_fast
.erase(std::remove_if(m_allowed_fast
.begin()
2099 , m_allowed_fast
.end(), bind(&torrent::have_piece
, t
, _1
))
2100 , m_allowed_fast
.end());
2102 // TODO: sort the allowed fast set in priority order
2103 return m_allowed_fast
;
// Queues a block to be requested from this peer: marks it as downloading in
// the piece picker (tagged with a state derived from the peer's speed),
// posts a block_downloading_alert and appends it to m_request_queue.
//
// block - the block (piece index, block index) to request
2106 void peer_connection::add_request(piece_block
const& block
)
2110 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2113 TORRENT_ASSERT(t
->valid_metadata());
2114 TORRENT_ASSERT(block
.piece_index
>= 0);
2115 TORRENT_ASSERT(block
.piece_index
< t
->torrent_file().num_pieces());
2116 TORRENT_ASSERT(block
.block_index
>= 0);
// NOTE(review): this compares a block index against piece_size(), which is
// used as a byte count elsewhere in this file (compared with r.start and
// r.length) - the bound looks looser than intended; confirm
2117 TORRENT_ASSERT(block
.block_index
< t
->torrent_file().piece_size(block
.piece_index
));
2118 TORRENT_ASSERT(!t
->picker().is_requested(block
) || (t
->picker().num_peers(block
) > 0));
2119 TORRENT_ASSERT(!t
->have_piece(block
.piece_index
));
// the block must not already be queued on this connection
2120 TORRENT_ASSERT(std::find_if(m_download_queue
.begin(), m_download_queue
.end()
2121 , has_block(block
)) == m_download_queue
.end());
2122 TORRENT_ASSERT(std::find(m_request_queue
.begin(), m_request_queue
.end()
2123 , block
) == m_request_queue
.end());
// translate the peer's speed class into a picker state and a label for the
// alert
2125 piece_picker::piece_state_t state
;
2126 peer_speed_t speed
= peer_speed();
2127 char const* speedmsg
= 0;
2131 state
= piece_picker::fast
;
2133 else if (speed
== medium
)
2135 speedmsg
= "medium";
2136 state
= piece_picker::medium
;
2141 state
= piece_picker::slow
;
// a false return means the picker refused to mark the block as downloading
2144 if (!t
->picker().mark_as_downloading(block
, peer_info_struct(), state
))
2147 if (t
->alerts().should_post
<block_downloading_alert
>())
2149 t
->alerts().post_alert(block_downloading_alert(t
->get_handle(),
2150 remote(), pid(), speedmsg
, block
.block_index
, block
.piece_index
));
2153 m_request_queue
.push_back(block
);
// Cancels a block previously added with add_request(). A block that is
// still in m_request_queue has not been sent yet, so it is simply dropped
// and aborted in the picker. A block that is in m_download_queue has
// already been sent; for it a peer_request is built so a cancel can be
// issued to the peer.
//
// block - the block (piece index, block index) to cancel
2156 void peer_connection::cancel_request(piece_block
const& block
)
2160 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2161 // this peer might be disconnecting
2164 TORRENT_ASSERT(t
->valid_metadata());
2166 TORRENT_ASSERT(block
.piece_index
>= 0);
2167 TORRENT_ASSERT(block
.piece_index
< t
->torrent_file().num_pieces());
2168 TORRENT_ASSERT(block
.block_index
>= 0);
2169 TORRENT_ASSERT(block
.block_index
< t
->torrent_file().piece_size(block
.piece_index
));
2171 // if all the peers that requested this block has been
2172 // cancelled, then just ignore the cancel.
2173 if (!t
->picker().is_requested(block
)) return;
// look for the block among the requests already sent to the peer
2175 std::deque
<pending_block
>::iterator it
2176 = std::find_if(m_download_queue
.begin(), m_download_queue
.end(), has_block(block
));
2177 if (it
== m_download_queue
.end())
2179 std::deque
<piece_block
>::iterator rit
= std::find(m_request_queue
.begin()
2180 , m_request_queue
.end(), block
);
2182 // when a multi block is received, it is cancelled
2183 // from all peers, so if this one hasn't requested
2184 // the block, just ignore to cancel it.
2185 if (rit
== m_request_queue
.end()) return;
2187 t
->picker().abort_download(block
);
2188 m_request_queue
.erase(rit
);
2189 // since we found it in the request queue, it means it hasn't been
2190 // sent yet, so we don't have to send a cancel.
// byte offset of the block inside its piece; the size computed next is
// capped by what is left of the piece
2194 int block_offset
= block
.block_index
* t
->block_size();
2196 = (std::min
)(t
->torrent_file().piece_size(block
.piece_index
)-block_offset
,
2198 TORRENT_ASSERT(block_size
> 0);
2199 TORRENT_ASSERT(block_size
<= t
->block_size());
// describe the block the way the wire protocol does
2202 r
.piece
= block
.piece_index
;
2203 r
.start
= block_offset
;
2204 r
.length
= block_size
;
2206 #ifdef TORRENT_VERBOSE_LOGGING
2207 (*m_logger
) << time_now_string()
2208 << " ==> CANCEL [ piece: " << block
.piece_index
<< " | s: "
2209 << block_offset
<< " | l: " << block_size
<< " | " << block
.block_index
<< " ]\n";
// Chokes the peer. Does nothing when the peer is already choked. Records
// the time of the choke, resets the invalid-request counter and rejects
// every queued request whose piece is not in m_accept_fast.
2214 void peer_connection::send_choke()
2218 TORRENT_ASSERT(!m_peer_info
|| !m_peer_info
->optimistically_unchoked
);
2220 if (m_choked
) return;
2224 #ifdef TORRENT_VERBOSE_LOGGING
2225 (*m_logger
) << time_now_string() << " ==> CHOKE\n";
2228 m_last_choke
= time_now();
2230 m_num_invalid_requests
= 0;
2232 // reject the requests we have in the queue
2233 // except the allowed fast pieces
// the loop header has no increment; erase() below supplies the next
// iterator
2234 for (std::deque
<peer_request
>::iterator i
= m_requests
.begin();
2235 i
!= m_requests
.end();)
2237 if (m_accept_fast
.count(i
->piece
))
2243 peer_request
const& r
= *i
;
2244 write_reject_request(r
);
2246 #ifdef TORRENT_VERBOSE_LOGGING
2247 (*m_logger
) << time_now_string()
2248 << " ==> REJECT_PIECE [ "
2249 "piece: " << r
.piece
<< " | "
2250 "s: " << r
.start
<< " | "
2251 "l: " << r
.length
<< " ]\n";
2253 i
= m_requests
.erase(i
);
// Unchokes the peer. Returns false without doing anything when the peer is
// not choked or when the torrent is not ready for connections; otherwise
// records the time of the unchoke.
2257 bool peer_connection::send_unchoke()
2261 if (!m_choked
) return false;
2262 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2263 if (!t
->ready_for_connections()) return false;
2264 m_last_unchoke
= time_now();
2268 #ifdef TORRENT_VERBOSE_LOGGING
2269 (*m_logger
) << time_now_string() << " ==> UNCHOKE\n";
// Marks the peer as interesting to us. Does nothing when it is already
// marked or when the torrent is not ready for connections.
2274 void peer_connection::send_interested()
2276 if (m_interesting
) return;
2277 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2278 if (!t
->ready_for_connections()) return;
2279 m_interesting
= true;
2282 #ifdef TORRENT_VERBOSE_LOGGING
2283 (*m_logger
) << time_now_string() << " ==> INTERESTED\n";
2287 void peer_connection::send_not_interested()
2289 if (!m_interesting
) return;
2290 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2291 if (!t
->ready_for_connections()) return;
2292 m_interesting
= false;
2293 write_not_interested();
2295 m_became_uninteresting
= time_now();
2297 #ifdef TORRENT_VERBOSE_LOGGING
2298 (*m_logger
) << time_now_string() << " ==> NOT_INTERESTED\n";
2300 disconnect_if_redundant();
// Moves blocks from m_request_queue to m_download_queue and issues requests
// for them until the download queue holds m_desired_queue_size entries or
// the request queue runs dry. With m_request_large_blocks set, consecutive
// blocks are merged into one larger request.
2303 void peer_connection::send_block_requests()
2307 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
// the pipeline is already full
2310 if ((int)m_download_queue
.size() >= m_desired_queue_size
) return;
// remembered so m_requested can be stamped when the queue goes from empty
// to non-empty
2312 bool empty_download_queue
= m_download_queue
.empty();
2314 while (!m_request_queue
.empty()
2315 && (int)m_download_queue
.size() < m_desired_queue_size
)
2317 piece_block block
= m_request_queue
.front();
// byte offset and length of the block; the length is capped by what is left
// of the piece
2319 int block_offset
= block
.block_index
* t
->block_size();
2320 int block_size
= (std::min
)(t
->torrent_file().piece_size(
2321 block
.piece_index
) - block_offset
, t
->block_size());
2322 TORRENT_ASSERT(block_size
> 0);
2323 TORRENT_ASSERT(block_size
<= t
->block_size());
2326 r
.piece
= block
.piece_index
;
2327 r
.start
= block_offset
;
2328 r
.length
= block_size
;
2330 m_request_queue
.pop_front();
2331 if (t
->is_seed()) continue;
2332 // this can happen if a block times out, is re-requested and
2333 // then arrives "unexpectedly"
2334 if (t
->picker().is_finished(block
) || t
->picker().is_downloaded(block
))
2337 m_download_queue
.push_back(block
);
2339 #ifdef TORRENT_VERBOSE_LOGGING
2340 (*m_logger) << time_now_string()
2341 << " *** REQUEST-QUEUE** [ "
2342 "piece: " << block.piece_index << " | "
2343 "block: " << block.block_index << " ]\n";
2346 // if we are requesting large blocks, merge the smaller
2347 // blocks that are in the same piece into larger requests
2348 if (m_request_large_blocks
)
2350 int blocks_per_piece
= t
->torrent_file().piece_length() / t
->block_size();
2352 while (!m_request_queue
.empty())
2354 // check to see if this block is connected to the previous one
2355 // if it is, merge them, otherwise, break this merge loop
2356 piece_block
const& front
= m_request_queue
.front();
// blocks are numbered linearly across pieces; the next block must follow
// the current one directly
2357 if (front
.piece_index
* blocks_per_piece
+ front
.block_index
2358 != block
.piece_index
* blocks_per_piece
+ block
.block_index
+ 1)
2360 block
= m_request_queue
.front();
2361 m_request_queue
.pop_front();
2362 m_download_queue
.push_back(block
);
2364 #ifdef TORRENT_VERBOSE_LOGGING
2365 (*m_logger
) << time_now_string()
2366 << " *** MERGING REQUEST ** [ "
2367 "piece: " << block
.piece_index
<< " | "
2368 "block: " << block
.block_index
<< " ]\n";
2371 block_offset
= block
.block_index
* t
->block_size();
2372 block_size
= (std::min
)(t
->torrent_file().piece_size(
2373 block
.piece_index
) - block_offset
, t
->block_size());
2374 TORRENT_ASSERT(block_size
> 0);
2375 TORRENT_ASSERT(block_size
<= t
->block_size());
// grow the pending request by the merged block
2377 r
.length
+= block_size
;
2381 TORRENT_ASSERT(verify_piece(r
));
// the first extension whose write_request() returns true has taken over
// sending this request
2383 #ifndef TORRENT_DISABLE_EXTENSIONS
2384 bool handled
= false;
2385 for (extension_list_t::iterator i
= m_extensions
.begin()
2386 , end(m_extensions
.end()); i
!= end
; ++i
)
2388 if (handled
= (*i
)->write_request(r
)) break;
2390 if (is_disconnecting()) return;
2394 m_last_request
= time_now();
2398 m_last_request
= time_now();
2401 #ifdef TORRENT_VERBOSE_LOGGING
2402 (*m_logger
) << time_now_string()
2403 << " ==> REQUEST [ "
2404 "piece: " << r
.piece
<< " | "
2405 "s: " << r
.start
<< " | "
2406 "l: " << r
.length
<< " | "
2407 "ds: " << statistics().download_rate() << " B/s | "
2408 "qs: " << int(m_desired_queue_size
) << " "
2409 "blk: " << (m_request_large_blocks
?"large":"single") << " ]\n";
2412 m_last_piece
= time_now();
2414 if (!m_download_queue
.empty()
2415 && empty_download_queue
)
2417 // This means we just added a request to this connection
2418 m_requested
= time_now();
// Called when the connection attempt timed out. Logs the peer's address and
// disconnects with error code 1.
2422 void peer_connection::timed_out()
// only valid while the connection is still being established
2424 TORRENT_ASSERT(m_connecting
);
2425 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
2426 (*m_ses
.m_logger
) << time_now_string() << " CONNECTION TIMED OUT: " << m_remote
.address().to_string()
2429 disconnect("timed out: connect", 1);
2432 // the error argument defaults to 0, which means deliberate disconnect
2433 // 1 means unexpected disconnect/error
2434 // 2 protocol error (client sent something invalid)
// Tears the connection down. Takes the session mutex, flags the connection
// as failed for error > 0, releases a pending half-open connection ticket,
// posts a peer_error_alert (error > 1) or peer_disconnected_alert, returns
// every queued block to the piece picker, detaches from the torrent, closes
// the socket and tells the session the connection is closed.
//
// message - reason for the disconnect; used in logs and alerts
// error   - 0 deliberate, 1 unexpected disconnect/error, 2 protocol error
2435 void peer_connection::disconnect(char const* message
, int error
)
2437 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
2440 m_disconnect_started
= true;
// one of three log lines is written, presumably selected by the error
// code - confirm
2443 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2447 (*m_logger
) << "*** CONNECTION CLOSED " << message
<< "\n";
2450 (*m_logger
) << "*** CONNECTION FAILED " << message
<< "\n";
2453 (*m_logger
) << "*** PEER ERROR " << message
<< "\n";
2457 // we cannot do this in a constructor
2458 TORRENT_ASSERT(m_in_constructor
== false);
2459 if (error
> 0) m_failed
= true;
// a disconnect is already under way
2460 if (m_disconnecting
) return;
// hold a reference to ourselves for the rest of the function
2461 boost::intrusive_ptr
<peer_connection
> me(this);
// give back the half-open connection slot if we were still connecting
2465 if (m_connecting
&& m_connection_ticket
>= 0)
2467 m_ses
.m_half_open
.done(m_connection_ticket
);
2468 m_connection_ticket
= -1;
// the torrent may already be gone; the handle then stays default-constructed
2471 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2472 torrent_handle handle
;
2473 if (t
) handle
= t
->get_handle();
2477 if (error
> 1 && m_ses
.m_alerts
.should_post
<peer_error_alert
>())
2479 m_ses
.m_alerts
.post_alert(
2480 peer_error_alert(handle
, remote(), pid(), message
));
2482 else if (error
<= 1 && m_ses
.m_alerts
.should_post
<peer_disconnected_alert
>())
2484 m_ses
.m_alerts
.post_alert(
2485 peer_disconnected_alert(handle
, remote(), pid(), message
));
2491 // make sure we keep all the stats!
2493 t
->add_stats(statistics());
// hand every block we had requested or queued back to the picker
2495 if (t
->has_picker())
2497 piece_picker
& picker
= t
->picker();
2499 while (!m_download_queue
.empty())
2501 picker
.abort_download(m_download_queue
.back().block
);
2502 m_download_queue
.pop_back();
2504 while (!m_request_queue
.empty())
2506 picker
.abort_download(m_request_queue
.back());
2507 m_request_queue
.pop_back();
2511 t
->remove_peer(this);
2516 // since this connection doesn't have a torrent reference
2517 // no torrent should have a reference to this connection either
2518 for (aux::session_impl::torrent_map::const_iterator i
= m_ses
.m_torrents
.begin()
2519 , end(m_ses
.m_torrents
.end()); i
!= end
; ++i
)
2520 TORRENT_ASSERT(!i
->second
->has_peer(this));
2523 m_disconnecting
= true;
2525 m_socket
->close(ec
);
2526 m_ses
.close_connection(this, message
);
2529 void peer_connection::set_upload_limit(int limit
)
2531 TORRENT_ASSERT(limit
>= -1);
2532 if (limit
== -1) limit
= (std::numeric_limits
<int>::max
)();
2533 if (limit
< 10) limit
= 10;
2534 m_upload_limit
= limit
;
2535 m_bandwidth_limit
[upload_channel
].throttle(m_upload_limit
);
2538 void peer_connection::set_download_limit(int limit
)
2540 TORRENT_ASSERT(limit
>= -1);
2541 if (limit
== -1) limit
= (std::numeric_limits
<int>::max
)();
2542 if (limit
< 10) limit
= 10;
2543 m_download_limit
= limit
;
2544 m_bandwidth_limit
[download_channel
].throttle(m_download_limit
);
// Returns the share balance of this peer: the free upload credit plus the
// payload downloaded from the peer scaled by the torrent's ratio, minus the
// payload uploaded to it.
2547 size_type
peer_connection::share_diff() const
2551 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2554 float ratio
= t
->ratio();
2556 // if we have an infinite ratio, just say we have downloaded
2557 // much more than we have uploaded. And we'll keep uploading.
// NOTE(review): the condition guarding this return is not visible here;
// presumably it tests ratio for the "infinite" value - confirm.
2559 return (std::numeric_limits
<size_type
>::max
)();
2561 return m_free_upload
2562 + static_cast<size_type
>(m_statistics
.total_payload_download() * ratio
)
2563 - m_statistics
.total_payload_upload();
// forward declaration; used by peer_connection::on_local_network() below
2566 // defined in upnp.cpp
2567 bool is_local(address
const& a
);
// Returns true when the remote address is a local-network or loopback
// address.
// NOTE(review): the return for all other addresses is not visible here;
// presumably false - confirm.
2569 bool peer_connection::on_local_network() const
2571 if (libtorrent::is_local(m_remote
.address())
2572 || is_loopback(m_remote
.address())) return true;
// Fills p with a snapshot of this connection: transfer rates and their
// peaks, bandwidth quotas and limits, queue lengths, progress of the block
// currently being downloaded, the peer's piece bitfield, activity timers,
// flags, buffer sizes and channel states.
// The connection must still be associated with a torrent (asserted).
2576 void peer_connection::get_peer_info(peer_info
& p
) const
2578 TORRENT_ASSERT(!associated_torrent().expired());
2580 ptime now
= time_now();
2582 p
.download_rate_peak
= m_download_rate_peak
;
2583 p
.upload_rate_peak
= m_upload_rate_peak
;
2585 p
.down_speed
= statistics().download_rate();
2586 p
.up_speed
= statistics().upload_rate();
2587 p
.payload_down_speed
= statistics().download_payload_rate();
2588 p
.payload_up_speed
= statistics().upload_payload_rate();
2591 p
.pending_disk_bytes
= m_outstanding_writing_bytes
;
2592 p
.send_quota
= m_bandwidth_limit
[upload_channel
].quota_left();
2593 p
.receive_quota
= m_bandwidth_limit
[download_channel
].quota_left();
// -1 when there is no outstanding request to time out
2594 if (m_download_queue
.empty()) p
.request_timeout
= -1;
// NOTE(review): the end of the statement below is not visible here.
2595 else p
.request_timeout
= total_seconds(m_requested
- now
) + m_ses
.settings().request_timeout
2597 #ifndef TORRENT_DISABLE_GEO_IP
2598 p
.inet_as_name
= m_inet_as_name
;
2601 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
2602 p
.country
[0] = m_country
[0];
2603 p
.country
[1] = m_country
[1];
2606 p
.total_download
= statistics().total_payload_download();
2607 p
.total_upload
= statistics().total_payload_upload();
// an infinite throttle is reported as -1, anything else as the throttle value
2609 if (m_bandwidth_limit
[upload_channel
].throttle() == bandwidth_limit::inf
)
2610 p
.upload_limit
= -1;
2612 p
.upload_limit
= m_bandwidth_limit
[upload_channel
].throttle();
2614 if (m_bandwidth_limit
[download_channel
].throttle() == bandwidth_limit::inf
)
2615 p
.download_limit
= -1;
2617 p
.download_limit
= m_bandwidth_limit
[download_channel
].throttle();
2619 p
.load_balancing
= total_free_upload();
// counts both the requests already sent and the ones still queued locally
2621 p
.download_queue_length
= int(download_queue().size() + m_request_queue
.size());
2622 p
.requests_in_buffer
= int(m_requests_in_buffer
.size());
2623 p
.target_dl_queue_length
= int(desired_queue_size());
2624 p
.upload_queue_length
= int(upload_queue().size());
// progress of the block currently being received, if there is one
2626 if (boost::optional
<piece_block_progress
> ret
= downloading_piece_progress())
2628 p
.downloading_piece_index
= ret
->piece_index
;
2629 p
.downloading_block_index
= ret
->block_index
;
2630 p
.downloading_progress
= ret
->bytes_downloaded
;
2631 p
.downloading_total
= ret
->full_block_bytes
;
// values reported when no block is in progress
2635 p
.downloading_piece_index
= -1;
2636 p
.downloading_block_index
= -1;
2637 p
.downloading_progress
= 0;
2638 p
.downloading_total
= 0;
2641 p
.pieces
= get_bitfield();
2642 p
.last_request
= now
- m_last_request
;
// time since the most recent send or receive, whichever is later
2643 p
.last_active
= now
- (std::max
)(m_last_sent
, m_last_receive
);
2645 // this will set the flags so that we can update them later
2647 get_specific_peer_info(p
);
2649 p
.flags
|= is_seed() ? peer_info::seed
: 0;
2650 p
.flags
|= m_snubbed
? peer_info::snubbed
: 0;
2651 p
.flags
|= m_upload_only
? peer_info::upload_only
: 0;
// fields copied from the policy's entry for this peer, when there is one
2652 if (peer_info_struct())
2654 policy::peer
* pi
= peer_info_struct();
2655 p
.source
= pi
->source
;
2656 p
.failcount
= pi
->failcount
;
2657 p
.num_hashfails
= pi
->hashfails
;
2658 p
.flags
|= pi
->on_parole
? peer_info::on_parole
: 0;
2659 p
.flags
|= pi
->optimistically_unchoked
? peer_info::optimistic_unchoke
: 0;
2660 #ifndef TORRENT_DISABLE_GEO_IP
// NOTE(review): pi->inet_as is dereferenced without a null check here,
// while second_tick() tests the same pointer before using it - confirm it
// cannot be null at this point.
2661 p
.inet_as
= pi
->inet_as
->first
;
// NOTE(review): presumably the defaults for a connection without a policy
// entry; the start of that branch is not visible here - confirm.
2668 p
.num_hashfails
= 0;
2669 p
.remote_dl_rate
= 0;
2670 #ifndef TORRENT_DISABLE_GEO_IP
2675 p
.remote_dl_rate
= m_remote_dl_rate
;
2676 p
.send_buffer_size
= m_send_buffer
.capacity();
2677 p
.used_send_buffer
= m_send_buffer
.size();
// the receive buffer is the regular buffer plus the disk buffer, if any
2678 p
.receive_buffer_size
= m_recv_buffer
.capacity() + m_disk_recv_buffer_size
;
2679 p
.used_receive_buffer
= m_recv_pos
;
2680 p
.write_state
= m_channel_state
[upload_channel
];
2681 p
.read_state
= m_channel_state
[download_channel
];
// fraction of pieces the peer has
// NOTE(review): divides by p.pieces.size(); if the bitfield can be empty
// here the result is not a number - confirm.
2683 p
.progress
= (float)p
.pieces
.count() / (float)p
.pieces
.size();
2686 // allocates a disk buffer of size 'disk_buffer_size' and replaces the
2687 // end of the current receive buffer with it. i.e. the receive pos
2688 // must be <= packet_size - disk_buffer_size
2689 // the disk buffer can be accessed through release_disk_receive_buffer()
2690 // when it is queried, the responsibility to free it is transferred
// A size of 0 is accepted and succeeds immediately. A size above 16 kiB is
// treated as a protocol error, and a failed allocation also disconnects
// the peer.
// NOTE(review): the return values on the two failure paths and at the end
// are not visible here; presumably false and true respectively - confirm.
2692 bool peer_connection::allocate_disk_receive_buffer(int disk_buffer_size
)
2696 TORRENT_ASSERT(m_packet_size
> 0);
2697 TORRENT_ASSERT(m_recv_pos
<= m_packet_size
- disk_buffer_size
);
2698 TORRENT_ASSERT(!m_disk_recv_buffer
);
2699 TORRENT_ASSERT(disk_buffer_size
<= 16 * 1024);
2701 if (disk_buffer_size
== 0) return true;
// runtime counterpart of the assert above
2703 if (disk_buffer_size
> 16 * 1024)
2705 disconnect("invalid piece size", 2);
2709 m_disk_recv_buffer
.reset(m_ses
.allocate_disk_buffer());
2710 if (!m_disk_recv_buffer
)
2712 disconnect("out of memory");
2715 m_disk_recv_buffer_size
= disk_buffer_size
;
2719 char* peer_connection::release_disk_receive_buffer()
2721 m_disk_recv_buffer_size
= 0;
2722 return m_disk_recv_buffer
.release();
// Removes the first 'size' bytes from the receive buffer by moving the
// remaining received bytes to the front, and sets the expected packet size
// to 'packet_size'.
//   size        - bytes to drop; must not exceed m_recv_pos (asserted)
//   packet_size - new value for m_packet_size; must be > 0 (asserted)
// NOTE(review): no adjustment of m_recv_pos is visible here although the
// std::fill below starts at m_recv_pos; presumably it is reduced by size
// in between - confirm.
2725 void peer_connection::cut_receive_buffer(int size
, int packet_size
)
2729 TORRENT_ASSERT(packet_size
> 0);
2730 TORRENT_ASSERT(int(m_recv_buffer
.size()) >= size
);
2731 TORRENT_ASSERT(int(m_recv_buffer
.size()) >= m_recv_pos
);
2732 TORRENT_ASSERT(m_recv_pos
>= size
);
// source and destination overlap, hence memmove
2735 std::memmove(&m_recv_buffer
[0], &m_recv_buffer
[0] + size
, m_recv_pos
- size
);
// zero everything from m_recv_pos to the end of the buffer
2740 std::fill(m_recv_buffer
.begin() + m_recv_pos
, m_recv_buffer
.end(), 0);
2743 m_packet_size
= packet_size
;
// Forwards to the statistics object's calc_ip_overhead().
2746 void peer_connection::calc_ip_overhead()
2748 m_statistics
.calc_ip_overhead();
// Periodic maintenance for this connection. Disconnects the peer on the
// timeouts below (inactivity, missing handshake, no request after unchoke,
// mutual lack of interest), checks for overdue block requests, updates the
// statistics and rate peaks, recomputes the desired request queue size,
// recalculates the upload throttle used to maintain the share ratio and
// refreshes the estimate of the peer's download rate.
//   tick_interval - forwarded to the statistics object's second_tick()
// NOTE(review): several condition heads and the declarations of d, d1 and
// d2 are not visible in this function; the notes below mark the places
// where that matters.
2751 void peer_connection::second_tick(float tick_interval
)
2753 ptime
now(time_now());
2754 boost::intrusive_ptr
<peer_connection
> me(self());
2756 // the invariant check must be run before me is destructed
2757 // in case the peer got disconnected
2760 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2761 if (!t
|| m_disconnecting
)
// NOTE(review): done() is called here without the
// "m_connecting && m_connection_ticket >= 0" check that disconnect()
// performs before releasing the ticket - confirm this is intended.
2763 m_ses
.m_half_open
.done(m_connection_ticket
);
2764 m_connecting
= false;
2765 disconnect("torrent aborted");
2771 #ifndef TORRENT_DISABLE_EXTENSIONS
// NOTE(review): the loop body is not visible here.
2772 for (extension_list_t::iterator i
= m_extensions
.begin()
2773 , end(m_extensions
.end()); i
!= end
; ++i
)
2777 if (is_disconnecting()) return;
2780 // if the peer hasn't said a thing for a certain
2781 // time, it is considered to have timed out
2783 d
= now
- m_last_receive
;
2784 if (d
> seconds(m_timeout
) && !m_connecting
)
2786 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2787 (*m_logger
) << time_now_string() << " *** LAST ACTIVITY [ "
2788 << total_seconds(d
) << " seconds ago ] ***\n";
2790 disconnect("timed out: inactivity");
2794 // do not stall waiting for a handshake
// NOTE(review): only the last operand of this condition is visible here.
2797 && d
> seconds(m_ses
.settings().handshake_timeout
))
2799 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2800 (*m_logger
) << time_now_string() << " *** NO HANDSHAKE [ waited "
2801 << total_seconds(d
) << " seconds ] ***\n";
2803 disconnect("timed out: no handshake");
2807 // disconnect peers that we unchoked, but
2808 // they didn't send a request within 20 seconds.
2809 // but only if we're a seed
// measured from the later of the last unchoke and the last incoming request
2810 d
= now
- (std::max
)(m_last_unchoke
, m_last_incoming_request
);
// NOTE(review): some operands of this condition are not visible here.
2812 && m_requests
.empty()
2814 && m_peer_interested
2815 && t
&& t
->is_finished()
2818 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2819 (*m_logger
) << time_now_string() << " *** NO REQUEST [ t: "
2820 << total_seconds(d
) << " ] ***\n";
2822 disconnect("timed out: no request when unchoked");
2826 // if the peer hasn't become interested and we haven't
2827 // become interested in the peer for 10 minutes, it
2828 // has also timed out.
// the limit actually applied is the inactivity_timeout setting
2831 d1
= now
- m_became_uninterested
;
2832 d2
= now
- m_became_uninteresting
;
2833 time_duration time_limit
= seconds(
2834 m_ses
.settings().inactivity_timeout
);
2836 // don't bother disconnect peers we haven't been interested
2837 // in (and that hasn't been interested in us) for a while
2838 // unless we have used up all our connection slots
// NOTE(review): some operands of this condition are not visible here.
2840 && !m_peer_interested
2843 && (m_ses
.num_connections() >= m_ses
.max_connections()
2844 || (t
&& t
->num_peers() >= t
->max_connections())))
2846 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2847 (*m_logger
) << time_now_string() << " *** MUTUAL NO INTEREST [ "
2848 "t1: " << total_seconds(d1
) << " | "
2849 "t2: " << total_seconds(d2
) << " ] ***\n";
2851 disconnect("timed out: no interest");
// the oldest outstanding request is overdue once the request timeout plus
// the accumulated extension has passed
// NOTE(review): the action taken is not visible here; presumably
// snub_peer() - confirm.
2855 if (!m_download_queue
.empty()
2856 && now
> m_requested
+ seconds(m_ses
.settings().request_timeout
2857 + m_timeout_extend
))
2862 // if we haven't sent something in too long, send a keep-alive
// bandwidth limits are ignored for peers on the local network when the
// ignore_limits_on_local_network setting is on
2865 m_ignore_bandwidth_limits
= m_ses
.settings().ignore_limits_on_local_network
2866 && on_local_network();
2868 m_statistics
.second_tick(tick_interval
);
// track the highest payload rates seen so far
2870 if (m_statistics
.upload_payload_rate() > m_upload_rate_peak
)
2872 m_upload_rate_peak
= m_statistics
.upload_payload_rate();
2874 if (m_statistics
.download_payload_rate() > m_download_rate_peak
)
2876 m_download_rate_peak
= m_statistics
.download_payload_rate();
2877 #ifndef TORRENT_DISABLE_GEO_IP
// also raise the peak stored in the peer's inet_as entry, if it has one
2878 if (peer_info_struct())
2880 std::pair
<const int, int>* as_stats
= peer_info_struct()->inet_as
;
2881 if (as_stats
&& as_stats
->second
< m_download_rate_peak
)
2882 as_stats
->second
= m_download_rate_peak
;
2886 if (is_disconnecting()) return;
2888 if (!t
->ready_for_connections()) return;
2890 // calculate the desired download queue size
2891 const float queue_time
= m_ses
.settings().request_queue_time
;
2892 // (if the latency is more than this, the download will stall)
2893 // so, the queue size is queue_time * down_rate / 16 kiB
2894 // (16 kB is the size of each request)
2895 // the minimum number of requests is 2 and the maximum is 48
2896 // the block size doesn't have to be 16. So we first query the
// whole pieces are requested when m_request_large_blocks is set
2898 const int block_size
= m_request_large_blocks
2899 ? t
->torrent_file().piece_length() : t
->block_size();
2900 TORRENT_ASSERT(block_size
> 0);
// NOTE(review): the condition choosing between the fixed size of 1 and the
// computed size below is not visible here.
2904 m_desired_queue_size
= 1;
2908 m_desired_queue_size
= static_cast<int>(queue_time
2909 * statistics().download_rate() / block_size
);
// clamp to [min_request_queue, m_max_out_request_queue]
2910 if (m_desired_queue_size
> m_max_out_request_queue
)
2911 m_desired_queue_size
= m_max_out_request_queue
;
2912 if (m_desired_queue_size
< min_request_queue
)
2913 m_desired_queue_size
= min_request_queue
;
// tell the user when the configured maximum is what limits the queue
2915 if (m_desired_queue_size
== m_max_out_request_queue
2916 && t
->alerts().should_post
<performance_alert
>())
2918 t
->alerts().post_alert(performance_alert(t
->get_handle()
2919 , performance_alert::outstanding_request_limit_reached
));
2923 if (!m_download_queue
.empty()
2924 && now
- m_last_piece
> seconds(m_ses
.settings().piece_timeout
2925 + m_timeout_extend
))
2927 // this peer isn't sending the pieces we've
2928 // requested (this has been observed by BitComet)
2929 // in this case we'll clear our download queue and
2930 // re-request the blocks.
2931 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2932 (*m_logger
) << time_now_string()
2933 << " *** PIECE_REQUESTS TIMED OUT [ " << (int)m_download_queue
.size()
2934 << " " << total_seconds(now
- m_last_piece
) << "] ***\n";
2940 // If the client sends more data
2941 // we send it data faster, otherwise, slower.
2942 // It will also depend on how much data the
2943 // client has sent us. This is the means to
2944 // maintain the share ratio given by m_ratio
2947 if (t
->is_finished() || is_choked() || t
->ratio() == 0.0f
)
2949 // if we have downloaded more than one piece more
2950 // than we have uploaded OR if we are a seed
2951 // have an unlimited upload rate
2952 m_bandwidth_limit
[upload_channel
].throttle(m_upload_limit
);
// head start granted to the peer: 64 kiB plus two blocks plus any free
// upload credit
2956 size_type bias
= 0x10000 + 2 * t
->block_size() + m_free_upload
;
2958 double break_even_time
= 15; // seconds.
2959 size_type have_uploaded
= m_statistics
.total_payload_upload();
2960 size_type have_downloaded
= m_statistics
.total_payload_download();
2961 double download_speed
= m_statistics
.download_rate();
// projected download total after 1.5 break-even periods at the current rate
2963 size_type soon_downloaded
=
2964 have_downloaded
+ (size_type
)(download_speed
* break_even_time
*1.5);
2966 if (t
->ratio() != 1.f
)
2967 soon_downloaded
= (size_type
)(soon_downloaded
*(double)t
->ratio());
// rate that would even out the balance within break_even_time, capped at
// the configured upload limit
2969 double upload_speed_limit
= (std::min
)((soon_downloaded
- have_uploaded
2970 + bias
) / break_even_time
, double(m_upload_limit
));
// keep the value representable as int before the cast below
2972 upload_speed_limit
= (std::min
)(upload_speed_limit
,
2973 (double)(std::numeric_limits
<int>::max
)());
// NOTE(review): the second argument of the outer std::min is not visible
// here.
2975 m_bandwidth_limit
[upload_channel
].throttle(
2976 (std::min
)((std::max
)((int)upload_speed_limit
, 20)
2980 // update once every minute
2981 if (now
- m_remote_dl_update
>= seconds(60))
// weighted average: two thirds of the previous estimate plus one third of
// the rate over the last 60 seconds
2983 float factor
= 0.6666666666667f
;
// with no previous estimate, use the new sample on its own
2985 if (m_remote_dl_rate
== 0) factor
= 0.0f
;
2987 m_remote_dl_rate
= int((m_remote_dl_rate
* factor
) +
2988 ((m_remote_bytes_dled
* (1.0f
-factor
)) / 60.f
));
2990 m_remote_bytes_dled
= 0;
2991 m_remote_dl_update
= now
;
// Handles a peer whose block request has timed out: posts a
// peer_snubbed_alert, reduces the desired queue size to 1, extends the
// request timeout, times out the last queued or outstanding request,
// requests a replacement block and finally hands the timed-out block back
// to the piece picker.
// NOTE(review): the conditions around the alert and around the two queue
// branches are only partly visible here.
2997 void peer_connection::snub_peer()
3001 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
3007 if (m_ses
.m_alerts
.should_post
<peer_snubbed_alert
>())
3009 m_ses
.m_alerts
.post_alert(peer_snubbed_alert(t
->get_handle()
3010 , m_remote
, m_peer_id
));
3013 m_desired_queue_size
= 1;
// give the peer one more request_timeout before the next time-out
3017 m_timeout_extend
+= m_ses
.settings().request_timeout
;
3020 if (!t
->has_picker()) return;
3021 piece_picker
& picker
= t
->picker();
// (-1, -1) marks "no block selected"
3023 piece_block
r(-1, -1);
3024 // time out the last request in the queue
3025 if (!m_request_queue
.empty())
3027 r
= m_request_queue
.back();
3028 m_request_queue
.pop_back();
3032 TORRENT_ASSERT(!m_download_queue
.empty());
3033 r
= m_download_queue
.back().block
;
3035 // only time out a request if it blocks the piece
3036 // from being completed (i.e. no free blocks to
// free blocks = blocks in the piece that are neither finished, being
// written nor requested
3038 piece_picker::downloading_piece p
;
3039 picker
.piece_info(r
.piece_index
, p
);
3040 int free_blocks
= picker
.blocks_in_piece(r
.piece_index
)
3041 - p
.finished
- p
.writing
- p
.requested
;
3042 if (free_blocks
> 0)
3044 m_timeout_extend
+= m_ses
.settings().request_timeout
;
3048 if (m_ses
.m_alerts
.should_post
<block_timeout_alert
>())
3050 m_ses
.m_alerts
.post_alert(block_timeout_alert(t
->get_handle()
3051 , remote(), pid(), r
.block_index
, r
.piece_index
));
3053 m_download_queue
.pop_back();
3055 if (!m_download_queue
.empty() || !m_request_queue
.empty())
3056 m_timeout_extend
+= m_ses
.settings().request_timeout
;
// the queue size is raised to 2 only for the duration of the
// request_a_block() call, then put back to 1
3058 m_desired_queue_size
= 2;
3059 request_a_block(*t
, *this);
3060 m_desired_queue_size
= 1;
3062 // abort the block after the new one has
3063 // been requested in order to prevent it from
3064 // picking the same block again, stalling the
3065 // same piece indefinitely.
3066 if (r
!= piece_block(-1, -1))
3067 picker
.abort_download(r
);
3069 send_block_requests();
// Starts asynchronous disk reads for the peer's queued requests, one per
// request, for as long as the send buffer size plus the bytes already being
// read stays below a watermark derived from the current upload rate.
3072 void peer_connection::fill_send_buffer()
3074 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3078 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
3081 // only add new piece-chunks if the send buffer is small enough
3082 // otherwise there will be no end to how large it will be!
// watermark: half the upload rate, at least 512 and at most the
// send_buffer_watermark setting
3084 int buffer_size_watermark
= int(m_statistics
.upload_rate()) / 2;
3085 if (buffer_size_watermark
< 512) buffer_size_watermark
= 512;
3086 else if (buffer_size_watermark
> m_ses
.settings().send_buffer_watermark
)
3087 buffer_size_watermark
= m_ses
.settings().send_buffer_watermark
;
3089 while (!m_requests
.empty()
3090 && (send_buffer_size() + m_reading_bytes
< buffer_size_watermark
))
3092 TORRENT_ASSERT(t
->ready_for_connections());
3093 peer_request
& r
= m_requests
.front();
3095 TORRENT_ASSERT(r
.piece
>= 0);
3096 TORRENT_ASSERT(r
.piece
< (int)m_have_piece
.size());
3097 TORRENT_ASSERT(t
->have_piece(r
.piece
));
3098 TORRENT_ASSERT(r
.start
+ r
.length
<= t
->torrent_file().piece_size(r
.piece
));
3099 TORRENT_ASSERT(r
.length
> 0 && r
.start
>= 0);
// on_disk_read_complete() is invoked with the request when the read finishes
3101 t
->filesystem().async_read(r
, bind(&peer_connection::on_disk_read_complete
3102 , self(), _1
, _2
, r
));
// counted as in flight until on_disk_read_complete() subtracts it again
3103 m_reading_bytes
+= r
.length
;
// r refers to the front element and must not be used after this erase
3105 m_requests
.erase(m_requests
.begin());
// Completion handler for the disk reads started by fill_send_buffer().
//   ret - result of the read; anything other than r.length is a failure
//   j   - the finished disk job (buffer, error text, error file)
//   r   - the peer request the read was issued for
// On success the block is passed to write_piece(). On failure the peer is
// disconnected with the job's error text.
// NOTE(review): the failure path is only partly visible here; the
// file_error_alert and set_error() calls presumably apply when the torrent
// still exists - confirm.
3109 void peer_connection::on_disk_read_complete(int ret
, disk_io_job
const& j
, peer_request r
)
3111 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
// the read is no longer in flight, whatever its outcome
3113 m_reading_bytes
-= r
.length
;
// takes charge of the job's buffer for the remainder of this function
3115 disk_buffer_holder
buffer(m_ses
, j
.buffer
);
3117 if (ret
!= r
.length
|| m_torrent
.expired())
3119 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
3122 disconnect(j
.str
.c_str());
3126 if (t
->alerts().should_post
<file_error_alert
>())
3127 t
->alerts().post_alert(file_error_alert(j
.error_file
, t
->get_handle(), j
.str
));
3128 t
->set_error(j
.str
);
3133 #ifdef TORRENT_VERBOSE_LOGGING
3134 (*m_logger
) << time_now_string()
3135 << " ==> PIECE [ piece: " << r
.piece
<< " | s: " << r
.start
3136 << " | l: " << r
.length
<< " ]\n";
3139 write_piece(r
, buffer
);
// Called when 'amount' of quota has been granted on 'channel'. Adds the
// amount to that channel's limiter and moves the channel from bw_global
// back to bw_idle.
// NOTE(review): the bodies of the two channel branches are not visible
// here; presumably they restart sending or receiving - confirm.
3143 void peer_connection::assign_bandwidth(int channel
, int amount
)
3145 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
3147 #ifdef TORRENT_VERBOSE_LOGGING
3148 (*m_logger
) << "bandwidth [ " << channel
<< " ] + " << amount
<< "\n";
3151 m_bandwidth_limit
[channel
].assign(amount
);
// the channel is expected to be waiting in the bw_global state
3152 TORRENT_ASSERT(m_channel_state
[channel
] == peer_info::bw_global
);
3153 m_channel_state
[channel
] = peer_info::bw_idle
;
3154 if (channel
== upload_channel
)
3158 else if (channel
== download_channel
)
// Called when 'amount' of previously used quota on 'channel' expires;
// forwards it to that channel's limiter.
// NOTE(review): the bodies of the two channel branches are not visible
// here; presumably they restart sending or receiving - confirm.
3164 void peer_connection::expire_bandwidth(int channel
, int amount
)
3166 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
3168 m_bandwidth_limit
[channel
].expire(amount
);
3169 if (channel
== upload_channel
)
3173 else if (channel
== download_channel
)
// Starts an asynchronous write of the send buffer when the upload channel
// is idle. If there is data to send but no quota left, bandwidth is
// requested from the torrent instead and the channel moves to bw_torrent.
// While a write is in flight the channel is in the bw_network state.
3179 void peer_connection::setup_send()
3181 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
// only one outstanding operation per channel
3183 if (m_channel_state
[upload_channel
] != peer_info::bw_idle
) return;
3185 shared_ptr
<torrent
> t
= m_torrent
.lock();
// NOTE(review): some operands of this condition are not visible here.
3187 if (m_bandwidth_limit
[upload_channel
].quota_left() == 0
3188 && !m_send_buffer
.empty()
3191 && !m_ignore_bandwidth_limits
)
3193 // in this case, we have data to send, but no
3194 // bandwidth. So, we simply request bandwidth
3197 if (m_bandwidth_limit
[upload_channel
].max_assignable() > 0)
// interesting peers get two extra priority points, plus one point per
// request currently sitting in the send buffer
3199 int priority
= is_interesting() * 2 + m_requests_in_buffer
.size();
3200 // peers that we are not interested in are non-prioritized
3201 m_channel_state
[upload_channel
] = peer_info::bw_torrent
;
3202 t
->request_bandwidth(upload_channel
, self()
3203 , m_send_buffer
.size(), priority
);
3204 #ifdef TORRENT_VERBOSE_LOGGING
3205 (*m_logger
) << time_now_string() << " *** REQUEST_BANDWIDTH [ upload prio: "
3206 << priority
<< "]\n";
3215 #ifdef TORRENT_VERBOSE_LOGGING
// NOTE(review): this "CANNOT WRITE" message logs the quota of
// download_channel although this is the upload path; looks like a
// copy-paste slip that only affects the log output - confirm.
3216 (*m_logger
) << time_now_string() << " *** CANNOT WRITE ["
3217 " quota: " << m_bandwidth_limit
[download_channel
].quota_left() <<
3218 " ignore: " << (m_ignore_bandwidth_limits
?"yes":"no") <<
3219 " buf: " << m_send_buffer
.size() <<
3220 " connecting: " << (m_connecting
?"yes":"no") <<
3226 // send the actual buffer
3227 if (!m_send_buffer
.empty())
// send everything, or as much as the remaining quota allows
3229 int amount_to_send
= m_send_buffer
.size();
3230 int quota_left
= m_bandwidth_limit
[upload_channel
].quota_left();
3231 if (!m_ignore_bandwidth_limits
&& amount_to_send
> quota_left
)
3232 amount_to_send
= quota_left
;
3234 TORRENT_ASSERT(amount_to_send
> 0);
3236 #ifdef TORRENT_VERBOSE_LOGGING
3237 (*m_logger
) << time_now_string() << " *** ASYNC_WRITE [ bytes: " << amount_to_send
<< " ]\n";
// on_send_data() is the completion handler of the write
3239 std::list
<asio::const_buffer
> const& vec
= m_send_buffer
.build_iovec(amount_to_send
);
3240 m_socket
->async_write_some(vec
, bind(&peer_connection::on_send_data
, self(), _1
, _2
));
3242 m_channel_state
[upload_channel
] = peer_info::bw_network
;
// Starts an asynchronous read when the download channel is idle. Without
// quota, bandwidth is requested from the torrent instead. Otherwise up to
// the rest of the current packet (limited by the quota) is read into the
// regular receive buffer, the disk receive buffer, or both, depending on
// where m_recv_pos falls. While a read is in flight the channel is in the
// bw_network state.
3246 void peer_connection::setup_receive()
3248 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
// only one outstanding operation per channel
3252 if (m_channel_state
[download_channel
] != peer_info::bw_idle
) return;
3254 shared_ptr
<torrent
> t
= m_torrent
.lock();
// NOTE(review): some operands of this condition are not visible here.
3256 if (m_bandwidth_limit
[download_channel
].quota_left() == 0
3259 && !m_ignore_bandwidth_limits
)
3261 if (m_bandwidth_limit
[download_channel
].max_assignable() > 0)
3263 #ifdef TORRENT_VERBOSE_LOGGING
3264 (*m_logger
) << time_now_string() << " *** REQUEST_BANDWIDTH [ download ]\n";
3266 TORRENT_ASSERT(m_channel_state
[download_channel
] == peer_info::bw_idle
);
3267 m_channel_state
[download_channel
] = peer_info::bw_torrent
;
// ask for 16 kiB per outstanding request plus 30 bytes
3268 t
->request_bandwidth(download_channel
, self()
3269 , m_download_queue
.size() * 16 * 1024 + 30, m_priority
);
3276 #ifdef TORRENT_VERBOSE_LOGGING
3277 (*m_logger
) << time_now_string() << " *** CANNOT READ ["
3278 " quota: " << m_bandwidth_limit
[download_channel
].quota_left() <<
3279 " ignore: " << (m_ignore_bandwidth_limits
?"yes":"no") <<
3280 " outstanding: " << m_outstanding_writing_bytes
<<
3281 " outstanding-limit: " << m_ses
.settings().max_outstanding_disk_bytes_per_connection
<<
3287 TORRENT_ASSERT(m_packet_size
> 0);
// read the rest of the current packet, or as much as the quota allows
3288 int max_receive
= m_packet_size
- m_recv_pos
;
3289 int quota_left
= m_bandwidth_limit
[download_channel
].quota_left();
3290 if (!m_ignore_bandwidth_limits
&& max_receive
> quota_left
)
3291 max_receive
= quota_left
;
3293 if (max_receive
== 0) return;
3295 TORRENT_ASSERT(m_recv_pos
>= 0);
3296 TORRENT_ASSERT(m_packet_size
> 0);
3297 TORRENT_ASSERT(can_read());
3298 #ifdef TORRENT_VERBOSE_LOGGING
3299 (*m_logger
) << time_now_string() << " *** ASYNC_READ [ max: " << max_receive
<< " bytes ]\n";
// the part of the packet that does not go into the disk buffer
3302 int regular_buffer_size
= m_packet_size
- m_disk_recv_buffer_size
;
3304 if (int(m_recv_buffer
.size()) < regular_buffer_size
)
3305 m_recv_buffer
.resize(regular_buffer_size
);
3307 if (!m_disk_recv_buffer
|| regular_buffer_size
>= m_recv_pos
+ max_receive
)
3309 // only receive into regular buffer
3310 TORRENT_ASSERT(m_recv_pos
+ max_receive
<= int(m_recv_buffer
.size()));
3311 m_socket
->async_read_some(asio::buffer(&m_recv_buffer
[m_recv_pos
]
3312 , max_receive
), bind(&peer_connection::on_receive_data
, self(), _1
, _2
));
3314 else if (m_recv_pos
>= regular_buffer_size
)
3316 // only receive into disk buffer
3317 TORRENT_ASSERT(m_recv_pos
- regular_buffer_size
>= 0);
3318 TORRENT_ASSERT(m_recv_pos
- regular_buffer_size
+ max_receive
<= m_disk_recv_buffer_size
);
// NOTE(review): the size argument of this asio::buffer call is not visible
// here.
3319 m_socket
->async_read_some(asio::buffer(m_disk_recv_buffer
.get() + m_recv_pos
- regular_buffer_size
3321 , bind(&peer_connection::on_receive_data
, self(), _1
, _2
));
3325 // receive into both regular and disk buffer
3326 TORRENT_ASSERT(max_receive
+ m_recv_pos
> regular_buffer_size
);
3327 TORRENT_ASSERT(m_recv_pos
< regular_buffer_size
);
3328 TORRENT_ASSERT(max_receive
- regular_buffer_size
3329 + m_recv_pos
<= m_disk_recv_buffer_size
);
// first the rest of the regular buffer, then the start of the disk buffer
3331 boost::array
<asio::mutable_buffer
, 2> vec
;
3332 vec
[0] = asio::buffer(&m_recv_buffer
[m_recv_pos
]
3333 , regular_buffer_size
- m_recv_pos
);
3334 vec
[1] = asio::buffer(m_disk_recv_buffer
.get()
3335 , max_receive
- regular_buffer_size
+ m_recv_pos
);
3336 m_socket
->async_read_some(vec
, bind(&peer_connection::on_receive_data
3339 m_channel_state
[download_channel
] = peer_info::bw_network
;
3342 #ifndef TORRENT_DISABLE_ENCRYPTION
3344 // returns the last 'bytes' from the receive buffer
// The result is a pair of intervals because the requested range can lie in
// the regular receive buffer, in the disk receive buffer, or straddle both.
// When one interval is enough, the second one is empty (0, 0).
//   bytes - number of trailing bytes wanted; must be <= m_recv_pos (asserted)
3345 std::pair
<buffer::interval
, buffer::interval
> peer_connection::wr_recv_buffers(int bytes
)
3347 TORRENT_ASSERT(bytes
<= m_recv_pos
);
3349 std::pair
<buffer::interval
, buffer::interval
> vec
;
// the part of the packet that lives in the regular receive buffer
3350 int regular_buffer_size
= m_packet_size
- m_disk_recv_buffer_size
;
3351 TORRENT_ASSERT(regular_buffer_size
>= 0);
// everything received so far is in the regular buffer
3352 if (!m_disk_recv_buffer
|| regular_buffer_size
>= m_recv_pos
)
3354 vec
.first
= buffer::interval(&m_recv_buffer
[0]
3355 + m_recv_pos
- bytes
, &m_recv_buffer
[0] + m_recv_pos
);
3356 vec
.second
= buffer::interval(0,0);
// the requested range lies entirely in the disk buffer
3358 else if (m_recv_pos
- bytes
>= regular_buffer_size
)
3360 vec
.first
= buffer::interval(m_disk_recv_buffer
.get() + m_recv_pos
3361 - regular_buffer_size
- bytes
, m_disk_recv_buffer
.get() + m_recv_pos
3362 - regular_buffer_size
);
3363 vec
.second
= buffer::interval(0,0);
// the range starts in the regular buffer and ends in the disk buffer
3367 TORRENT_ASSERT(m_recv_pos
- bytes
< regular_buffer_size
);
3368 TORRENT_ASSERT(m_recv_pos
> regular_buffer_size
);
3369 vec
.first
= buffer::interval(&m_recv_buffer
[0] + m_recv_pos
- bytes
3370 , &m_recv_buffer
[0] + regular_buffer_size
);
3371 vec
.second
= buffer::interval(m_disk_recv_buffer
.get()
3372 , m_disk_recv_buffer
.get() + m_recv_pos
- regular_buffer_size
);
// the two intervals together must cover exactly 'bytes' bytes
3374 TORRENT_ASSERT(vec
.first
.left() + vec
.second
.left() == bytes
);
// Prepares the receive state for a new packet of 'packet_size' bytes
// (must be > 0, asserted). If more than the current packet has already been
// received, the current packet is cut off the front of the buffer via
// cut_receive_buffer().
// NOTE(review): what separates the cut path from the final assignment is
// not visible here (presumably an early return and a reset of m_recv_pos)
// - confirm.
3379 void peer_connection::reset_recv_buffer(int packet_size
)
3381 TORRENT_ASSERT(packet_size
> 0);
3382 if (m_recv_pos
> m_packet_size
)
3384 cut_receive_buffer(m_packet_size
, packet_size
);
3388 m_packet_size
= packet_size
;
// Appends 'size' bytes starting at 'buf' to the send buffer. Free space in
// the last buffer of the chain is used first; what does not fit is copied
// into a buffer allocated from the session. If that allocation fails the
// peer is disconnected.
//   flags - when equal to message_type_request, the end offset of this
//           message within the send buffer is recorded in
//           m_requests_in_buffer
// NOTE(review): the lines that advance buf and reduce size after the first
// append are not visible here; the "size <= 0" test below presumably relies
// on them - confirm.
3391 void peer_connection::send_buffer(char const* buf
, int size
, int flags
)
3393 if (flags
== message_type_request
)
3394 m_requests_in_buffer
.push_back(m_send_buffer
.size() + size
);
// never append more than the message itself
3396 int free_space
= m_send_buffer
.space_in_last_buffer();
3397 if (free_space
> size
) free_space
= size
;
3400 m_send_buffer
.append(buf
, free_space
);
3403 #ifdef TORRENT_STATS
3404 m_ses
.m_buffer_usage_logger
<< log_time() << " send_buffer: "
3405 << free_space
<< std::endl
;
3406 m_ses
.log_buffer_usage();
3409 if (size
<= 0) return;
// buffer.first is the allocation, buffer.second its capacity
3411 std::pair
<char*, int> buffer
= m_ses
.allocate_buffer(size
);
3412 if (buffer
.first
== 0)
3414 disconnect("out of memory");
3417 TORRENT_ASSERT(buffer
.second
>= size
);
3418 std::memcpy(buffer
.first
, buf
, size
);
// the bound session_impl::free_buffer call is handed over together with
// the buffer
3419 m_send_buffer
.append_buffer(buffer
.first
, buffer
.second
, size
3420 , bind(&session_impl::free_buffer
, boost::ref(m_ses
), _1
, buffer
.second
));
3421 #ifdef TORRENT_STATS
3422 m_ses
.m_buffer_usage_logger
<< log_time() << " send_buffer_alloc: " << size
<< std::endl
;
3423 m_ses
.log_buffer_usage();
3428 // TODO: change this interface to automatically call setup_send() when the
3429 // return value is destructed
// Reserves 'size' bytes (must be > 0, asserted) at the end of the send
// buffer and returns the interval the caller should write into. The space
// comes from allocate_appendix() or from a buffer allocated from the
// session. If that allocation fails the peer is disconnected and an empty
// interval (0, 0) is returned.
// NOTE(review): the test on 'insert' that chooses between the two paths is
// not visible here.
3430 buffer::interval
peer_connection::allocate_send_buffer(int size
)
3432 TORRENT_ASSERT(size
> 0);
3433 char* insert
= m_send_buffer
.allocate_appendix(size
);
// buffer.first is the allocation, buffer.second its capacity
3436 std::pair
<char*, int> buffer
= m_ses
.allocate_buffer(size
);
3437 if (buffer
.first
== 0)
3439 disconnect("out of memory");
3440 return buffer::interval(0, 0);
3442 TORRENT_ASSERT(buffer
.second
>= size
);
3443 m_send_buffer
.append_buffer(buffer
.first
, buffer
.second
, size
3444 , bind(&session_impl::free_buffer
, boost::ref(m_ses
), _1
, buffer
.second
));
3445 buffer::interval
ret(buffer
.first
, buffer
.first
+ size
);
3446 #ifdef TORRENT_STATS
3447 m_ses
.m_buffer_usage_logger
<< log_time() << " allocate_buffer_alloc: " << size
<< std::endl
;
3448 m_ses
.log_buffer_usage();
3454 #ifdef TORRENT_STATS
3455 m_ses
.m_buffer_usage_logger
<< log_time() << " allocate_buffer: " << size
<< std::endl
;
3456 m_ses
.log_buffer_usage();
// the appendix path: the caller writes straight into the reserved space
3458 buffer::interval
ret(insert
, insert
+ size
);
// Member functions of a small helper that sets the referenced value to 0
// when cond is true: either explicitly through fire(), or at destruction.
// After fire() has run the destructor does nothing further.
// NOTE(review): the enclosing type header and the m_val / m_cond member
// declarations are not visible here.
3466 set_to_zero(T
& v
, bool cond
): m_val(v
), m_cond(cond
) {}
3467 void fire() { if (!m_cond
) return; m_cond
= false; m_val
= 0; }
3468 ~set_to_zero() { if (m_cond
) m_val
= 0; }
3474 // --------------------------
3476 // --------------------------
3478 // throws exception when the client should be disconnected
// Completion handler for the reads started by setup_receive().
//   error             - result of the read; on failure it is passed on to
//                       on_receive() and the peer is disconnected with the
//                       error's message
//   bytes_transferred - number of bytes the read delivered
// On success the received bytes are charged against the download quota and
// handed to on_receive(). The socket is then read synchronously for as long
// as data and quota remain, stopping on would_block.
// NOTE(review): the loop header, the declaration of ec and what follows the
// loop are not visible here.
3479 void peer_connection::on_receive_data(const error_code
& error
3480 , std::size_t bytes_transferred
)
3482 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
// the asynchronous read has finished; the channel is idle again
3486 TORRENT_ASSERT(m_channel_state
[download_channel
] == peer_info::bw_network
);
3487 m_channel_state
[download_channel
] = peer_info::bw_idle
;
3491 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
3492 (*m_logger
) << time_now_string() << " **ERROR**: "
3493 << error
.message() << "[in peer_connection::on_receive_data]\n";
3495 on_receive(error
, bytes_transferred
);
3496 disconnect(error
.message().c_str());
3500 int max_receive
= 0;
3503 #ifdef TORRENT_VERBOSE_LOGGING
3504 (*m_logger
) << "read " << bytes_transferred
<< " bytes\n";
3506 // correct the dl quota usage, if not all of the buffer was actually read
3507 if (!m_ignore_bandwidth_limits
)
3508 m_bandwidth_limit
[download_channel
].use_quota(bytes_transferred
);
3510 if (m_disconnecting
) return;
3512 TORRENT_ASSERT(m_packet_size
> 0);
3513 TORRENT_ASSERT(bytes_transferred
> 0);
3515 m_last_receive
= time_now();
3516 m_recv_pos
+= bytes_transferred
;
3517 TORRENT_ASSERT(m_recv_pos
<= int(m_recv_buffer
.size()
3518 + m_disk_recv_buffer_size
));
// remember the counters so the asserts below can check that on_receive()
// accounted for every received byte as either payload or protocol
3521 size_type cur_payload_dl
= m_statistics
.last_payload_downloaded();
3522 size_type cur_protocol_dl
= m_statistics
.last_protocol_downloaded();
3524 on_receive(error
, bytes_transferred
);
3526 TORRENT_ASSERT(m_statistics
.last_payload_downloaded() - cur_payload_dl
>= 0);
3527 TORRENT_ASSERT(m_statistics
.last_protocol_downloaded() - cur_protocol_dl
>= 0);
3528 size_type stats_diff
= m_statistics
.last_payload_downloaded() - cur_payload_dl
+
3529 m_statistics
.last_protocol_downloaded() - cur_protocol_dl
;
3530 TORRENT_ASSERT(stats_diff
== bytes_transferred
);
3533 TORRENT_ASSERT(m_packet_size
> 0);
// swapping with a fresh buffer of m_packet_size releases excess capacity
// NOTE(review): the first part of this condition is not visible here.
3537 && (m_recv_buffer
.capacity() - m_packet_size
) > 128)
3539 buffer(m_packet_size
).swap(m_recv_buffer
);
// read the rest of the current packet, or as much as the quota allows
3542 max_receive
= m_packet_size
- m_recv_pos
;
3543 int quota_left
= m_bandwidth_limit
[download_channel
].quota_left();
3544 if (!m_ignore_bandwidth_limits
&& max_receive
> quota_left
)
3545 max_receive
= quota_left
;
3547 if (max_receive
== 0) break;
// the part of the packet that does not go into the disk buffer
3549 int regular_buffer_size
= m_packet_size
- m_disk_recv_buffer_size
;
3551 if (int(m_recv_buffer
.size()) < regular_buffer_size
)
3552 m_recv_buffer
.resize(regular_buffer_size
);
3555 if (!m_disk_recv_buffer
|| regular_buffer_size
>= m_recv_pos
+ max_receive
)
3557 // only receive into regular buffer
3558 TORRENT_ASSERT(m_recv_pos
+ max_receive
<= int(m_recv_buffer
.size()));
3559 bytes_transferred
= m_socket
->read_some(asio::buffer(&m_recv_buffer
[m_recv_pos
]
3560 , max_receive
), ec
);
3562 else if (m_recv_pos
>= regular_buffer_size
)
3564 // only receive into disk buffer
3565 TORRENT_ASSERT(m_recv_pos
- regular_buffer_size
>= 0);
3566 TORRENT_ASSERT(m_recv_pos
- regular_buffer_size
+ max_receive
<= m_disk_recv_buffer_size
);
3567 bytes_transferred
= m_socket
->read_some(asio::buffer(m_disk_recv_buffer
.get()
3568 + m_recv_pos
- regular_buffer_size
, (std::min
)(m_packet_size
3569 - m_recv_pos
, max_receive
)), ec
);
3573 // receive into both regular and disk buffer
3574 TORRENT_ASSERT(max_receive
+ m_recv_pos
> regular_buffer_size
);
3575 TORRENT_ASSERT(m_recv_pos
< regular_buffer_size
);
3576 TORRENT_ASSERT(max_receive
- regular_buffer_size
3577 + m_recv_pos
<= m_disk_recv_buffer_size
);
// first the rest of the regular buffer, then the start of the disk buffer
3579 boost::array
<asio::mutable_buffer
, 2> vec
;
3580 vec
[0] = asio::buffer(&m_recv_buffer
[m_recv_pos
]
3581 , regular_buffer_size
- m_recv_pos
);
3582 vec
[1] = asio::buffer(m_disk_recv_buffer
.get()
3583 , (std::min
)(m_disk_recv_buffer_size
3584 , max_receive
- regular_buffer_size
+ m_recv_pos
));
3585 bytes_transferred
= m_socket
->read_some(vec
, ec
);
// would_block only means the socket has no more data right now; any other
// error ends the connection
3587 if (ec
&& ec
!= asio::error::would_block
)
3589 disconnect(ec
.message().c_str());
3592 if (ec
== asio::error::would_block
) break;
3594 while (bytes_transferred
> 0);
// Reports whether this connection currently wants to write to its socket.
// The visible part of the expression requires that the send buffer is
// non-empty and that either the upload channel still has quota left or
// bandwidth limits are ignored for this connection.
// NOTE(review): no terminating ';' is visible for the return expression, so
// it may contain further terms -- confirm against the complete file.
3599 bool peer_connection::can_write() const
3601 // if we have requests or pending data to be sent or announcements to be made
3602 // we want to send data
3603 return !m_send_buffer
.empty()
3604 && (m_bandwidth_limit
[upload_channel
].quota_left() > 0
3605 || m_ignore_bandwidth_limits
)
// Computes whether this connection may read from its socket right now.
// The visible terms of `ret` are:
//  - the download channel has quota left, or bandwidth limits are ignored
//  - the number of outstanding bytes being written is below the session
//    setting max_outstanding_disk_bytes_per_connection
// NOTE(review): an intermediate term of the expression and the statement
// that returns `ret` are not visible here -- confirm against the full file.
3609 bool peer_connection::can_read() const
3611 bool ret
= (m_bandwidth_limit
[download_channel
].quota_left() > 0
3612 || m_ignore_bandwidth_limits
)
3614 && m_outstanding_writing_bytes
<
3615 m_ses
.settings().max_outstanding_disk_bytes_per_connection
;
// Starts the outgoing connection attempt for this peer.
//
// ticket: stored in m_connection_ticket; it is later handed to
//         m_ses.m_half_open.done() by on_connection_complete().
//
// Visible sequence: open the socket using the protocol of the torrent's
// interface, switch it to non-blocking mode, optionally enable
// reuse_address and pick a source port, bind, then issue async_connect()
// towards m_remote with on_connection_complete() as the handler.
// Each disconnect(ec.message().c_str()) below follows a socket call that
// reports through `ec`.
// NOTE(review): the guards around those disconnect() calls (and any early
// returns) are not visible here -- presumably `if (ec)`; confirm.
3620 void peer_connection::connect(int ticket
)
3623 // in case we disconnect here, we need to
3624 // keep the connection alive until the
3625 // exit invariant check is run
3626 boost::intrusive_ptr
<peer_connection
> me(self());
3631 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
3632 (*m_ses
.m_logger
) << time_now_string() << " CONNECTING: " << m_remote
.address().to_string(ec
)
3633 << ":" << m_remote
.port() << "\n";
3636 m_connection_ticket
= ticket
;
3637 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
3640 TORRENT_ASSERT(m_connecting
);
// NOTE(review): the condition selecting the "torrent aborted" path is not
// visible -- presumably the weak torrent reference could not be locked.
3644 disconnect("torrent aborted");
3648 m_socket
->open(t
->get_interface().protocol(), ec
);
3651 disconnect(ec
.message().c_str());
3655 // set the socket to non-blocking, so that we can
3656 // read the entire buffer on each read event we get
3657 tcp::socket::non_blocking_io
ioc(true);
3658 m_socket
->io_control(ioc
, ec
);
3661 disconnect(ec
.message().c_str());
3665 tcp::endpoint bind_interface
= t
->get_interface();
// an outgoing port range counts as configured when first > 0 and
// second >= first; in that case the source port comes from
// m_ses.next_port() and reuse_address is set on the socket
3667 std::pair
<int, int> const& out_ports
= m_ses
.settings().outgoing_ports
;
3668 if (out_ports
.first
> 0 && out_ports
.second
>= out_ports
.first
)
3670 m_socket
->set_option(socket_acceptor::reuse_address(true), ec
);
3673 disconnect(ec
.message().c_str());
3676 bind_interface
.port(m_ses
.next_port());
3679 m_socket
->bind(bind_interface
, ec
);
3682 disconnect(ec
.message().c_str());
// the handler is bound to self(), not to the raw `this` pointer
3685 m_socket
->async_connect(m_remote
3686 , bind(&peer_connection::on_connection_complete
, self(), _1
));
// m_connect is the start timestamp that on_connection_complete() subtracts
// from its completion time to compute m_rtt
3687 m_connect
= time_now();
3688 m_statistics
.sent_syn();
// post a peer_connect_alert only if the torrent's alert manager wants it
3690 if (t
->alerts().should_post
<peer_connect_alert
>())
3692 t
->alerts().post_alert(peer_connect_alert(
3693 t
->get_handle(), remote(), pid()));
// Completion handler for the async_connect() issued by connect().
//
// e: the error code reported for the connect operation.
//
// Visible behaviour: computes m_rtt from the completion time, clears
// m_connecting and releases the half-open ticket, logs and disconnects
// using e.message() on the failure path, and otherwise records the
// success (m_last_receive, received_synack()), rejects a connection whose
// remote endpoint equals the socket's local endpoint, and applies the
// peer_tos setting for IPv4 remotes.
// NOTE(review): the condition selecting the failure path is not visible
// here -- presumably `if (e)`; confirm against the full file.
3697 void peer_connection::on_connection_complete(error_code
const& e
)
// the completion timestamp is taken before the session mutex is acquired
3699 ptime completed
= time_now();
3701 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
3705 m_rtt
= total_milliseconds(completed
- m_connect
);
3707 if (m_disconnecting
) return;
3709 m_connecting
= false;
3710 m_ses
.m_half_open
.done(m_connection_ticket
);
3714 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
3715 (*m_ses
.m_logger
) << time_now_string() << " CONNECTION FAILED: " << m_remote
.address().to_string()
3716 << ": " << e
.message() << "\n";
3718 disconnect(e
.message().c_str(), 1);
3722 if (m_disconnecting
) return;
3723 m_last_receive
= time_now();
3725 // this means the connection just succeeded
3727 m_statistics
.received_synack();
3729 TORRENT_ASSERT(m_socket
);
3730 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING
3731 (*m_ses
.m_logger
) << time_now_string() << " COMPLETED: " << m_remote
.address().to_string()
3732 << " rtt = " << m_rtt
<< "\n";
3736 if (m_remote
== m_socket
->local_endpoint(ec
))
3738 // if the remote endpoint is the same as the local endpoint, we're connected
3740 disconnect("connected to ourselves", 1);
// the type-of-service option is only set when the remote address is IPv4
3744 if (m_remote
.address().is_v4())
3747 m_socket
->set_option(type_of_service(m_ses
.settings().peer_tos
), ec
);
3755 // --------------------------
3757 // --------------------------
3759 // throws exception when the client should be disconnected
// Completion handler for a socket write.
//
// error:             error code reported for the write
// bytes_transferred: number of bytes the write consumed from the send buffer
//
// Visible behaviour, in order: take the session mutex, drop the sent bytes
// from m_send_buffer, shift the offsets in m_requests_in_buffer, mark the
// upload channel idle, charge the upload quota (unless limits are
// ignored), disconnect with error.message() on the error path, and
// otherwise update m_last_sent and call on_sent().
// NOTE(review): the guard around the error path is not visible here --
// presumably `if (error)`; confirm against the full file.
3760 void peer_connection::on_send_data(error_code
const& error
3761 , std::size_t bytes_transferred
)
3763 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
3767 TORRENT_ASSERT(m_channel_state
[upload_channel
] == peer_info::bw_network
);
3769 m_send_buffer
.pop_front(bytes_transferred
);
// every entry of m_requests_in_buffer is reduced by the number of bytes
// just sent; leading entries that drop to zero or below are then removed
3771 for (std::vector
<int>::iterator i
= m_requests_in_buffer
.begin()
3772 , end(m_requests_in_buffer
.end()); i
!= end
; ++i
)
3773 *i
-= bytes_transferred
;
3775 while (!m_requests_in_buffer
.empty()
3776 && m_requests_in_buffer
.front() <= 0)
3777 m_requests_in_buffer
.erase(m_requests_in_buffer
.begin());
3779 m_channel_state
[upload_channel
] = peer_info::bw_idle
;
3781 if (!m_ignore_bandwidth_limits
)
3782 m_bandwidth_limit
[upload_channel
].use_quota(bytes_transferred
);
3784 #ifdef TORRENT_VERBOSE_LOGGING
3785 (*m_logger
) << "wrote " << bytes_transferred
<< " bytes\n";
3790 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
3791 (*m_logger
) << "**ERROR**: " << error
.message() << " [in peer_connection::on_send_data]\n";
3793 disconnect(error
.message().c_str());
3796 if (m_disconnecting
) return;
3798 TORRENT_ASSERT(!m_connecting
);
3799 TORRENT_ASSERT(bytes_transferred
> 0);
3801 m_last_sent
= time_now();
// snapshot the upload counters so the assertions below can verify that
// on_sent() accounted for exactly bytes_transferred bytes
// (payload + protocol)
3804 size_type cur_payload_ul
= m_statistics
.last_payload_uploaded();
3805 size_type cur_protocol_ul
= m_statistics
.last_protocol_uploaded();
3807 on_sent(error
, bytes_transferred
);
3809 TORRENT_ASSERT(m_statistics
.last_payload_uploaded() - cur_payload_ul
>= 0);
3810 TORRENT_ASSERT(m_statistics
.last_protocol_uploaded() - cur_protocol_ul
>= 0);
3811 size_type stats_diff
= m_statistics
.last_payload_uploaded() - cur_payload_ul
3812 + m_statistics
.last_protocol_uploaded() - cur_protocol_ul
;
3813 TORRENT_ASSERT(stats_diff
== bytes_transferred
);
// Consistency checks for this connection, expressed as TORRENT_ASSERTs.
// The visible assertions cover: the disk receive buffer and its size, the
// disconnect flags, bandwidth quota versus channel state, uniqueness of
// blocks across the download and request queues, the m_peer_info entry,
// the piece bitfield count, and agreement between the per-peer queues and
// the torrent's piece picker.
// NOTE(review): many guarding conditions and preprocessor lines
// (#if / #else / #endif) are not visible here, so which assertions are
// actually compiled in, and under which condition each one runs, must be
// confirmed against the full file.
3822 void peer_connection::check_invariant() const
// a disk receive buffer exists exactly when its recorded size is non-zero
3824 TORRENT_ASSERT(bool(m_disk_recv_buffer
) == (m_disk_recv_buffer_size
> 0));
3826 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
3827 if (m_disconnecting
)
3830 TORRENT_ASSERT(m_disconnect_started
);
3832 else if (!m_in_constructor
)
3834 TORRENT_ASSERT(m_ses
.has_peer((peer_connection
*)this));
3838 // this assertion is correct most of the time, but sometimes right when the
3839 // limit is changed it might break
3840 for (int i = 0; i < 2; ++i)
3842 // this peer is in the bandwidth history iff max_assignable < limit
3843 TORRENT_ASSERT((m_bandwidth_limit[i].max_assignable() < m_bandwidth_limit[i].throttle())
3844 == m_ses.m_bandwidth_manager[i]->is_in_history(this)
3845 || m_bandwidth_limit[i].throttle() == bandwidth_limit::inf);
// while a channel is in state bw_torrent or bw_global, it must have no
// quota left
3849 if (m_channel_state
[download_channel
] == peer_info::bw_torrent
3850 || m_channel_state
[download_channel
] == peer_info::bw_global
)
3851 TORRENT_ASSERT(m_bandwidth_limit
[download_channel
].quota_left() == 0);
3852 if (m_channel_state
[upload_channel
] == peer_info::bw_torrent
3853 || m_channel_state
[upload_channel
] == peer_info::bw_global
)
3854 TORRENT_ASSERT(m_bandwidth_limit
[upload_channel
].quota_left() == 0);
// collect every block of the download queue and the request queue into a
// set; the set can only have as many elements as both queues together if
// no block occurs twice
3856 std::set
<piece_block
> unique
;
3857 std::transform(m_download_queue
.begin(), m_download_queue
.end()
3858 , std::inserter(unique
, unique
.begin()), boost::bind(&pending_block::block
, _1
));
3859 std::copy(m_request_queue
.begin(), m_request_queue
.end(), std::inserter(unique
, unique
.begin()));
3860 TORRENT_ASSERT(unique
.size() == m_download_queue
.size() + m_request_queue
.size());
// NOTE(review): the m_peer_info checks below are presumably guarded by a
// null check that is not visible here -- confirm.
3863 TORRENT_ASSERT(m_peer_info
->prev_amount_upload
== 0);
3864 TORRENT_ASSERT(m_peer_info
->prev_amount_download
== 0);
3865 TORRENT_ASSERT(m_peer_info
->connection
== this
3866 || m_peer_info
->connection
== 0);
3868 if (m_peer_info
->optimistically_unchoked
)
3869 TORRENT_ASSERT(!is_choked());
// the cached piece count must match the number of set bits in the bitfield
3872 TORRENT_ASSERT(m_have_piece
.count() == m_num_pieces
);
3876 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3877 // since this connection doesn't have a torrent reference
3878 // no torrent should have a reference to this connection either
3879 for (aux::session_impl::torrent_map::const_iterator i
= m_ses
.m_torrents
.begin()
3880 , end(m_ses
.m_torrents
.end()); i
!= end
; ++i
)
3881 TORRENT_ASSERT(!i
->second
->has_peer((peer_connection
*)this));
3886 if (t
->ready_for_connections() && m_initialized
)
3887 TORRENT_ASSERT(t
->torrent_file().num_pieces() == int(m_have_piece
.size()));
3889 if (m_ses
.settings().close_redundant_connections
)
3891 // make sure upload only peers are disconnected
3892 if (t
->is_finished() && m_upload_only
)
3893 TORRENT_ASSERT(m_disconnect_started
);
3896 && m_bitfield_received
3897 && t
->are_files_checked())
3898 TORRENT_ASSERT(m_disconnect_started
);
3901 if (!m_disconnect_started
&& m_initialized
)
3903 // none of this matters if we're disconnecting anyway
3904 if (t
->is_finished())
3905 TORRENT_ASSERT(!m_interesting
);
3907 TORRENT_ASSERT(m_upload_only
);
// count, over all peers of the torrent, how many times each block appears
// in a request queue or download queue, then compare that count with the
// picker's num_peers() for every block that is not yet downloaded
3910 if (t
->has_picker())
3912 std::map
<piece_block
, int> num_requests
;
3913 for (torrent::const_peer_iterator i
= t
->begin(); i
!= t
->end(); ++i
)
3915 // make sure this peer is not a dangling pointer
3916 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3917 TORRENT_ASSERT(m_ses
.has_peer(*i
));
3919 peer_connection
const& p
= *(*i
);
3920 for (std::deque
<piece_block
>::const_iterator i
= p
.request_queue().begin()
3921 , end(p
.request_queue().end()); i
!= end
; ++i
)
3923 for (std::deque
<pending_block
>::const_iterator i
= p
.download_queue().begin()
3924 , end(p
.download_queue().end()); i
!= end
; ++i
)
3925 ++num_requests
[i
->block
];
3927 for (std::map
<piece_block
, int>::iterator i
= num_requests
.begin()
3928 , end(num_requests
.end()); i
!= end
; ++i
)
3930 if (!t
->picker().is_downloaded(i
->first
))
3931 TORRENT_ASSERT(t
->picker().num_peers(i
->first
) == i
->second
);
3934 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
// linear search of the policy's peer list for the entry that m_peer_info
// points at; the assertion below requires that it was found
3937 policy::const_iterator i
= t
->get_policy().begin_peer();
3938 policy::const_iterator end
= t
->get_policy().end_peer();
3939 for (; i
!= end
; ++i
)
3941 if (&i
->second
== m_peer_info
) break;
3943 TORRENT_ASSERT(i
!= end
);
3946 if (t
->has_picker() && !t
->is_aborted())
3948 // make sure that pieces that have completed the download
3949 // of all their blocks are in the disk io thread's queue
3951 const std::vector
<piece_picker::downloading_piece
>& dl_queue
3952 = t
->picker().get_download_queue();
3953 for (std::vector
<piece_picker::downloading_piece
>::const_iterator i
=
3954 dl_queue
.begin(); i
!= dl_queue
.end(); ++i
)
3956 const int blocks_per_piece
= t
->picker().blocks_in_piece(i
->index
);
3958 bool complete
= true;
3959 for (int j
= 0; j
< blocks_per_piece
; ++j
)
3961 if (i
->info
[j
].state
== piece_picker::block_info::state_finished
)
3967 // this invariant is not valid anymore since the completion event
3968 // might be queued in the io service
3969 if (complete && !piece_failed)
3971 disk_io_job ret = m_ses.m_disk_thread.find_job(
3972 &t->filesystem(), -1, i->index);
3973 TORRENT_ASSERT(ret.action == disk_io_job::hash || ret.action == disk_io_job::write);
3974 TORRENT_ASSERT(ret.piece == i->index);
3980 // extremely expensive invariant check
3984 piece_picker& p = t->picker();
3985 const std::vector<piece_picker::downloading_piece>& dlq = p.get_download_queue();
3986 const int blocks_per_piece = static_cast<int>(
3987 t->torrent_file().piece_length() / t->block_size());
3989 for (std::vector<piece_picker::downloading_piece>::const_iterator i =
3990 dlq.begin(); i != dlq.end(); ++i)
3992 for (int j = 0; j < blocks_per_piece; ++j)
3994 if (std::find(m_request_queue.begin(), m_request_queue.end()
3995 , piece_block(i->index, j)) != m_request_queue.end()
3997 std::find(m_download_queue.begin(), m_download_queue.end()
3998 , piece_block(i->index, j)) != m_download_queue.end())
4000 TORRENT_ASSERT(i->info[j].peer == m_remote);
4004 TORRENT_ASSERT(i->info[j].peer != m_remote || i->info[j].finished);
// Classifies this peer's speed by comparing its download payload rate with
// the download payload rate of the whole torrent.
// The three visible conditions, tested in this order, are:
//   1. rate > 512  and rate > torrent_rate / 16
//   2. rate > 4096 and rate > torrent_rate / 64
//   3. rate < torrent_rate / 15 while m_speed is currently `fast`
// NOTE(review): the statement executed by each branch and the return
// statement are not visible here; confirm which peer_speed_t value each
// condition selects against the full file.
4013 peer_connection::peer_speed_t
peer_connection::peer_speed()
4015 shared_ptr
<torrent
> t
= m_torrent
.lock();
// both rates are truncated to int before they are compared
4018 int download_rate
= int(statistics().download_payload_rate());
4019 int torrent_download_rate
= int(t
->statistics().download_payload_rate());
4021 if (download_rate
> 512 && download_rate
> torrent_download_rate
/ 16)
4023 else if (download_rate
> 4096 && download_rate
> torrent_download_rate
/ 64)
4025 else if (download_rate
< torrent_download_rate
/ 15 && m_speed
== fast
)
// Keep-alive handling for this connection.
// Returns early, without logging or touching m_last_sent, when any of the
// following holds:
//  - fewer than m_timeout / 2 seconds have passed since m_last_sent
//  - the connection is still connecting
//  - the connection is still in the handshake
//  - the upload channel is not idle
// Otherwise it logs "==> KEEPALIVE" (verbose logging builds only) and
// resets m_last_sent to the current time.
// NOTE(review): the declaration of `d` and the call that actually emits
// the keep-alive message are not visible here -- confirm against the full
// file.
4033 void peer_connection::keep_alive()
4035 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
4040 d
= time_now() - m_last_sent
;
4041 if (total_seconds(d
) < m_timeout
/ 2) return;
4043 if (m_connecting
) return;
4044 if (in_handshake()) return;
4046 // if the last send has not completed yet, do not send a keep
4048 if (m_channel_state
[upload_channel
] != peer_info::bw_idle
) return;
4050 #ifdef TORRENT_VERBOSE_LOGGING
4051 (*m_logger
) << time_now_string() << " ==> KEEPALIVE\n";
4054 m_last_sent
= time_now();
// Reports whether the remote peer has every piece.
// Returns true only when all of the following hold:
//  - m_num_pieces equals the size of the m_have_piece bitfield
//  - m_num_pieces is greater than zero
//  - the torrent this connection refers to can still be locked
//  - that torrent reports valid_metadata()
4058 bool peer_connection::is_seed() const
4060 // if m_num_pieces == 0, we probably don't have the
4062 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
4063 return m_num_pieces
== (int)m_have_piece
.size() && m_num_pieces
> 0 && t
&& t
->valid_metadata();