AUTO_LT_SYNC
[tore.git] / libtorrent / src / peer_connection.cpp
blob564d077c2c4be1e6e096d937da426d255754b278
1 /*
3 Copyright (c) 2003, Arvid Norberg
4 All rights reserved.
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #include "libtorrent/pch.hpp"
35 #include <vector>
36 #include <iostream>
37 #include <iomanip>
38 #include <limits>
39 #include <boost/bind.hpp>
41 #include "libtorrent/peer_connection.hpp"
42 #include "libtorrent/identify_client.hpp"
43 #include "libtorrent/entry.hpp"
44 #include "libtorrent/bencode.hpp"
45 #include "libtorrent/alert_types.hpp"
46 #include "libtorrent/invariant_check.hpp"
47 #include "libtorrent/io.hpp"
48 #include "libtorrent/file.hpp"
49 #include "libtorrent/version.hpp"
50 #include "libtorrent/extensions.hpp"
51 #include "libtorrent/aux_/session_impl.hpp"
52 #include "libtorrent/policy.hpp"
53 #include "libtorrent/socket_type.hpp"
54 #include "libtorrent/assert.hpp"
56 //#define TORRENT_CORRUPT_DATA
58 using boost::bind;
59 using boost::shared_ptr;
60 using libtorrent::aux::session_impl;
62 namespace libtorrent
64 // outbound connection
// Constructor used for outbound connections. Takes the owning session, a
// weak reference to the torrent, the socket, the remote endpoint and a
// policy::peer record (which may be null). The connection starts out
// active, connecting and queued (m_active, m_connecting and m_queued are
// all true), choked in both directions, with both bandwidth channels idle
// and an all-zero peer id.
65 peer_connection::peer_connection(
66 session_impl& ses
67 , boost::weak_ptr<torrent> tor
68 , shared_ptr<socket_type> s
69 , tcp::endpoint const& endp
70 , policy::peer* peerinfo)
72 #ifndef NDEBUG
73 m_last_choke(time_now() - hours(1))
75 #endif
76 m_ses(ses)
77 , m_max_out_request_queue(m_ses.settings().max_out_request_queue)
78 , m_last_piece(time_now())
79 , m_last_request(time_now())
80 , m_last_incoming_request(min_time())
81 , m_last_unchoke(min_time())
82 , m_last_receive(time_now())
83 , m_last_sent(time_now())
84 , m_requested(min_time())
85 , m_timeout_extend(0)
86 , m_remote_dl_update(time_now())
87 , m_connect(time_now())
88 , m_became_uninterested(time_now())
89 , m_became_uninteresting(time_now())
90 , m_free_upload(0)
91 , m_downloaded_at_last_unchoke(0)
92 , m_disk_recv_buffer(ses, 0)
93 , m_socket(s)
94 , m_remote(endp)
95 , m_torrent(tor)
96 , m_num_pieces(0)
97 , m_timeout(m_ses.settings().peer_timeout)
98 , m_packet_size(0)
99 , m_recv_pos(0)
100 , m_disk_recv_buffer_size(0)
101 , m_reading_bytes(0)
102 , m_num_invalid_requests(0)
103 , m_priority(1)
104 , m_upload_limit(bandwidth_limit::inf)
105 , m_download_limit(bandwidth_limit::inf)
106 , m_peer_info(peerinfo)
107 , m_speed(slow)
108 , m_connection_ticket(-1)
109 , m_remote_bytes_dled(0)
110 , m_remote_dl_rate(0)
111 , m_outstanding_writing_bytes(0)
112 , m_download_rate_peak(0)
113 , m_upload_rate_peak(0)
114 , m_rtt(0)
115 , m_prefer_whole_pieces(0)
116 , m_desired_queue_size(2)
117 , m_fast_reconnect(false)
118 , m_active(true)
119 , m_peer_interested(false)
120 , m_peer_choked(true)
121 , m_interesting(false)
122 , m_choked(true)
123 , m_failed(false)
124 , m_ignore_bandwidth_limits(false)
125 , m_have_all(false)
126 , m_disconnecting(false)
127 , m_connecting(true)
128 , m_queued(true)
129 , m_request_large_blocks(false)
130 , m_upload_only(false)
131 , m_snubbed(false)
132 , m_bitfield_received(false)
133 #ifndef NDEBUG
134 , m_in_constructor(true)
135 , m_disconnect_started(false)
136 , m_initialized(false)
137 #endif
139 m_channel_state[upload_channel] = peer_info::bw_idle;
140 m_channel_state[download_channel] = peer_info::bw_idle;
// a peer record, when one is supplied, must not belong to a banned peer
142 TORRENT_ASSERT(peerinfo == 0 || peerinfo->banned == false);
143 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
// zero the country code, then copy the first two characters of the
// session's country lookup for the remote address when a database is loaded
144 std::fill(m_country, m_country + 2, 0);
145 #ifndef TORRENT_DISABLE_GEO_IP
146 if (m_ses.has_country_db())
148 char const *country = m_ses.country_for_ip(m_remote.address());
149 if (country != 0)
151 m_country[0] = country[0];
152 m_country[1] = country[1];
155 #endif
156 #endif
157 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
// the per-connection log is named "<remote address>_<remote port>"
158 m_logger = m_ses.create_log(m_remote.address().to_string() + "_"
159 + boost::lexical_cast<std::string>(m_remote.port()), m_ses.listen_port());
160 (*m_logger) << "*** OUTGOING CONNECTION\n";
161 #endif
162 #ifndef NDEBUG
163 piece_failed = false;
164 #endif
165 #ifndef TORRENT_DISABLE_GEO_IP
166 m_inet_as_name = m_ses.as_name_for_ip(m_remote.address());
167 #endif
// the peer id starts out as all zeroes
169 std::fill(m_peer_id.begin(), m_peer_id.end(), 0);
172 // incoming connection
// Constructor used for incoming connections. Takes the owning session, the
// already-open socket, the remote endpoint and a policy::peer record. No
// torrent is passed in, so m_torrent is left default-constructed. Unlike
// the outbound constructor, m_active, m_connecting and m_queued all start
// out false.
173 peer_connection::peer_connection(
174 session_impl& ses
175 , shared_ptr<socket_type> s
176 , tcp::endpoint const& endp
177 , policy::peer* peerinfo)
179 #ifndef NDEBUG
180 m_last_choke(time_now() - hours(1))
182 #endif
183 m_ses(ses)
184 , m_max_out_request_queue(m_ses.settings().max_out_request_queue)
185 , m_last_piece(time_now())
186 , m_last_request(time_now())
187 , m_last_incoming_request(min_time())
188 , m_last_unchoke(min_time())
189 , m_last_receive(time_now())
190 , m_last_sent(time_now())
191 , m_requested(min_time())
192 , m_timeout_extend(0)
193 , m_remote_dl_update(time_now())
194 , m_connect(time_now())
195 , m_became_uninterested(time_now())
196 , m_became_uninteresting(time_now())
197 , m_free_upload(0)
198 , m_downloaded_at_last_unchoke(0)
199 , m_disk_recv_buffer(ses, 0)
200 , m_socket(s)
201 , m_remote(endp)
202 , m_num_pieces(0)
203 , m_timeout(m_ses.settings().peer_timeout)
204 , m_packet_size(0)
205 , m_recv_pos(0)
206 , m_disk_recv_buffer_size(0)
207 , m_reading_bytes(0)
208 , m_num_invalid_requests(0)
209 , m_priority(1)
210 , m_upload_limit(bandwidth_limit::inf)
211 , m_download_limit(bandwidth_limit::inf)
212 , m_peer_info(peerinfo)
213 , m_speed(slow)
214 , m_connection_ticket(-1)
215 , m_remote_bytes_dled(0)
216 , m_remote_dl_rate(0)
217 , m_outstanding_writing_bytes(0)
218 , m_download_rate_peak(0)
219 , m_upload_rate_peak(0)
220 , m_rtt(0)
221 , m_prefer_whole_pieces(0)
222 , m_desired_queue_size(2)
223 , m_fast_reconnect(false)
224 , m_active(false)
225 , m_peer_interested(false)
226 , m_peer_choked(true)
227 , m_interesting(false)
228 , m_choked(true)
229 , m_failed(false)
230 , m_ignore_bandwidth_limits(false)
231 , m_have_all(false)
232 , m_disconnecting(false)
233 , m_connecting(false)
234 , m_queued(false)
235 , m_request_large_blocks(false)
236 , m_upload_only(false)
237 , m_snubbed(false)
238 , m_bitfield_received(false)
239 #ifndef NDEBUG
240 , m_in_constructor(true)
241 , m_disconnect_started(false)
242 , m_initialized(false)
243 #endif
245 m_channel_state[upload_channel] = peer_info::bw_idle;
246 m_channel_state[download_channel] = peer_info::bw_idle;
248 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
// zero the country code, then copy the first two characters of the
// session's country lookup for the remote address when a database is loaded
249 std::fill(m_country, m_country + 2, 0);
250 #ifndef TORRENT_DISABLE_GEO_IP
251 if (m_ses.has_country_db())
253 char const *country = m_ses.country_for_ip(m_remote.address());
254 if (country != 0)
256 m_country[0] = country[0];
257 m_country[1] = country[1];
260 #endif
261 #endif
263 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
264 error_code ec;
// the endpoint handed in should match what the socket reports, unless
// querying the socket fails
265 TORRENT_ASSERT(m_socket->remote_endpoint(ec) == m_remote || ec);
// the per-connection log is named "<remote address>_<remote port>"
266 m_logger = m_ses.create_log(remote().address().to_string(ec) + "_"
267 + boost::lexical_cast<std::string>(remote().port()), m_ses.listen_port());
268 (*m_logger) << "*** INCOMING CONNECTION\n";
269 #endif
271 #ifndef TORRENT_DISABLE_GEO_IP
272 m_inet_as_name = m_ses.as_name_for_ip(m_remote.address());
273 #endif
274 #ifndef NDEBUG
275 piece_failed = false;
276 #endif
// the peer id starts out as all zeroes
277 std::fill(m_peer_id.begin(), m_peer_id.end(), 0);
280 bool peer_connection::unchoke_compare(boost::intrusive_ptr<peer_connection const> const& p) const
282 TORRENT_ASSERT(p);
283 peer_connection const& rhs = *p;
285 // first compare how many bytes they've sent us
286 size_type c1 = m_statistics.total_payload_download() - m_downloaded_at_last_unchoke;
287 size_type c2 = rhs.m_statistics.total_payload_download() - rhs.m_downloaded_at_last_unchoke;
288 if (c1 > c2) return true;
289 if (c1 < c2) return false;
291 // if they are equal, compare how much we have uploaded
292 if (m_peer_info) c1 = m_peer_info->total_upload();
293 else c1 = m_statistics.total_payload_upload();
294 if (rhs.m_peer_info) c2 = rhs.m_peer_info->total_upload();
295 else c2 = rhs.m_statistics.total_payload_upload();
297 // in order to not switch back and forth too often,
298 // unchoked peers must be at least one piece ahead
299 // of a choked peer to be sorted at a lower unchoke-priority
300 boost::shared_ptr<torrent> t = m_torrent.lock();
301 TORRENT_ASSERT(t);
302 if (!is_choked()) c1 -= t->torrent_file().piece_length();
303 if (!rhs.is_choked()) c2 -= t->torrent_file().piece_length();
305 return c1 < c2;
// Stores the current total payload download in
// m_downloaded_at_last_unchoke, the baseline that unchoke_compare()
// subtracts when ranking peers.
308 void peer_connection::reset_choke_counters()
310 m_downloaded_at_last_unchoke = m_statistics.total_payload_download();
// Prepares the connection for use. When no torrent is attached yet, the
// socket is switched to non-blocking mode and the remote endpoint is read
// back from it; an error in either step disconnects the peer with the
// error's message. For IPv4 peers the type-of-service option from the
// session settings is then applied. When a torrent is attached and ready
// for connections, init() is called instead.
313 void peer_connection::start()
315 TORRENT_ASSERT(m_peer_info == 0 || m_peer_info->connection == this);
316 boost::shared_ptr<torrent> t = m_torrent.lock();
318 if (!t)
320 tcp::socket::non_blocking_io ioc(true);
321 error_code ec;
322 m_socket->io_control(ioc, ec);
323 if (ec)
325 disconnect(ec.message().c_str());
326 return;
328 m_remote = m_socket->remote_endpoint(ec);
329 if (ec)
331 disconnect(ec.message().c_str());
332 return;
// the result of setting the TOS option is not checked
334 if (m_remote.address().is_v4())
335 m_socket->set_option(type_of_service(m_ses.settings().peer_tos), ec);
337 else if (t->ready_for_connections())
339 init();
// Recomputes whether this peer has anything we want. The peer is
// interesting when the torrent is not finished and the peer has at least
// one piece that we lack and whose priority is above zero. Depending on
// the result, either a not-interested message is sent or the torrent's
// policy is told that the peer is interesting. Returns without doing
// anything while the peer's bitfield is empty or the torrent is not ready
// for connections.
343 void peer_connection::update_interest()
345 boost::shared_ptr<torrent> t = m_torrent.lock();
346 TORRENT_ASSERT(t);
348 // if m_have_piece is 0, it means the connections
349 // have not been initialized yet. The interested
350 // flag will be updated once they are.
351 if (m_have_piece.size() == 0) return;
352 if (!t->ready_for_connections()) return;
354 bool interested = false;
355 if (!t->is_finished())
357 piece_picker const& p = t->picker();
358 int num_pieces = p.num_pieces();
// stop at the first wanted piece the peer can provide
359 for (int j = 0; j != num_pieces; ++j)
361 if (!p.have_piece(j)
362 && t->piece_priority(j) > 0
363 && m_have_piece[j])
365 interested = true;
366 break;
372 if (!interested) send_not_interested();
373 else t->get_policy().peer_is_interesting(*this);
375 // may throw an asio error if socket has disconnected
376 catch (std::exception&) {}
378 TORRENT_ASSERT(in_handshake() || is_interesting() == interested);
381 #ifndef TORRENT_DISABLE_EXTENSIONS
// Appends the given peer plugin to this connection's extension list.
382 void peer_connection::add_extension(boost::shared_ptr<peer_plugin> ext)
384 m_extensions.push_back(ext);
386 #endif
388 void peer_connection::send_allowed_set()
390 INVARIANT_CHECK;
392 boost::shared_ptr<torrent> t = m_torrent.lock();
393 TORRENT_ASSERT(t);
395 int num_allowed_pieces = m_ses.settings().allowed_fast_set_size;
396 int num_pieces = t->torrent_file().num_pieces();
398 if (num_allowed_pieces >= num_pieces)
400 for (int i = 0; i < num_pieces; ++i)
402 #ifdef TORRENT_VERBOSE_LOGGING
403 (*m_logger) << time_now_string()
404 << " ==> ALLOWED_FAST [ " << i << " ]\n";
405 #endif
406 write_allow_fast(i);
407 m_accept_fast.insert(i);
409 return;
412 std::string x;
413 address const& addr = m_remote.address();
414 if (addr.is_v4())
416 address_v4::bytes_type bytes = addr.to_v4().to_bytes();
417 x.assign((char*)&bytes[0], bytes.size());
419 else
421 address_v6::bytes_type bytes = addr.to_v6().to_bytes();
422 x.assign((char*)&bytes[0], bytes.size());
424 x.append((char*)&t->torrent_file().info_hash()[0], 20);
426 sha1_hash hash = hasher(&x[0], x.size()).final();
427 for (;;)
429 char* p = (char*)&hash[0];
430 for (int i = 0; i < 5; ++i)
432 int piece = detail::read_uint32(p) % num_pieces;
433 if (m_accept_fast.find(piece) == m_accept_fast.end())
435 #ifdef TORRENT_VERBOSE_LOGGING
436 (*m_logger) << time_now_string()
437 << " ==> ALLOWED_FAST [ " << piece << " ]\n";
438 #endif
439 write_allow_fast(piece);
440 m_accept_fast.insert(piece);
441 if (int(m_accept_fast.size()) >= num_allowed_pieces
442 || int(m_accept_fast.size()) == num_pieces) return;
445 hash = hasher((char*)&hash[0], 20).final();
// Sizes the peer's bitfield to the torrent's piece count (new bits take
// the value of m_have_all) and recounts the peer's pieces. If the peer
// turns out to have every piece it is marked as a seed and upload-only,
// the torrent is told via peer_has_all(), and the connection may be
// dropped as redundant. Otherwise the torrent is told which pieces the
// peer has and the peer is either flagged as interesting or, when it is
// upload-only and has nothing we want, disconnected. The on_metadata()
// hook is called on both paths, and each step bails out once the
// connection is disconnecting.
449 void peer_connection::on_metadata_impl()
451 boost::shared_ptr<torrent> t = associated_torrent().lock();
452 m_have_piece.resize(t->torrent_file().num_pieces(), m_have_all);
453 m_num_pieces = m_have_piece.count();
// the peer has every piece
454 if (m_num_pieces == int(m_have_piece.size()))
456 #ifdef TORRENT_VERBOSE_LOGGING
457 (*m_logger) << time_now_string()
458 << " *** on_metadata(): THIS IS A SEED ***\n";
459 #endif
460 // if this is a web seed. we don't have a peer_info struct
461 if (m_peer_info) m_peer_info->seed = true;
462 m_upload_only = true;
464 t->peer_has_all();
465 disconnect_if_redundant();
466 if (m_disconnecting) return;
468 on_metadata();
469 if (m_disconnecting) return;
471 if (!t->is_finished())
472 t->get_policy().peer_is_interesting(*this);
474 return;
476 TORRENT_ASSERT(!m_have_all);
478 on_metadata();
479 if (m_disconnecting) return;
481 // let the torrent know which pieces the
482 // peer has
483 // if we're a seed, we don't keep track of piece availability
484 bool interesting = false;
485 if (!t->is_seed())
487 t->peer_has(m_have_piece);
489 for (int i = 0; i < (int)m_have_piece.size(); ++i)
491 if (m_have_piece[i])
// a piece the peer has, that we lack and that has non-zero priority
493 if (!t->have_piece(i) && t->picker().piece_priority(i) != 0)
494 interesting = true;
499 if (interesting) t->get_policy().peer_is_interesting(*this);
500 else if (upload_only()) disconnect("upload to upload connections");
// Initializes the per-piece state once the torrent has valid metadata and
// is ready for connections (both are asserted). The peer's bitfield is
// sized to the torrent's piece count, the torrent is told which pieces
// the peer has, and our interest in the peer is established: a peer with
// every piece is marked as a seed and upload-only; otherwise the peer is
// interesting if it has a piece we lack with non-zero priority. When we
// are a seed ourselves, update_interest() is used instead.
503 void peer_connection::init()
505 INVARIANT_CHECK;
507 boost::shared_ptr<torrent> t = m_torrent.lock();
508 TORRENT_ASSERT(t);
509 TORRENT_ASSERT(t->valid_metadata());
510 TORRENT_ASSERT(t->ready_for_connections());
// new bits take the value of m_have_all
512 m_have_piece.resize(t->torrent_file().num_pieces(), m_have_all);
514 if (m_have_all) m_num_pieces = t->torrent_file().num_pieces();
515 #ifndef NDEBUG
516 m_initialized = true;
517 #endif
518 // now that we have a piece_picker,
519 // update it with this peer's pieces
521 TORRENT_ASSERT(m_num_pieces == m_have_piece.count());
// the peer has every piece
523 if (m_num_pieces == int(m_have_piece.size()))
525 #ifdef TORRENT_VERBOSE_LOGGING
526 (*m_logger) << " *** THIS IS A SEED ***\n";
527 #endif
528 // if this is a web seed. we don't have a peer_info struct
529 if (m_peer_info) m_peer_info->seed = true;
530 m_upload_only = true;
532 t->peer_has_all();
533 if (t->is_finished()) send_not_interested();
534 else t->get_policy().peer_is_interesting(*this);
535 return;
538 // if we're a seed, we don't keep track of piece availability
539 if (!t->is_seed())
541 t->peer_has(m_have_piece);
542 bool interesting = false;
543 for (int i = 0; i < int(m_have_piece.size()); ++i)
545 if (m_have_piece[i])
547 // if the peer has a piece and we don't, the peer is interesting
548 if (!t->have_piece(i)
549 && t->picker().piece_priority(i) != 0)
550 interesting = true;
553 if (interesting) t->get_policy().peer_is_interesting(*this);
554 else send_not_interested();
556 else
558 update_interest();
// Destructor. Asserts that the connection has left its constructor and
// has gone through disconnect (m_disconnecting and m_disconnect_started),
// resets m_disk_recv_buffer_size, writes a closing line to the log when
// logging is compiled in, and in debug builds verifies that neither the
// session, any of its torrents, nor the peer record still refers to this
// connection.
562 peer_connection::~peer_connection()
564 // INVARIANT_CHECK;
565 TORRENT_ASSERT(!m_in_constructor);
566 TORRENT_ASSERT(m_disconnecting);
567 TORRENT_ASSERT(m_disconnect_started);
569 m_disk_recv_buffer_size = 0;
571 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
572 if (m_logger)
574 (*m_logger) << time_now_string()
575 << " *** CONNECTION CLOSED\n";
577 #endif
578 TORRENT_ASSERT(!m_ses.has_peer(this));
579 #ifndef NDEBUG
// no torrent in the session may still list this connection as a peer
580 for (aux::session_impl::torrent_map::const_iterator i = m_ses.m_torrents.begin()
581 , end(m_ses.m_torrents.end()); i != end; ++i)
582 TORRENT_ASSERT(!i->second->has_peer(this));
583 if (m_peer_info)
584 TORRENT_ASSERT(m_peer_info->connection == 0);
// NOTE(review): t is not used in the visible code after this point
586 boost::shared_ptr<torrent> t = m_torrent.lock();
587 #endif
590 int peer_connection::picker_options() const
592 int ret = 0;
593 boost::shared_ptr<torrent> t = m_torrent.lock();
594 TORRENT_ASSERT(t);
595 if (!t) return 0;
597 if (t->is_sequential_download())
599 ret |= piece_picker::sequential;
601 else if (t->num_have() < t->settings().initial_picker_threshold)
603 // if we have fewer pieces than a certain threshols
604 // don't pick rare pieces, just pick random ones,
605 // and prioritize finishing them
606 ret |= piece_picker::prioritize_partials;
608 else
610 ret |= piece_picker::rarest_first;
613 if (m_snubbed)
615 // snubbed peers should request
616 // the common pieces first, just to make
617 // it more likely for all snubbed peers to
618 // request blocks from the same piece
619 ret |= piece_picker::reverse;
622 if (t->settings().prioritize_partial_pieces)
623 ret |= piece_picker::prioritize_partials;
625 if (on_parole()) ret |= piece_picker::on_parole
626 | piece_picker::prioritize_partials;
628 // only one of rarest_first, common_first and sequential can be set.
629 TORRENT_ASSERT(bool(ret & piece_picker::rarest_first)
630 + bool(ret & piece_picker::sequential) <= 1);
631 return ret;
634 void peer_connection::fast_reconnect(bool r)
636 if (!peer_info_struct() || peer_info_struct()->fast_reconnects > 1)
637 return;
638 m_fast_reconnect = r;
639 peer_info_struct()->connected = time_now()
640 - seconds(m_ses.settings().min_reconnect_time
641 * m_ses.settings().max_failcount);
642 ++peer_info_struct()->fast_reconnects;
// Tells the peer that we now have piece `index` by writing a HAVE
// message. Nothing is done during the handshake. The piece is also
// dropped from the list of pieces the peer suggested. If the peer already
// has the piece, our interest in it is re-evaluated, and the HAVE message
// is suppressed unless the send_redundant_have setting is on.
645 void peer_connection::announce_piece(int index)
647 // dont announce during handshake
648 if (in_handshake()) return;
650 // remove suggested pieces that we have
651 std::vector<int>::iterator i = std::find(
652 m_suggested_pieces.begin(), m_suggested_pieces.end(), index);
653 if (i != m_suggested_pieces.end()) m_suggested_pieces.erase(i);
655 if (has_piece(index))
657 // if we got a piece that this peer has
658 // it might have been the last interesting
659 // piece this peer had. We might not be
660 // interested anymore
661 update_interest();
662 if (is_disconnecting()) return;
664 // optimization, don't send have messages
665 // to peers that already have the piece
666 if (!m_ses.settings().send_redundant_have)
668 #ifdef TORRENT_VERBOSE_LOGGING
669 (*m_logger) << time_now_string()
670 << " ==> HAVE [ piece: " << index << " ] SUPRESSED\n";
671 #endif
672 return;
676 #ifdef TORRENT_VERBOSE_LOGGING
677 (*m_logger) << time_now_string()
678 << " ==> HAVE [ piece: " << index << "]\n";
679 #endif
680 write_have(index);
681 #ifndef NDEBUG
// debug check: we must actually have the piece we just announced
682 boost::shared_ptr<torrent> t = m_torrent.lock();
683 TORRENT_ASSERT(t);
684 TORRENT_ASSERT(t->have_piece(index));
685 #endif
// Returns whether bit `i` is set in the peer's bitfield. Asserts that the
// torrent exists and has valid metadata, and that `i` lies in
// [0, number of pieces).
688 bool peer_connection::has_piece(int i) const
690 boost::shared_ptr<torrent> t = m_torrent.lock();
691 TORRENT_ASSERT(t);
692 TORRENT_ASSERT(t->valid_metadata());
693 TORRENT_ASSERT(i >= 0);
694 TORRENT_ASSERT(i < t->torrent_file().num_pieces());
695 return m_have_piece[i];
// Read-only access to m_request_queue.
698 std::deque<piece_block> const& peer_connection::request_queue() const
700 return m_request_queue;
// Read-only access to m_download_queue.
703 std::deque<pending_block> const& peer_connection::download_queue() const
705 return m_download_queue;
// Read-only access to m_requests.
// NOTE(review): presumably the requests received from the peer that are
// waiting to be served -- confirm against the header.
708 std::deque<peer_request> const& peer_connection::upload_queue() const
710 return m_requests;
// Forwards the given downloaded/uploaded amounts to this connection's
// statistics object.
713 void peer_connection::add_stat(size_type downloaded, size_type uploaded)
715 m_statistics.add_stat(downloaded, uploaded);
// Read-only access to the peer's piece bitfield (m_have_piece).
718 bitfield const& peer_connection::get_bitfield() const
720 return m_have_piece;
// Notifies every extension, via on_piece_pass(), that piece `index`
// passed. When exceptions are enabled, a std::exception thrown by an
// extension is swallowed so the remaining extensions still get called.
// Does nothing when extensions are compiled out.
723 void peer_connection::received_valid_data(int index)
725 INVARIANT_CHECK;
727 #ifndef TORRENT_DISABLE_EXTENSIONS
728 for (extension_list_t::iterator i = m_extensions.begin()
729 , end(m_extensions.end()); i != end; ++i)
731 #ifdef BOOST_NO_EXCEPTIONS
732 (*i)->on_piece_pass(index);
733 #else
734 try { (*i)->on_piece_pass(index); } catch (std::exception&) {}
735 #endif
737 #endif
// Notifies every extension, via on_piece_failed(), that piece `index`
// failed; a std::exception thrown by an extension is swallowed when
// exceptions are enabled. Then, unless the connection is disconnecting
// or has no peer record, the peer is penalized: it is put on parole if
// the use_parole_mode setting is on, its hashfails counter is
// incremented, and its trust points drop by 2 with a floor of -7.
740 void peer_connection::received_invalid_data(int index)
742 INVARIANT_CHECK;
744 #ifndef TORRENT_DISABLE_EXTENSIONS
745 for (extension_list_t::iterator i = m_extensions.begin()
746 , end(m_extensions.end()); i != end; ++i)
748 #ifdef BOOST_NO_EXCEPTIONS
749 (*i)->on_piece_failed(index);
750 #else
751 try { (*i)->on_piece_failed(index); } catch (std::exception&) {}
752 #endif
754 #endif
755 if (is_disconnecting()) return;
757 if (peer_info_struct())
759 if (m_ses.settings().use_parole_mode)
760 peer_info_struct()->on_parole = true;
762 ++peer_info_struct()->hashfails;
763 boost::int8_t& trust_points = peer_info_struct()->trust_points;
765 // we decrease more than we increase, to keep the
766 // allowed failed/passed ratio low.
767 // TODO: make this limit user settable
768 trust_points -= 2;
769 if (trust_points < -7) trust_points = -7;
// Returns the accumulated free-upload amount (m_free_upload).
773 size_type peer_connection::total_free_upload() const
775 return m_free_upload;
// Adds `free_upload` to the accumulated free-upload amount
// (m_free_upload).
778 void peer_connection::add_free_upload(size_type free_upload)
780 INVARIANT_CHECK;
782 m_free_upload += free_upload;
785 // verifies a piece to see if it is valid (is within a valid range)
786 // and if it can correspond to a request generated by libtorrent.
787 bool peer_connection::verify_piece(const peer_request& p) const
789 boost::shared_ptr<torrent> t = m_torrent.lock();
790 TORRENT_ASSERT(t);
792 TORRENT_ASSERT(t->valid_metadata());
793 torrent_info const& ti = t->torrent_file();
795 return p.piece >= 0
796 && p.piece < t->torrent_file().num_pieces()
797 && p.length > 0
798 && p.start >= 0
799 && (p.length == t->block_size()
800 || (p.length < t->block_size()
801 && p.piece == ti.num_pieces()-1
802 && p.start + p.length == ti.piece_size(p.piece))
803 || (m_request_large_blocks
804 && p.length <= ti.piece_length() * m_prefer_whole_pieces == 0 ?
805 1 : m_prefer_whole_pieces))
806 && p.piece * size_type(ti.piece_length()) + p.start + p.length
807 <= ti.total_size()
808 && (p.start % t->block_size() == 0);
// Associates this connection, which must not have a torrent yet, with the
// torrent identified by info-hash `ih`. The connection is disconnected
// instead when no such torrent exists in the session, when it has been
// aborted, or when it is paused. On success the connection is registered
// with the torrent via attach_peer(), m_torrent is set, init() is called
// if the torrent is ready for connections, and the peer's bitfield is
// cleared.
811 void peer_connection::attach_to_torrent(sha1_hash const& ih)
813 INVARIANT_CHECK;
815 TORRENT_ASSERT(!m_disconnecting);
816 TORRENT_ASSERT(m_torrent.expired());
817 boost::weak_ptr<torrent> wpt = m_ses.find_torrent(ih);
818 boost::shared_ptr<torrent> t = wpt.lock();
// an aborted torrent is treated the same as one that was not found
820 if (t && t->is_aborted())
822 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
823 (*m_logger) << " *** the torrent has been aborted\n";
824 #endif
825 t.reset();
828 if (!t)
830 // we couldn't find the torrent!
831 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
832 (*m_logger) << " *** couldn't find a torrent with the given info_hash: " << ih << "\n";
833 (*m_logger) << " torrents:\n";
834 session_impl::torrent_map const& torrents = m_ses.m_torrents;
835 for (session_impl::torrent_map::const_iterator i = torrents.begin()
836 , end(torrents.end()); i != end; ++i)
838 (*m_logger) << " " << i->second->torrent_file().info_hash() << "\n";
840 #endif
841 disconnect("got invalid info-hash", 2);
842 return;
845 if (t->is_paused())
847 // paused torrents will not accept
848 // incoming connections
849 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
850 (*m_logger) << " rejected connection to paused torrent\n";
851 #endif
852 disconnect("connection rejected bacause torrent is paused");
853 return;
856 TORRENT_ASSERT(m_torrent.expired());
857 // check to make sure we don't have another connection with the same
858 // info_hash and peer_id. If we do. close this connection.
859 #ifndef NDEBUG
862 #endif
863 t->attach_peer(this);
864 #ifndef NDEBUG
// in debug builds an exception from attach_peer() is printed and asserted on
866 catch (std::exception& e)
868 std::cout << e.what() << std::endl;
869 TORRENT_ASSERT(false);
871 #endif
// attach_peer() may have disconnected us
872 if (m_disconnecting) return;
873 m_torrent = wpt;
875 TORRENT_ASSERT(!m_torrent.expired());
877 // if the torrent isn't ready to accept
878 // connections yet, we'll have to wait with
879 // our initialization
880 if (t->ready_for_connections()) init();
882 TORRENT_ASSERT(!m_torrent.expired());
884 // assume the other end has no pieces
885 // if we don't have valid metadata yet,
886 // leave the vector unallocated
887 TORRENT_ASSERT(m_num_pieces == 0);
888 m_have_piece.clear_all();
889 TORRENT_ASSERT(!m_torrent.expired());
892 // message handlers
894 // -----------------------------
895 // --------- KEEPALIVE ---------
896 // -----------------------------
// Handler for a keep-alive message. Apart from the invariant check it
// only writes a log line, and only when verbose logging is compiled in.
898 void peer_connection::incoming_keepalive()
900 INVARIANT_CHECK;
902 #ifdef TORRENT_VERBOSE_LOGGING
903 (*m_logger) << time_now_string() << " <== KEEPALIVE\n";
904 #endif
907 // -----------------------------
908 // ----------- CHOKE -----------
909 // -----------------------------
// Handler for a CHOKE message. Extensions see the message first, and
// handling stops at the first one whose on_choke() returns true. Otherwise
// the peer is marked as choking us. Unless the peer is on parole, every
// block still waiting in the request queue is released in the piece
// picker (skipped when we are a seed) and the request queue is cleared.
911 void peer_connection::incoming_choke()
913 INVARIANT_CHECK;
915 boost::shared_ptr<torrent> t = m_torrent.lock();
916 TORRENT_ASSERT(t);
918 #ifndef TORRENT_DISABLE_EXTENSIONS
919 for (extension_list_t::iterator i = m_extensions.begin()
920 , end(m_extensions.end()); i != end; ++i)
922 if ((*i)->on_choke()) return;
924 #endif
925 if (is_disconnecting()) return;
927 #ifdef TORRENT_VERBOSE_LOGGING
928 (*m_logger) << time_now_string() << " <== CHOKE\n";
929 #endif
930 m_peer_choked = true;
932 if (peer_info_struct() == 0 || !peer_info_struct()->on_parole)
934 // if the peer is not in parole mode, clear the queued
935 // up block requests
936 if (!t->is_seed())
938 piece_picker& p = t->picker();
939 for (std::deque<piece_block>::const_iterator i = m_request_queue.begin()
940 , end(m_request_queue.end()); i != end; ++i)
942 // since this piece was skipped, clear it and allow it to
943 // be requested from other peers
944 p.abort_download(*i);
947 m_request_queue.clear();
951 bool match_request(peer_request const& r, piece_block const& b, int block_size)
953 if (b.piece_index != r.piece) return false;
954 if (b.block_index != r.start / block_size) return false;
955 if (r.start % block_size != 0) return false;
956 return true;
959 // -----------------------------
960 // -------- REJECT PIECE -------
961 // -----------------------------
// Handler for a REJECT message for request `r`. Extensions see the
// message first, and handling stops at the first one whose on_reject()
// returns true. The matching block is then looked up in the download
// queue with match_request(). If found, it is removed; for a peer on
// parole it is put back at the front of the request queue, otherwise
// (when we are not a seed) it is released in the piece picker. The
// rejected piece is also removed from the allowed-fast list when the
// peer is choking us, or from the suggested-pieces list when it is not.
// Finally, if nothing is queued and fewer than two requests are
// outstanding, a new block is picked and requested.
963 void peer_connection::incoming_reject_request(peer_request const& r)
965 INVARIANT_CHECK;
967 boost::shared_ptr<torrent> t = m_torrent.lock();
968 TORRENT_ASSERT(t);
970 #ifndef TORRENT_DISABLE_EXTENSIONS
971 for (extension_list_t::iterator i = m_extensions.begin()
972 , end(m_extensions.end()); i != end; ++i)
974 if ((*i)->on_reject(r)) return;
976 #endif
978 if (is_disconnecting()) return;
// find the outstanding request this rejection refers to
980 std::deque<pending_block>::iterator i = std::find_if(
981 m_download_queue.begin(), m_download_queue.end()
982 , bind(match_request, boost::cref(r), bind(&pending_block::block, _1)
983 , t->block_size()));
985 #ifdef TORRENT_VERBOSE_LOGGING
986 (*m_logger) << time_now_string()
987 << " <== REJECT_PIECE [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
988 #endif
990 piece_block b(-1, 0);
991 if (i != m_download_queue.end())
993 b = i->block;
994 m_download_queue.erase(i);
996 // if the peer is in parole mode, keep the request
997 if (peer_info_struct() && peer_info_struct()->on_parole)
999 m_request_queue.push_front(b);
1001 else if (!t->is_seed())
1003 piece_picker& p = t->picker();
1004 p.abort_download(b);
1007 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1008 else
1010 (*m_logger) << time_now_string()
1011 << " *** PIECE NOT IN REQUEST QUEUE\n";
1013 #endif
1014 if (has_peer_choked())
1016 // if we're choked and we got a rejection of
1017 // a piece in the allowed fast set, remove it
1018 // from the allow fast set.
1019 std::vector<int>::iterator i = std::find(
1020 m_allowed_fast.begin(), m_allowed_fast.end(), r.piece);
1021 if (i != m_allowed_fast.end()) m_allowed_fast.erase(i);
1023 else
// not choked: stop treating the rejected piece as suggested
1025 std::vector<int>::iterator i = std::find(m_suggested_pieces.begin()
1026 , m_suggested_pieces.end(), r.piece);
1027 if (i != m_suggested_pieces.end())
1028 m_suggested_pieces.erase(i);
// keep the pipeline from running dry
1031 if (m_request_queue.empty() && m_download_queue.size() < 2)
1033 request_a_block(*t, *this);
1034 send_block_requests();
1038 // -----------------------------
1039 // ------- SUGGEST PIECE -------
1040 // -----------------------------
// Handler for a SUGGEST_PIECE message. Returns early if no torrent is
// attached, if an extension's on_suggest() returns true, if the
// connection is disconnecting, or if we already have the piece.
// Otherwise `index` is appended to m_suggested_pieces, which is capped
// at ten entries by dropping the oldest one first.
1042 void peer_connection::incoming_suggest(int index)
1044 INVARIANT_CHECK;
1046 #ifdef TORRENT_VERBOSE_LOGGING
1047 (*m_logger) << time_now_string()
1048 << " <== SUGGEST_PIECE [ piece: " << index << " ]\n";
1049 #endif
1050 boost::shared_ptr<torrent> t = m_torrent.lock();
1051 if (!t) return;
1053 #ifndef TORRENT_DISABLE_EXTENSIONS
1054 for (extension_list_t::iterator i = m_extensions.begin()
1055 , end(m_extensions.end()); i != end; ++i)
1057 if ((*i)->on_suggest(index)) return;
1059 #endif
1061 if (is_disconnecting()) return;
1062 if (t->have_piece(index)) return;
// make room for the new entry when the list already holds ten
1064 if (m_suggested_pieces.size() > 9)
1065 m_suggested_pieces.erase(m_suggested_pieces.begin());
1066 m_suggested_pieces.push_back(index);
1068 #ifdef TORRENT_VERBOSE_LOGGING
1069 (*m_logger) << time_now_string()
1070 << " ** SUGGEST_PIECE [ piece: " << index << " added to set: " << m_suggested_pieces.size() << " ]\n";
1071 #endif
1074 // -----------------------------
1075 // ---------- UNCHOKE ----------
1076 // -----------------------------
// Handler for an UNCHOKE message. Extensions see the message first, and
// handling stops at the first one whose on_unchoke() returns true.
// Otherwise the peer is marked as no longer choking us and, unless the
// connection is disconnecting, the torrent's policy is notified via
// unchoked().
1078 void peer_connection::incoming_unchoke()
1080 INVARIANT_CHECK;
1082 boost::shared_ptr<torrent> t = m_torrent.lock();
1083 TORRENT_ASSERT(t);
1085 #ifndef TORRENT_DISABLE_EXTENSIONS
1086 for (extension_list_t::iterator i = m_extensions.begin()
1087 , end(m_extensions.end()); i != end; ++i)
1089 if ((*i)->on_unchoke()) return;
1091 #endif
1093 #ifdef TORRENT_VERBOSE_LOGGING
1094 (*m_logger) << time_now_string() << " <== UNCHOKE\n";
1095 #endif
1096 m_peer_choked = false;
1097 if (is_disconnecting()) return;
1099 t->get_policy().unchoked(*this);
1102 // -----------------------------
1103 // -------- INTERESTED ---------
1104 // -----------------------------
// Handler for an INTERESTED message. Extensions see the message first,
// and handling stops at the first one whose on_interested() returns true.
// Otherwise the peer is marked as interested and, unless the connection
// is disconnecting, the torrent's policy is notified via interested().
1106 void peer_connection::incoming_interested()
1108 INVARIANT_CHECK;
1110 boost::shared_ptr<torrent> t = m_torrent.lock();
1111 TORRENT_ASSERT(t);
1113 #ifndef TORRENT_DISABLE_EXTENSIONS
1114 for (extension_list_t::iterator i = m_extensions.begin()
1115 , end(m_extensions.end()); i != end; ++i)
1117 if ((*i)->on_interested()) return;
1119 #endif
1121 #ifdef TORRENT_VERBOSE_LOGGING
1122 (*m_logger) << time_now_string() << " <== INTERESTED\n";
1123 #endif
1124 m_peer_interested = true;
1125 if (is_disconnecting()) return;
1126 t->get_policy().interested(*this);
1129 // -----------------------------
1130 // ------ NOT INTERESTED -------
1131 // -----------------------------
// Handler for a NOT_INTERESTED message. Extensions see the message
// first, and handling stops at the first one whose on_not_interested()
// returns true. Otherwise the time is recorded in m_became_uninterested,
// the peer is marked as not interested and, unless the connection is
// disconnecting, the torrent's policy is notified via not_interested().
1133 void peer_connection::incoming_not_interested()
1135 INVARIANT_CHECK;
1137 #ifndef TORRENT_DISABLE_EXTENSIONS
1138 for (extension_list_t::iterator i = m_extensions.begin()
1139 , end(m_extensions.end()); i != end; ++i)
1141 if ((*i)->on_not_interested()) return;
1143 #endif
1145 m_became_uninterested = time_now();
1147 #ifdef TORRENT_VERBOSE_LOGGING
1148 (*m_logger) << time_now_string() << " <== NOT_INTERESTED\n";
1149 #endif
1150 m_peer_interested = false;
1151 if (is_disconnecting()) return;
1153 boost::shared_ptr<torrent> t = m_torrent.lock();
1154 TORRENT_ASSERT(t);
1156 t->get_policy().not_interested(*this);
1159 // -----------------------------
1160 // ----------- HAVE ------------
1161 // -----------------------------
1163 void peer_connection::incoming_have(int index)
1165 INVARIANT_CHECK;
1167 boost::shared_ptr<torrent> t = m_torrent.lock();
1168 TORRENT_ASSERT(t);
1170 #ifndef TORRENT_DISABLE_EXTENSIONS
1171 for (extension_list_t::iterator i = m_extensions.begin()
1172 , end(m_extensions.end()); i != end; ++i)
1174 if ((*i)->on_have(index)) return;
1176 #endif
1178 if (is_disconnecting()) return;
1180 // if we haven't received a bitfield, it was
1181 // probably omitted, which is the same as 'have_none'
1182 if (!m_bitfield_received) incoming_have_none();
1184 #ifdef TORRENT_VERBOSE_LOGGING
1185 (*m_logger) << time_now_string()
1186 << " <== HAVE [ piece: " << index << "]\n";
1187 #endif
1189 if (is_disconnecting()) return;
1191 if (!t->valid_metadata() && index > int(m_have_piece.size()))
1193 if (index < 65536)
1195 // if we don't have metadata
1196 // and we might not have received a bitfield
1197 // extend the bitmask to fit the new
1198 // have message
1199 m_have_piece.resize(index + 1, false);
1201 else
1203 // unless the index > 64k, in which case
1204 // we just ignore it
1205 return;
1209 // if we got an invalid message, abort
1210 if (index >= int(m_have_piece.size()) || index < 0)
1212 disconnect("got 'have'-message with higher index than the number of pieces", 2);
1213 return;
1216 if (m_have_piece[index])
1218 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1219 (*m_logger) << " got redundant HAVE message for index: " << index << "\n";
1220 #endif
1222 else
1224 m_have_piece.set_bit(index);
1225 ++m_num_pieces;
1227 // only update the piece_picker if
1228 // we have the metadata and if
1229 // we're not a seed (in which case
1230 // we won't have a piece picker)
1231 if (t->valid_metadata())
1233 t->peer_has(index);
1235 if (!t->have_piece(index)
1236 && !t->is_seed()
1237 && !is_interesting()
1238 && t->picker().piece_priority(index) != 0)
1239 t->get_policy().peer_is_interesting(*this);
1241 // this will disregard all have messages we get within
1242 // the first two seconds. Since some clients implements
1243 // lazy bitfields, these will not be reliable to use
1244 // for an estimated peer download rate.
1245 if (!peer_info_struct() || time_now() - peer_info_struct()->connected > seconds(2))
1247 // update bytes downloaded since last timer
1248 m_remote_bytes_dled += t->torrent_file().piece_size(index);
1252 if (is_seed())
1254 m_peer_info->seed = true;
1255 m_upload_only = true;
1256 disconnect_if_redundant();
1257 if (is_disconnecting()) return;
1262 // -----------------------------
1263 // --------- BITFIELD ----------
1264 // -----------------------------
1266 void peer_connection::incoming_bitfield(bitfield const& bits)
1268 INVARIANT_CHECK;
1270 boost::shared_ptr<torrent> t = m_torrent.lock();
1271 TORRENT_ASSERT(t);
1273 #ifndef TORRENT_DISABLE_EXTENSIONS
1274 for (extension_list_t::iterator i = m_extensions.begin()
1275 , end(m_extensions.end()); i != end; ++i)
1277 if ((*i)->on_bitfield(bits)) return;
1279 #endif
1281 if (is_disconnecting()) return;
1283 #ifdef TORRENT_VERBOSE_LOGGING
1284 (*m_logger) << time_now_string() << " <== BITFIELD ";
1286 for (int i = 0; i < int(bits.size()); ++i)
1288 if (bits[i]) (*m_logger) << "1";
1289 else (*m_logger) << "0";
1291 (*m_logger) << "\n";
1292 #endif
1294 // if we don't have the metedata, we cannot
1295 // verify the bitfield size
1296 if (t->valid_metadata()
1297 && (bits.size() + 7) / 8 != (m_have_piece.size() + 7) / 8)
1299 std::stringstream msg;
1300 msg << "got bitfield with invalid size: " << ((bits.size() + 7) / 8)
1301 << "bytes. expected: " << ((m_have_piece.size() + 7) / 8)
1302 << " bytes";
1303 disconnect(msg.str().c_str(), 2);
1304 return;
1307 m_bitfield_received = true;
1309 // if we don't have metadata yet
1310 // just remember the bitmask
1311 // don't update the piecepicker
1312 // (since it doesn't exist yet)
1313 if (!t->ready_for_connections())
1315 m_have_piece = bits;
1316 m_num_pieces = bits.count();
1317 if (m_peer_info) m_peer_info->seed = (m_num_pieces == int(bits.size()));
1318 return;
1321 TORRENT_ASSERT(t->valid_metadata());
1323 int num_pieces = bits.count();
1324 if (num_pieces == int(m_have_piece.size()))
1326 #ifdef TORRENT_VERBOSE_LOGGING
1327 (*m_logger) << " *** THIS IS A SEED ***\n";
1328 #endif
1329 // if this is a web seed. we don't have a peer_info struct
1330 if (m_peer_info) m_peer_info->seed = true;
1331 m_upload_only = true;
1333 m_have_piece.set_all();
1334 m_num_pieces = num_pieces;
1335 t->peer_has_all();
1336 if (!t->is_finished())
1337 t->get_policy().peer_is_interesting(*this);
1339 disconnect_if_redundant();
1341 return;
1344 // let the torrent know which pieces the
1345 // peer has
1346 // if we're a seed, we don't keep track of piece availability
1347 bool interesting = false;
1348 if (!t->is_seed())
1350 t->peer_has(bits);
1352 for (int i = 0; i < (int)m_have_piece.size(); ++i)
1354 bool have = bits[i];
1355 if (have && !m_have_piece[i])
1357 if (!t->have_piece(i) && t->picker().piece_priority(i) != 0)
1358 interesting = true;
1360 else if (!have && m_have_piece[i])
1362 // this should probably not be allowed
1363 t->peer_lost(i);
1368 m_have_piece = bits;
1369 m_num_pieces = num_pieces;
1371 if (interesting) t->get_policy().peer_is_interesting(*this);
1372 else if (upload_only()) disconnect("upload to upload connections");
1375 void peer_connection::disconnect_if_redundant()
1377 if (!m_ses.settings().close_redundant_connections) return;
1379 boost::shared_ptr<torrent> t = m_torrent.lock();
1380 TORRENT_ASSERT(t);
1381 if (m_upload_only && t->is_finished())
1382 disconnect("seed to seed");
1384 if (m_upload_only
1385 && !m_interesting
1386 && m_bitfield_received
1387 && t->are_files_checked())
1388 disconnect("uninteresting upload-only peer");
1391 // -----------------------------
1392 // ---------- REQUEST ----------
1393 // -----------------------------
1395 void peer_connection::incoming_request(peer_request const& r)
1397 INVARIANT_CHECK;
1399 boost::shared_ptr<torrent> t = m_torrent.lock();
1400 TORRENT_ASSERT(t);
1402 // if we haven't received a bitfield, it was
1403 // probably omitted, which is the same as 'have_none'
1404 if (!m_bitfield_received) incoming_have_none();
1405 if (is_disconnecting()) return;
1407 #ifndef TORRENT_DISABLE_EXTENSIONS
1408 for (extension_list_t::iterator i = m_extensions.begin()
1409 , end(m_extensions.end()); i != end; ++i)
1411 if ((*i)->on_request(r)) return;
1413 #endif
1414 if (is_disconnecting()) return;
1416 if (!t->valid_metadata())
1418 // if we don't have valid metadata yet,
1419 // we shouldn't get a request
1420 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1421 (*m_logger) << time_now_string()
1422 << " <== UNEXPECTED_REQUEST [ "
1423 "piece: " << r.piece << " | "
1424 "s: " << r.start << " | "
1425 "l: " << r.length << " | "
1426 "i: " << m_peer_interested << " | "
1427 "t: " << t->torrent_file().piece_size(r.piece) << " | "
1428 "n: " << t->torrent_file().num_pieces() << " ]\n";
1430 (*m_logger) << time_now_string()
1431 << " ==> REJECT_PIECE [ "
1432 "piece: " << r.piece << " | "
1433 "s: " << r.start << " | "
1434 "l: " << r.length << " ]\n";
1435 #endif
1436 write_reject_request(r);
1437 return;
1440 if (int(m_requests.size()) > m_ses.settings().max_allowed_in_request_queue)
1442 // don't allow clients to abuse our
1443 // memory consumption.
1444 // ignore requests if the client
1445 // is making too many of them.
1446 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1447 (*m_logger) << time_now_string()
1448 << " <== TOO MANY REQUESTS [ "
1449 "piece: " << r.piece << " | "
1450 "s: " << r.start << " | "
1451 "l: " << r.length << " | "
1452 "i: " << m_peer_interested << " | "
1453 "t: " << t->torrent_file().piece_size(r.piece) << " | "
1454 "n: " << t->torrent_file().num_pieces() << " ]\n";
1456 (*m_logger) << time_now_string()
1457 << " ==> REJECT_PIECE [ "
1458 "piece: " << r.piece << " | "
1459 "s: " << r.start << " | "
1460 "l: " << r.length << " ]\n";
1461 #endif
1462 write_reject_request(r);
1463 return;
1466 // make sure this request
1467 // is legal and that the peer
1468 // is not choked
1469 if (r.piece >= 0
1470 && r.piece < t->torrent_file().num_pieces()
1471 && t->have_piece(r.piece)
1472 && r.start >= 0
1473 && r.start < t->torrent_file().piece_size(r.piece)
1474 && r.length > 0
1475 && r.length + r.start <= t->torrent_file().piece_size(r.piece)
1476 && m_peer_interested
1477 && r.length <= t->block_size())
1479 #ifdef TORRENT_VERBOSE_LOGGING
1480 (*m_logger) << time_now_string()
1481 << " <== REQUEST [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
1482 #endif
1483 // if we have choked the client
1484 // ignore the request
1485 if (m_choked && m_accept_fast.find(r.piece) == m_accept_fast.end())
1487 write_reject_request(r);
1488 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1489 (*m_logger) << time_now_string()
1490 << " *** REJECTING REQUEST [ peer choked and piece not in allowed fast set ]\n";
1491 (*m_logger) << time_now_string()
1492 << " ==> REJECT_PIECE [ "
1493 "piece: " << r.piece << " | "
1494 "s: " << r.start << " | "
1495 "l: " << r.length << " ]\n";
1496 #endif
1498 else
1500 m_requests.push_back(r);
1501 m_last_incoming_request = time_now();
1502 fill_send_buffer();
1505 else
1507 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1508 (*m_logger) << time_now_string()
1509 << " <== INVALID_REQUEST [ "
1510 "piece: " << r.piece << " | "
1511 "s: " << r.start << " | "
1512 "l: " << r.length << " | "
1513 "i: " << m_peer_interested << " | "
1514 "t: " << t->torrent_file().piece_size(r.piece) << " | "
1515 "n: " << t->torrent_file().num_pieces() << " | "
1516 "h: " << t->have_piece(r.piece) << " | "
1517 "block_limit: " << t->block_size() << " ]\n";
1519 (*m_logger) << time_now_string()
1520 << " ==> REJECT_PIECE [ "
1521 "piece: " << r.piece << " | "
1522 "s: " << r.start << " | "
1523 "l: " << r.length << " ]\n";
1524 #endif
1526 write_reject_request(r);
1527 ++m_num_invalid_requests;
1529 if (t->alerts().should_post<invalid_request_alert>())
1531 t->alerts().post_alert(invalid_request_alert(
1532 t->get_handle(), m_remote, m_peer_id, r));
1537 void peer_connection::incoming_piece_fragment()
1539 m_last_piece = time_now();
1542 #ifndef NDEBUG
1543 struct check_postcondition
1545 check_postcondition(boost::shared_ptr<torrent> const& t_
1546 , bool init_check = true): t(t_) { if (init_check) check(); }
1548 ~check_postcondition() { check(); }
1550 void check()
1552 if (!t->is_seed())
1554 const int blocks_per_piece = static_cast<int>(
1555 t->torrent_file().piece_length() / t->block_size());
1557 std::vector<piece_picker::downloading_piece> const& dl_queue
1558 = t->picker().get_download_queue();
1560 for (std::vector<piece_picker::downloading_piece>::const_iterator i =
1561 dl_queue.begin(); i != dl_queue.end(); ++i)
1563 TORRENT_ASSERT(i->finished <= blocks_per_piece);
1568 shared_ptr<torrent> t;
1570 #endif
1573 // -----------------------------
1574 // ----------- PIECE -----------
1575 // -----------------------------
1577 void peer_connection::incoming_piece(peer_request const& p, char const* data)
1579 char* buffer = m_ses.allocate_disk_buffer();
1580 if (buffer == 0)
1582 disconnect("out of memory");
1583 return;
1585 disk_buffer_holder holder(m_ses, buffer);
1586 std::memcpy(buffer, data, p.length);
1587 incoming_piece(p, holder);
// Handles a received block: `p` gives the piece, offset and length,
// `data` holds the payload in a disk buffer.
//
// In order: a missing bitfield is treated as 'have_none'; extensions are
// offered the message; zero-length and invalid packets are dropped (the
// latter disconnects the peer); data received while seeding, data that
// was never requested and data for an already downloaded block is
// counted as redundant. Otherwise requests queued ahead of this block
// are marked as skipped (and aborted once skipped too often), the
// payload is passed to fs.async_write() with on_disk_write_complete as
// completion handler, the block is marked as writing in the picker and
// new blocks are requested.
1590 void peer_connection::incoming_piece(peer_request const& p, disk_buffer_holder& data)
1592 INVARIANT_CHECK;
1594 boost::shared_ptr<torrent> t = m_torrent.lock();
1595 TORRENT_ASSERT(t);
1597 TORRENT_ASSERT(!m_disk_recv_buffer);
1598 TORRENT_ASSERT(m_disk_recv_buffer_size == 0);
1600 #ifdef TORRENT_CORRUPT_DATA
1601 // corrupt all pieces from certain peers
// (IPv4 peers whose address has its low four bits clear get the first
// payload byte inverted)
1602 if (m_remote.address().is_v4()
1603 && (m_remote.address().to_v4().to_ulong() & 0xf) == 0)
1605 data.get()[0] = ~data.get()[0];
1607 #endif
1609 // if we haven't received a bitfield, it was
1610 // probably omitted, which is the same as 'have_none'
1611 if (!m_bitfield_received) incoming_have_none();
1612 if (is_disconnecting()) return;
1614 #ifndef TORRENT_DISABLE_EXTENSIONS
1615 for (extension_list_t::iterator i = m_extensions.begin()
1616 , end(m_extensions.end()); i != end; ++i)
1618 if ((*i)->on_piece(p, data)) return;
1620 #endif
1621 if (is_disconnecting()) return;
1623 #ifndef NDEBUG
// post_checker_ runs check_postcondition::check() now and again when
// this function returns
1624 check_postcondition post_checker_(t);
1625 #if !defined TORRENT_DISABLE_INVARIANT_CHECKS && defined TORRENT_EXPENSIVE_INVARIANT_CHECKS
1626 t->check_invariant();
1627 #endif
1628 #endif
1630 #ifdef TORRENT_VERBOSE_LOGGING
1631 (*m_logger) << time_now_string()
1632 << " <== PIECE [ piece: " << p.piece << " | "
1633 "s: " << p.start << " | "
1634 "l: " << p.length << " | "
1635 "ds: " << statistics().download_rate() << " | "
1636 "qs: " << int(m_desired_queue_size) << " ]\n";
1637 #endif
// a zero-length block is reported through an alert and otherwise ignored;
// the peer is not disconnected for it
1639 if (p.length == 0)
1641 if (t->alerts().should_post<peer_error_alert>())
1643 t->alerts().post_alert(peer_error_alert(t->get_handle(), m_remote
1644 , m_peer_id, "peer sent 0 length piece"));
1646 return;
1649 if (!verify_piece(p))
1651 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1652 (*m_logger) << time_now_string()
1653 << " <== INVALID_PIECE [ piece: " << p.piece << " | "
1654 "start: " << p.start << " | "
1655 "length: " << p.length << " ]\n";
1656 #endif
1657 disconnect("got invalid piece packet", 2);
1658 return;
1661 // if we're already seeding, don't bother,
1662 // just ignore it
1663 if (t->is_seed())
1665 t->add_redundant_bytes(p.length);
1666 return;
1669 ptime now = time_now();
1671 piece_picker& picker = t->picker();
1672 piece_manager& fs = t->filesystem();
1674 std::vector<piece_block> finished_blocks;
// the block index within the piece is the byte offset divided by the
// torrent's block size
1675 piece_block block_finished(p.piece, p.start / t->block_size());
1676 TORRENT_ASSERT(p.start % t->block_size() == 0);
1677 TORRENT_ASSERT(p.length == t->block_size()
1678 || p.length == t->torrent_file().total_size() % t->block_size());
// look for the outstanding request this block answers
1680 std::deque<pending_block>::iterator b
1681 = std::find_if(
1682 m_download_queue.begin()
1683 , m_download_queue.end()
1684 , has_block(block_finished));
// not found: the block was never requested from this peer (or the request
// is no longer in the queue), so count it as redundant
1686 if (b == m_download_queue.end())
1688 if (t->alerts().should_post<unwanted_block_alert>())
1690 t->alerts().post_alert(unwanted_block_alert(t->get_handle(), m_remote
1691 , m_peer_id, block_finished.block_index, block_finished.piece_index));
1693 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1694 (*m_logger) << " *** The block we just got was not in the "
1695 "request queue ***\n";
1696 #endif
1697 t->add_redundant_bytes(p.length);
1698 request_a_block(*t, *this);
1699 send_block_requests();
1700 return;
1702 #ifndef NDEBUG
1703 pending_block pending_b = *b;
1704 #endif
// NOTE(review): block_index is one less than the position of b, so the
// request directly in front of b is never counted as skipped -- confirm
// that this tolerance is intended
1706 int block_index = b - m_download_queue.begin() - 1;
1707 for (int i = 0; i < block_index; ++i)
1709 pending_block& qe = m_download_queue[i];
1711 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1712 (*m_logger) << time_now_string()
1713 << " *** SKIPPED_PIECE [ piece: " << qe.block.piece_index << " | "
1714 "b: " << qe.block.block_index << " ] ***\n";
1715 #endif
1717 ++qe.skipped;
1718 // if the number of times a block is skipped by out of order
1719 // blocks exceeds the size of the outstanding queue, assume that
1720 // the other end dropped the request.
1721 if (qe.skipped > m_desired_queue_size)
1723 if (m_ses.m_alerts.should_post<request_dropped_alert>())
1724 m_ses.m_alerts.post_alert(request_dropped_alert(t->get_handle()
1725 , remote(), pid(), qe.block.block_index, qe.block.piece_index));
1726 picker.abort_download(qe.block);
1727 TORRENT_ASSERT(m_download_queue.begin() + i != b);
// after the erase both the loop index and block_index are decremented so
// they keep referring to the same queue entries
1728 m_download_queue.erase(m_download_queue.begin() + i);
1729 --i;
1730 --block_index;
// entries may have been erased above, so b is looked up again from
// block_index rather than reused
1733 TORRENT_ASSERT(int(m_download_queue.size()) > block_index + 1);
1734 b = m_download_queue.begin() + (block_index + 1);
1735 TORRENT_ASSERT(b->block == pending_b.block);
1737 // if the block we got is already finished, then ignore it
1738 if (picker.is_downloaded(block_finished))
1740 t->add_redundant_bytes(p.length);
1742 m_download_queue.erase(b);
1743 m_timeout_extend = 0;
1745 if (!m_download_queue.empty())
1746 m_requested = now;
1748 request_a_block(*t, *this);
1749 send_block_requests();
1750 return;
// a snubbed peer that delivers within request_timeout seconds of
// m_requested is no longer considered snubbed
1753 if (total_seconds(now - m_requested)
1754 < m_ses.settings().request_timeout
1755 && m_snubbed)
1757 m_snubbed = false;
1758 if (m_ses.m_alerts.should_post<peer_unsnubbed_alert>())
1760 m_ses.m_alerts.post_alert(peer_unsnubbed_alert(t->get_handle()
1761 , m_remote, m_peer_id));
// start the asynchronous write; on_disk_write_complete subtracts p.length
// from m_outstanding_writing_bytes again
1765 fs.async_write(p, data, bind(&peer_connection::on_disk_write_complete
1766 , self(), _1, _2, p, t));
1767 m_outstanding_writing_bytes += p.length;
1768 TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_idle);
1769 m_download_queue.erase(b);
1771 if (m_outstanding_writing_bytes >= m_ses.settings().max_outstanding_disk_bytes_per_connection
1772 && t->alerts().should_post<performance_alert>())
1774 t->alerts().post_alert(performance_alert(t->get_handle()
1775 , performance_alert::outstanding_disk_buffer_limit_reached));
// with requests still outstanding, move m_requested forward by one
// request_timeout (never past now) and shrink the timeout extension by
// the same amount (never below zero); with an empty queue just reset the
// extension
1778 if (!m_download_queue.empty())
1780 m_timeout_extend = (std::max)(m_timeout_extend
1781 - m_ses.settings().request_timeout, 0);
1782 m_requested += seconds(m_ses.settings().request_timeout);
1783 if (m_requested > now) m_requested = now;
1785 else
1787 m_timeout_extend = 0;
1790 // did we request this block from any other peers?
1791 bool multi = picker.num_peers(block_finished) > 1;
1792 picker.mark_as_writing(block_finished, peer_info_struct());
1794 TORRENT_ASSERT(picker.num_peers(block_finished) == 0);
1795 // if we requested this block from other peers, cancel it now
1796 if (multi) t->cancel_block(block_finished);
1798 TORRENT_ASSERT(picker.num_peers(block_finished) == 0);
1800 #if !defined NDEBUG && !defined TORRENT_DISABLE_INVARIANT_CHECKS \
1801 && defined TORRENT_EXPENSIVE_INVARIANT_CHECKS
1802 t->check_invariant();
1803 #endif
1804 request_a_block(*t, *this);
1805 send_block_requests();
1808 void peer_connection::on_disk_write_complete(int ret, disk_io_job const& j
1809 , peer_request p, boost::shared_ptr<torrent> t)
1811 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
1813 INVARIANT_CHECK;
1815 m_outstanding_writing_bytes -= p.length;
1816 TORRENT_ASSERT(m_outstanding_writing_bytes >= 0);
1818 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
1819 // (*m_ses.m_logger) << time_now_string() << " *** DISK_WRITE_COMPLETE [ p: "
1820 // << p.piece << " o: " << p.start << " ]\n";
1821 #endif
1822 // in case the outstanding bytes just dropped down
1823 // to allow to receive more data
1824 setup_receive();
1826 piece_block block_finished(p.piece, p.start / t->block_size());
1828 if (ret == -1 || !t)
1830 if (t->has_picker()) t->picker().write_failed(block_finished);
1832 if (!t)
1834 disconnect(j.str.c_str());
1835 return;
1838 if (t->alerts().should_post<file_error_alert>())
1839 t->alerts().post_alert(file_error_alert(j.error_file, t->get_handle(), j.str));
1840 t->set_error(j.str);
1841 t->pause();
1842 return;
1845 if (t->is_seed()) return;
1847 piece_picker& picker = t->picker();
1849 TORRENT_ASSERT(p.piece == j.piece);
1850 TORRENT_ASSERT(p.start == j.offset);
1851 TORRENT_ASSERT(picker.num_peers(block_finished) == 0);
1852 picker.mark_as_finished(block_finished, peer_info_struct());
1853 if (t->alerts().should_post<block_finished_alert>())
1855 t->alerts().post_alert(block_finished_alert(t->get_handle(),
1856 remote(), pid(), block_finished.block_index, block_finished.piece_index));
1859 // did we just finish the piece?
1860 if (picker.is_piece_finished(p.piece))
1862 #ifndef NDEBUG
1863 check_postcondition post_checker2_(t, false);
1864 #endif
1865 t->async_verify_piece(p.piece, bind(&torrent::piece_finished, t
1866 , p.piece, _1));
1869 if (!t->is_seed() && !m_torrent.expired())
1871 // this is a free function defined in policy.cpp
1872 request_a_block(*t, *this);
1873 send_block_requests();
1878 // -----------------------------
1879 // ---------- CANCEL -----------
1880 // -----------------------------
1882 void peer_connection::incoming_cancel(peer_request const& r)
1884 INVARIANT_CHECK;
1886 #ifndef TORRENT_DISABLE_EXTENSIONS
1887 for (extension_list_t::iterator i = m_extensions.begin()
1888 , end(m_extensions.end()); i != end; ++i)
1890 if ((*i)->on_cancel(r)) return;
1892 #endif
1893 if (is_disconnecting()) return;
1895 #ifdef TORRENT_VERBOSE_LOGGING
1896 (*m_logger) << time_now_string()
1897 << " <== CANCEL [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
1898 #endif
1900 std::deque<peer_request>::iterator i
1901 = std::find(m_requests.begin(), m_requests.end(), r);
1903 if (i != m_requests.end())
1905 m_requests.erase(i);
1906 #ifdef TORRENT_VERBOSE_LOGGING
1907 (*m_logger) << time_now_string()
1908 << " ==> REJECT_PIECE [ "
1909 "piece: " << r.piece << " | "
1910 "s: " << r.start << " | "
1911 "l: " << r.length << " ]\n";
1912 #endif
1913 write_reject_request(r);
1915 else
1917 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1918 (*m_logger) << time_now_string() << " *** GOT CANCEL NOT IN THE QUEUE\n";
1919 #endif
1923 // -----------------------------
1924 // --------- DHT PORT ----------
1925 // -----------------------------
1927 void peer_connection::incoming_dht_port(int listen_port)
1929 INVARIANT_CHECK;
1931 #ifdef TORRENT_VERBOSE_LOGGING
1932 (*m_logger) << time_now_string()
1933 << " <== DHT_PORT [ p: " << listen_port << " ]\n";
1934 #endif
1935 #ifndef TORRENT_DISABLE_DHT
1936 m_ses.add_dht_node(udp::endpoint(
1937 m_remote.address(), listen_port));
1938 #endif
1941 // -----------------------------
1942 // --------- HAVE ALL ----------
1943 // -----------------------------
1945 void peer_connection::incoming_have_all()
1947 INVARIANT_CHECK;
1949 boost::shared_ptr<torrent> t = m_torrent.lock();
1950 TORRENT_ASSERT(t);
1952 #ifdef TORRENT_VERBOSE_LOGGING
1953 (*m_logger) << time_now_string() << " <== HAVE_ALL\n";
1954 #endif
1956 #ifndef TORRENT_DISABLE_EXTENSIONS
1957 for (extension_list_t::iterator i = m_extensions.begin()
1958 , end(m_extensions.end()); i != end; ++i)
1960 if ((*i)->on_have_all()) return;
1962 #endif
1963 if (is_disconnecting()) return;
1965 m_have_all = true;
1967 if (m_peer_info) m_peer_info->seed = true;
1968 m_upload_only = true;
1969 m_bitfield_received = true;
1971 #ifdef TORRENT_VERBOSE_LOGGING
1972 (*m_logger) << " *** THIS IS A SEED ***\n";
1973 #endif
1975 // if we don't have metadata yet
1976 // just remember the bitmask
1977 // don't update the piecepicker
1978 // (since it doesn't exist yet)
1979 if (!t->ready_for_connections())
1981 // assume seeds are interesting when we
1982 // don't even have the metadata
1983 t->get_policy().peer_is_interesting(*this);
1985 disconnect_if_redundant();
1986 // TODO: this might need something more
1987 // so that once we have the metadata
1988 // we can construct a full bitfield
1989 return;
1992 TORRENT_ASSERT(!m_have_piece.empty());
1993 m_have_piece.set_all();
1994 m_num_pieces = m_have_piece.size();
1996 t->peer_has_all();
1998 // if we're finished, we're not interested
1999 if (t->is_finished()) send_not_interested();
2000 else t->get_policy().peer_is_interesting(*this);
2002 disconnect_if_redundant();
2005 // -----------------------------
2006 // --------- HAVE NONE ---------
2007 // -----------------------------
2009 void peer_connection::incoming_have_none()
2011 INVARIANT_CHECK;
2013 #ifdef TORRENT_VERBOSE_LOGGING
2014 (*m_logger) << time_now_string() << " <== HAVE_NONE\n";
2015 #endif
2017 boost::shared_ptr<torrent> t = m_torrent.lock();
2018 TORRENT_ASSERT(t);
2020 #ifndef TORRENT_DISABLE_EXTENSIONS
2021 for (extension_list_t::iterator i = m_extensions.begin()
2022 , end(m_extensions.end()); i != end; ++i)
2024 if ((*i)->on_have_none()) return;
2026 #endif
2027 if (is_disconnecting()) return;
2028 if (m_peer_info) m_peer_info->seed = false;
2029 m_bitfield_received = true;
2031 // we're never interested in a peer that doesn't have anything
2032 send_not_interested();
2034 TORRENT_ASSERT(!m_have_piece.empty() || !t->ready_for_connections());
2035 disconnect_if_redundant();
2038 // -----------------------------
2039 // ------- ALLOWED FAST --------
2040 // -----------------------------
2042 void peer_connection::incoming_allowed_fast(int index)
2044 INVARIANT_CHECK;
2046 boost::shared_ptr<torrent> t = m_torrent.lock();
2047 TORRENT_ASSERT(t);
2049 #ifdef TORRENT_VERBOSE_LOGGING
2050 (*m_logger) << time_now_string() << " <== ALLOWED_FAST [ " << index << " ]\n";
2051 #endif
2053 #ifndef TORRENT_DISABLE_EXTENSIONS
2054 for (extension_list_t::iterator i = m_extensions.begin()
2055 , end(m_extensions.end()); i != end; ++i)
2057 if ((*i)->on_allowed_fast(index)) return;
2059 #endif
2060 if (is_disconnecting()) return;
2062 if (t->valid_metadata())
2064 if (index < 0 || index >= int(m_have_piece.size()))
2066 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2067 (*m_logger) << time_now_string() << " <== INVALID_ALLOWED_FAST [ " << index << " | s: "
2068 << int(m_have_piece.size()) << " ]\n";
2069 #endif
2070 return;
2073 // if we already have the piece, we can
2074 // ignore this message
2075 if (t->have_piece(index))
2076 return;
2079 m_allowed_fast.push_back(index);
2081 // if the peer has the piece and we want
2082 // to download it, request it
2083 if (int(m_have_piece.size()) > index
2084 && m_have_piece[index]
2085 && t->valid_metadata()
2086 && t->has_picker()
2087 && t->picker().piece_priority(index) > 0)
2089 t->get_policy().peer_is_interesting(*this);
2093 std::vector<int> const& peer_connection::allowed_fast()
2095 boost::shared_ptr<torrent> t = m_torrent.lock();
2096 TORRENT_ASSERT(t);
2098 m_allowed_fast.erase(std::remove_if(m_allowed_fast.begin()
2099 , m_allowed_fast.end(), bind(&torrent::have_piece, t, _1))
2100 , m_allowed_fast.end());
2102 // TODO: sort the allowed fast set in priority order
2103 return m_allowed_fast;
2106 void peer_connection::add_request(piece_block const& block)
2108 // INVARIANT_CHECK;
2110 boost::shared_ptr<torrent> t = m_torrent.lock();
2111 TORRENT_ASSERT(t);
2113 TORRENT_ASSERT(t->valid_metadata());
2114 TORRENT_ASSERT(block.piece_index >= 0);
2115 TORRENT_ASSERT(block.piece_index < t->torrent_file().num_pieces());
2116 TORRENT_ASSERT(block.block_index >= 0);
2117 TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index));
2118 TORRENT_ASSERT(!t->picker().is_requested(block) || (t->picker().num_peers(block) > 0));
2119 TORRENT_ASSERT(!t->have_piece(block.piece_index));
2120 TORRENT_ASSERT(std::find_if(m_download_queue.begin(), m_download_queue.end()
2121 , has_block(block)) == m_download_queue.end());
2122 TORRENT_ASSERT(std::find(m_request_queue.begin(), m_request_queue.end()
2123 , block) == m_request_queue.end());
2125 piece_picker::piece_state_t state;
2126 peer_speed_t speed = peer_speed();
2127 char const* speedmsg = 0;
2128 if (speed == fast)
2130 speedmsg = "fast";
2131 state = piece_picker::fast;
2133 else if (speed == medium)
2135 speedmsg = "medium";
2136 state = piece_picker::medium;
2138 else
2140 speedmsg = "slow";
2141 state = piece_picker::slow;
2144 if (!t->picker().mark_as_downloading(block, peer_info_struct(), state))
2145 return;
2147 if (t->alerts().should_post<block_downloading_alert>())
2149 t->alerts().post_alert(block_downloading_alert(t->get_handle(),
2150 remote(), pid(), speedmsg, block.block_index, block.piece_index));
2153 m_request_queue.push_back(block);
2156 void peer_connection::cancel_request(piece_block const& block)
2158 INVARIANT_CHECK;
2160 boost::shared_ptr<torrent> t = m_torrent.lock();
2161 // this peer might be disconnecting
2162 if (!t) return;
2164 TORRENT_ASSERT(t->valid_metadata());
2166 TORRENT_ASSERT(block.piece_index >= 0);
2167 TORRENT_ASSERT(block.piece_index < t->torrent_file().num_pieces());
2168 TORRENT_ASSERT(block.block_index >= 0);
2169 TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index));
2171 // if all the peers that requested this block has been
2172 // cancelled, then just ignore the cancel.
2173 if (!t->picker().is_requested(block)) return;
2175 std::deque<pending_block>::iterator it
2176 = std::find_if(m_download_queue.begin(), m_download_queue.end(), has_block(block));
2177 if (it == m_download_queue.end())
2179 std::deque<piece_block>::iterator rit = std::find(m_request_queue.begin()
2180 , m_request_queue.end(), block);
2182 // when a multi block is received, it is cancelled
2183 // from all peers, so if this one hasn't requested
2184 // the block, just ignore to cancel it.
2185 if (rit == m_request_queue.end()) return;
2187 t->picker().abort_download(block);
2188 m_request_queue.erase(rit);
2189 // since we found it in the request queue, it means it hasn't been
2190 // sent yet, so we don't have to send a cancel.
2191 return;
2194 int block_offset = block.block_index * t->block_size();
2195 int block_size
2196 = (std::min)(t->torrent_file().piece_size(block.piece_index)-block_offset,
2197 t->block_size());
2198 TORRENT_ASSERT(block_size > 0);
2199 TORRENT_ASSERT(block_size <= t->block_size());
2201 peer_request r;
2202 r.piece = block.piece_index;
2203 r.start = block_offset;
2204 r.length = block_size;
2206 #ifdef TORRENT_VERBOSE_LOGGING
2207 (*m_logger) << time_now_string()
2208 << " ==> CANCEL [ piece: " << block.piece_index << " | s: "
2209 << block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n";
2210 #endif
2211 write_cancel(r);
2214 void peer_connection::send_choke()
2216 INVARIANT_CHECK;
2218 TORRENT_ASSERT(!m_peer_info || !m_peer_info->optimistically_unchoked);
2220 if (m_choked) return;
2221 write_choke();
2222 m_choked = true;
2224 #ifdef TORRENT_VERBOSE_LOGGING
2225 (*m_logger) << time_now_string() << " ==> CHOKE\n";
2226 #endif
2227 #ifndef NDEBUG
2228 m_last_choke = time_now();
2229 #endif
2230 m_num_invalid_requests = 0;
2232 // reject the requests we have in the queue
2233 // except the allowed fast pieces
2234 for (std::deque<peer_request>::iterator i = m_requests.begin();
2235 i != m_requests.end();)
2237 if (m_accept_fast.count(i->piece))
2239 ++i;
2240 continue;
2243 peer_request const& r = *i;
2244 write_reject_request(r);
2246 #ifdef TORRENT_VERBOSE_LOGGING
2247 (*m_logger) << time_now_string()
2248 << " ==> REJECT_PIECE [ "
2249 "piece: " << r.piece << " | "
2250 "s: " << r.start << " | "
2251 "l: " << r.length << " ]\n";
2252 #endif
2253 i = m_requests.erase(i);
2257 bool peer_connection::send_unchoke()
2259 INVARIANT_CHECK;
2261 if (!m_choked) return false;
2262 boost::shared_ptr<torrent> t = m_torrent.lock();
2263 if (!t->ready_for_connections()) return false;
2264 m_last_unchoke = time_now();
2265 write_unchoke();
2266 m_choked = false;
2268 #ifdef TORRENT_VERBOSE_LOGGING
2269 (*m_logger) << time_now_string() << " ==> UNCHOKE\n";
2270 #endif
2271 return true;
2274 void peer_connection::send_interested()
2276 if (m_interesting) return;
2277 boost::shared_ptr<torrent> t = m_torrent.lock();
2278 if (!t->ready_for_connections()) return;
2279 m_interesting = true;
2280 write_interested();
2282 #ifdef TORRENT_VERBOSE_LOGGING
2283 (*m_logger) << time_now_string() << " ==> INTERESTED\n";
2284 #endif
2287 void peer_connection::send_not_interested()
2289 if (!m_interesting) return;
2290 boost::shared_ptr<torrent> t = m_torrent.lock();
2291 if (!t->ready_for_connections()) return;
2292 m_interesting = false;
2293 write_not_interested();
2295 m_became_uninteresting = time_now();
2297 #ifdef TORRENT_VERBOSE_LOGGING
2298 (*m_logger) << time_now_string() << " ==> NOT_INTERESTED\n";
2299 #endif
2300 disconnect_if_redundant();
2303 void peer_connection::send_block_requests()
2305 INVARIANT_CHECK;
2307 boost::shared_ptr<torrent> t = m_torrent.lock();
2308 TORRENT_ASSERT(t);
2310 if ((int)m_download_queue.size() >= m_desired_queue_size) return;
2312 bool empty_download_queue = m_download_queue.empty();
2314 while (!m_request_queue.empty()
2315 && (int)m_download_queue.size() < m_desired_queue_size)
2317 piece_block block = m_request_queue.front();
2319 int block_offset = block.block_index * t->block_size();
2320 int block_size = (std::min)(t->torrent_file().piece_size(
2321 block.piece_index) - block_offset, t->block_size());
2322 TORRENT_ASSERT(block_size > 0);
2323 TORRENT_ASSERT(block_size <= t->block_size());
2325 peer_request r;
2326 r.piece = block.piece_index;
2327 r.start = block_offset;
2328 r.length = block_size;
2330 m_request_queue.pop_front();
2331 if (t->is_seed()) continue;
2332 // this can happen if a block times out, is re-requested and
2333 // then arrives "unexpectedly"
2334 if (t->picker().is_finished(block) || t->picker().is_downloaded(block))
2335 continue;
2337 m_download_queue.push_back(block);
2339 #ifdef TORRENT_VERBOSE_LOGGING
2340 (*m_logger) << time_now_string()
2341 << " *** REQUEST-QUEUE** [ "
2342 "piece: " << block.piece_index << " | "
2343 "block: " << block.block_index << " ]\n";
2344 #endif
2346 // if we are requesting large blocks, merge the smaller
2347 // blocks that are in the same piece into larger requests
2348 if (m_request_large_blocks)
2350 int blocks_per_piece = t->torrent_file().piece_length() / t->block_size();
2352 while (!m_request_queue.empty())
2354 // check to see if this block is connected to the previous one
2355 // if it is, merge them, otherwise, break this merge loop
2356 piece_block const& front = m_request_queue.front();
2357 if (front.piece_index * blocks_per_piece + front.block_index
2358 != block.piece_index * blocks_per_piece + block.block_index + 1)
2359 break;
2360 block = m_request_queue.front();
2361 m_request_queue.pop_front();
2362 m_download_queue.push_back(block);
2364 #ifdef TORRENT_VERBOSE_LOGGING
2365 (*m_logger) << time_now_string()
2366 << " *** MERGING REQUEST ** [ "
2367 "piece: " << block.piece_index << " | "
2368 "block: " << block.block_index << " ]\n";
2369 #endif
2371 block_offset = block.block_index * t->block_size();
2372 block_size = (std::min)(t->torrent_file().piece_size(
2373 block.piece_index) - block_offset, t->block_size());
2374 TORRENT_ASSERT(block_size > 0);
2375 TORRENT_ASSERT(block_size <= t->block_size());
2377 r.length += block_size;
2381 TORRENT_ASSERT(verify_piece(r));
2383 #ifndef TORRENT_DISABLE_EXTENSIONS
2384 bool handled = false;
2385 for (extension_list_t::iterator i = m_extensions.begin()
2386 , end(m_extensions.end()); i != end; ++i)
2388 if (handled = (*i)->write_request(r)) break;
2390 if (is_disconnecting()) return;
2391 if (!handled)
2393 write_request(r);
2394 m_last_request = time_now();
2396 #else
2397 write_request(r);
2398 m_last_request = time_now();
2399 #endif
2401 #ifdef TORRENT_VERBOSE_LOGGING
2402 (*m_logger) << time_now_string()
2403 << " ==> REQUEST [ "
2404 "piece: " << r.piece << " | "
2405 "s: " << r.start << " | "
2406 "l: " << r.length << " | "
2407 "ds: " << statistics().download_rate() << " B/s | "
2408 "qs: " << int(m_desired_queue_size) << " "
2409 "blk: " << (m_request_large_blocks?"large":"single") << " ]\n";
2410 #endif
2412 m_last_piece = time_now();
2414 if (!m_download_queue.empty()
2415 && empty_download_queue)
2417 // This means we just added a request to this connection
2418 m_requested = time_now();
2422 void peer_connection::timed_out()
2424 TORRENT_ASSERT(m_connecting);
2425 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
2426 (*m_ses.m_logger) << time_now_string() << " CONNECTION TIMED OUT: " << m_remote.address().to_string()
2427 << "\n";
2428 #endif
2429 disconnect("timed out: connect", 1);
2432 // the error argument defaults to 0, which means deliberate disconnect
2433 // 1 means unexpected disconnect/error
2434 // 2 protocol error (client sent something invalid)
2435 void peer_connection::disconnect(char const* message, int error)
2437 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2439 #ifndef NDEBUG
2440 m_disconnect_started = true;
2441 #endif
2443 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2444 switch (error)
2446 case 0:
2447 (*m_logger) << "*** CONNECTION CLOSED " << message << "\n";
2448 break;
2449 case 1:
2450 (*m_logger) << "*** CONNECTION FAILED " << message << "\n";
2451 break;
2452 case 2:
2453 (*m_logger) << "*** PEER ERROR " << message << "\n";
2454 break;
2456 #endif
2457 // we cannot do this in a constructor
2458 TORRENT_ASSERT(m_in_constructor == false);
2459 if (error > 0) m_failed = true;
2460 if (m_disconnecting) return;
2461 boost::intrusive_ptr<peer_connection> me(this);
2463 INVARIANT_CHECK;
2465 if (m_connecting && m_connection_ticket >= 0)
2467 m_ses.m_half_open.done(m_connection_ticket);
2468 m_connection_ticket = -1;
2471 boost::shared_ptr<torrent> t = m_torrent.lock();
2472 torrent_handle handle;
2473 if (t) handle = t->get_handle();
2475 if (message)
2477 if (error > 1 && m_ses.m_alerts.should_post<peer_error_alert>())
2479 m_ses.m_alerts.post_alert(
2480 peer_error_alert(handle, remote(), pid(), message));
2482 else if (error <= 1 && m_ses.m_alerts.should_post<peer_disconnected_alert>())
2484 m_ses.m_alerts.post_alert(
2485 peer_disconnected_alert(handle, remote(), pid(), message));
2489 if (t)
2491 // make sure we keep all the stats!
2492 calc_ip_overhead();
2493 t->add_stats(statistics());
2495 if (t->has_picker())
2497 piece_picker& picker = t->picker();
2499 while (!m_download_queue.empty())
2501 picker.abort_download(m_download_queue.back().block);
2502 m_download_queue.pop_back();
2504 while (!m_request_queue.empty())
2506 picker.abort_download(m_request_queue.back());
2507 m_request_queue.pop_back();
2511 t->remove_peer(this);
2512 m_torrent.reset();
2515 #ifndef NDEBUG
2516 // since this connection doesn't have a torrent reference
2517 // no torrent should have a reference to this connection either
2518 for (aux::session_impl::torrent_map::const_iterator i = m_ses.m_torrents.begin()
2519 , end(m_ses.m_torrents.end()); i != end; ++i)
2520 TORRENT_ASSERT(!i->second->has_peer(this));
2521 #endif
2523 m_disconnecting = true;
2524 error_code ec;
2525 m_socket->close(ec);
2526 m_ses.close_connection(this, message);
2529 void peer_connection::set_upload_limit(int limit)
2531 TORRENT_ASSERT(limit >= -1);
2532 if (limit == -1) limit = (std::numeric_limits<int>::max)();
2533 if (limit < 10) limit = 10;
2534 m_upload_limit = limit;
2535 m_bandwidth_limit[upload_channel].throttle(m_upload_limit);
2538 void peer_connection::set_download_limit(int limit)
2540 TORRENT_ASSERT(limit >= -1);
2541 if (limit == -1) limit = (std::numeric_limits<int>::max)();
2542 if (limit < 10) limit = 10;
2543 m_download_limit = limit;
2544 m_bandwidth_limit[download_channel].throttle(m_download_limit);
2547 size_type peer_connection::share_diff() const
2549 INVARIANT_CHECK;
2551 boost::shared_ptr<torrent> t = m_torrent.lock();
2552 TORRENT_ASSERT(t);
2554 float ratio = t->ratio();
2556 // if we have an infinite ratio, just say we have downloaded
2557 // much more than we have uploaded. And we'll keep uploading.
2558 if (ratio == 0.f)
2559 return (std::numeric_limits<size_type>::max)();
2561 return m_free_upload
2562 + static_cast<size_type>(m_statistics.total_payload_download() * ratio)
2563 - m_statistics.total_payload_upload();
2566 // defined in upnp.cpp
2567 bool is_local(address const& a);
2569 bool peer_connection::on_local_network() const
2571 if (libtorrent::is_local(m_remote.address())
2572 || is_loopback(m_remote.address())) return true;
2573 return false;
2576 void peer_connection::get_peer_info(peer_info& p) const
2578 TORRENT_ASSERT(!associated_torrent().expired());
2580 ptime now = time_now();
2582 p.download_rate_peak = m_download_rate_peak;
2583 p.upload_rate_peak = m_upload_rate_peak;
2584 p.rtt = m_rtt;
2585 p.down_speed = statistics().download_rate();
2586 p.up_speed = statistics().upload_rate();
2587 p.payload_down_speed = statistics().download_payload_rate();
2588 p.payload_up_speed = statistics().upload_payload_rate();
2589 p.pid = pid();
2590 p.ip = remote();
2591 p.pending_disk_bytes = m_outstanding_writing_bytes;
2592 p.send_quota = m_bandwidth_limit[upload_channel].quota_left();
2593 p.receive_quota = m_bandwidth_limit[download_channel].quota_left();
2594 if (m_download_queue.empty()) p.request_timeout = -1;
2595 else p.request_timeout = total_seconds(m_requested - now) + m_ses.settings().request_timeout
2596 + m_timeout_extend;
2597 #ifndef TORRENT_DISABLE_GEO_IP
2598 p.inet_as_name = m_inet_as_name;
2599 #endif
2601 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
2602 p.country[0] = m_country[0];
2603 p.country[1] = m_country[1];
2604 #endif
2606 p.total_download = statistics().total_payload_download();
2607 p.total_upload = statistics().total_payload_upload();
2609 if (m_bandwidth_limit[upload_channel].throttle() == bandwidth_limit::inf)
2610 p.upload_limit = -1;
2611 else
2612 p.upload_limit = m_bandwidth_limit[upload_channel].throttle();
2614 if (m_bandwidth_limit[download_channel].throttle() == bandwidth_limit::inf)
2615 p.download_limit = -1;
2616 else
2617 p.download_limit = m_bandwidth_limit[download_channel].throttle();
2619 p.load_balancing = total_free_upload();
2621 p.download_queue_length = int(download_queue().size() + m_request_queue.size());
2622 p.requests_in_buffer = int(m_requests_in_buffer.size());
2623 p.target_dl_queue_length = int(desired_queue_size());
2624 p.upload_queue_length = int(upload_queue().size());
2626 if (boost::optional<piece_block_progress> ret = downloading_piece_progress())
2628 p.downloading_piece_index = ret->piece_index;
2629 p.downloading_block_index = ret->block_index;
2630 p.downloading_progress = ret->bytes_downloaded;
2631 p.downloading_total = ret->full_block_bytes;
2633 else
2635 p.downloading_piece_index = -1;
2636 p.downloading_block_index = -1;
2637 p.downloading_progress = 0;
2638 p.downloading_total = 0;
2641 p.pieces = get_bitfield();
2642 p.last_request = now - m_last_request;
2643 p.last_active = now - (std::max)(m_last_sent, m_last_receive);
2645 // this will set the flags so that we can update them later
2646 p.flags = 0;
2647 get_specific_peer_info(p);
2649 p.flags |= is_seed() ? peer_info::seed : 0;
2650 p.flags |= m_snubbed ? peer_info::snubbed : 0;
2651 p.flags |= m_upload_only ? peer_info::upload_only : 0;
2652 if (peer_info_struct())
2654 policy::peer* pi = peer_info_struct();
2655 p.source = pi->source;
2656 p.failcount = pi->failcount;
2657 p.num_hashfails = pi->hashfails;
2658 p.flags |= pi->on_parole ? peer_info::on_parole : 0;
2659 p.flags |= pi->optimistically_unchoked ? peer_info::optimistic_unchoke : 0;
2660 #ifndef TORRENT_DISABLE_GEO_IP
2661 p.inet_as = pi->inet_as->first;
2662 #endif
2664 else
2666 p.source = 0;
2667 p.failcount = 0;
2668 p.num_hashfails = 0;
2669 p.remote_dl_rate = 0;
2670 #ifndef TORRENT_DISABLE_GEO_IP
2671 p.inet_as = 0xffff;
2672 #endif
2675 p.remote_dl_rate = m_remote_dl_rate;
2676 p.send_buffer_size = m_send_buffer.capacity();
2677 p.used_send_buffer = m_send_buffer.size();
2678 p.receive_buffer_size = m_recv_buffer.capacity() + m_disk_recv_buffer_size;
2679 p.used_receive_buffer = m_recv_pos;
2680 p.write_state = m_channel_state[upload_channel];
2681 p.read_state = m_channel_state[download_channel];
2683 p.progress = (float)p.pieces.count() / (float)p.pieces.size();
2686 // allocates a disk buffer of size 'disk_buffer_size' and replaces the
2687 // end of the current receive buffer with it. i.e. the receive pos
2688 // must be <= packet_size - disk_buffer_size
2689 // the disk buffer can be accessed through release_disk_receive_buffer()
2690 // when it is queried, the responsibility to free it is transferred
2691 // to the caller
2692 bool peer_connection::allocate_disk_receive_buffer(int disk_buffer_size)
2694 INVARIANT_CHECK;
2696 TORRENT_ASSERT(m_packet_size > 0);
2697 TORRENT_ASSERT(m_recv_pos <= m_packet_size - disk_buffer_size);
2698 TORRENT_ASSERT(!m_disk_recv_buffer);
2699 TORRENT_ASSERT(disk_buffer_size <= 16 * 1024);
2701 if (disk_buffer_size == 0) return true;
2703 if (disk_buffer_size > 16 * 1024)
2705 disconnect("invalid piece size", 2);
2706 return false;
2709 m_disk_recv_buffer.reset(m_ses.allocate_disk_buffer());
2710 if (!m_disk_recv_buffer)
2712 disconnect("out of memory");
2713 return false;
2715 m_disk_recv_buffer_size = disk_buffer_size;
2716 return true;
2719 char* peer_connection::release_disk_receive_buffer()
2721 m_disk_recv_buffer_size = 0;
2722 return m_disk_recv_buffer.release();
2725 void peer_connection::cut_receive_buffer(int size, int packet_size)
2727 INVARIANT_CHECK;
2729 TORRENT_ASSERT(packet_size > 0);
2730 TORRENT_ASSERT(int(m_recv_buffer.size()) >= size);
2731 TORRENT_ASSERT(int(m_recv_buffer.size()) >= m_recv_pos);
2732 TORRENT_ASSERT(m_recv_pos >= size);
2734 if (size > 0)
2735 std::memmove(&m_recv_buffer[0], &m_recv_buffer[0] + size, m_recv_pos - size);
2737 m_recv_pos -= size;
2739 #ifndef NDEBUG
2740 std::fill(m_recv_buffer.begin() + m_recv_pos, m_recv_buffer.end(), 0);
2741 #endif
2743 m_packet_size = packet_size;
2746 void peer_connection::calc_ip_overhead()
2748 m_statistics.calc_ip_overhead();
2751 void peer_connection::second_tick(float tick_interval)
2753 ptime now(time_now());
2754 boost::intrusive_ptr<peer_connection> me(self());
2756 // the invariant check must be run before me is destructed
2757 // in case the peer got disconnected
2758 INVARIANT_CHECK;
2760 boost::shared_ptr<torrent> t = m_torrent.lock();
2761 if (!t || m_disconnecting)
2763 m_ses.m_half_open.done(m_connection_ticket);
2764 m_connecting = false;
2765 disconnect("torrent aborted");
2766 return;
2769 on_tick();
2771 #ifndef TORRENT_DISABLE_EXTENSIONS
2772 for (extension_list_t::iterator i = m_extensions.begin()
2773 , end(m_extensions.end()); i != end; ++i)
2775 (*i)->tick();
2777 if (is_disconnecting()) return;
2778 #endif
2780 // if the peer hasn't said a thing for a certain
2781 // time, it is considered to have timed out
2782 time_duration d;
2783 d = now - m_last_receive;
2784 if (d > seconds(m_timeout) && !m_connecting)
2786 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2787 (*m_logger) << time_now_string() << " *** LAST ACTIVITY [ "
2788 << total_seconds(d) << " seconds ago ] ***\n";
2789 #endif
2790 disconnect("timed out: inactivity");
2791 return;
2794 // do not stall waiting for a handshake
2795 if (!m_connecting
2796 && in_handshake()
2797 && d > seconds(m_ses.settings().handshake_timeout))
2799 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2800 (*m_logger) << time_now_string() << " *** NO HANDSHAKE [ waited "
2801 << total_seconds(d) << " seconds ] ***\n";
2802 #endif
2803 disconnect("timed out: no handshake");
2804 return;
2807 // disconnect peers that we unchoked, but
2808 // they didn't send a request within 20 seconds.
2809 // but only if we're a seed
2810 d = now - (std::max)(m_last_unchoke, m_last_incoming_request);
2811 if (!m_connecting
2812 && m_requests.empty()
2813 && !m_choked
2814 && m_peer_interested
2815 && t && t->is_finished()
2816 && d > seconds(20))
2818 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2819 (*m_logger) << time_now_string() << " *** NO REQUEST [ t: "
2820 << total_seconds(d) << " ] ***\n";
2821 #endif
2822 disconnect("timed out: no request when unchoked");
2823 return;
2826 // if the peer hasn't become interested and we haven't
2827 // become interested in the peer for 10 minutes, it
2828 // has also timed out.
2829 time_duration d1;
2830 time_duration d2;
2831 d1 = now - m_became_uninterested;
2832 d2 = now - m_became_uninteresting;
2833 time_duration time_limit = seconds(
2834 m_ses.settings().inactivity_timeout);
2836 // don't bother disconnect peers we haven't been interested
2837 // in (and that hasn't been interested in us) for a while
2838 // unless we have used up all our connection slots
2839 if (!m_interesting
2840 && !m_peer_interested
2841 && d1 > time_limit
2842 && d2 > time_limit
2843 && (m_ses.num_connections() >= m_ses.max_connections()
2844 || (t && t->num_peers() >= t->max_connections())))
2846 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2847 (*m_logger) << time_now_string() << " *** MUTUAL NO INTEREST [ "
2848 "t1: " << total_seconds(d1) << " | "
2849 "t2: " << total_seconds(d2) << " ] ***\n";
2850 #endif
2851 disconnect("timed out: no interest");
2852 return;
2855 if (!m_download_queue.empty()
2856 && now > m_requested + seconds(m_ses.settings().request_timeout
2857 + m_timeout_extend))
2859 snub_peer();
2862 // if we haven't sent something in too long, send a keep-alive
2863 keep_alive();
2865 m_ignore_bandwidth_limits = m_ses.settings().ignore_limits_on_local_network
2866 && on_local_network();
2868 m_statistics.second_tick(tick_interval);
2870 if (m_statistics.upload_payload_rate() > m_upload_rate_peak)
2872 m_upload_rate_peak = m_statistics.upload_payload_rate();
2874 if (m_statistics.download_payload_rate() > m_download_rate_peak)
2876 m_download_rate_peak = m_statistics.download_payload_rate();
2877 #ifndef TORRENT_DISABLE_GEO_IP
2878 if (peer_info_struct())
2880 std::pair<const int, int>* as_stats = peer_info_struct()->inet_as;
2881 if (as_stats && as_stats->second < m_download_rate_peak)
2882 as_stats->second = m_download_rate_peak;
2884 #endif
2886 if (is_disconnecting()) return;
2888 if (!t->ready_for_connections()) return;
2890 // calculate the desired download queue size
2891 const float queue_time = m_ses.settings().request_queue_time;
2892 // (if the latency is more than this, the download will stall)
2893 // so, the queue size is queue_time * down_rate / 16 kiB
2894 // (16 kB is the size of each request)
2895 // the minimum number of requests is 2 and the maximum is 48
2896 // the block size doesn't have to be 16. So we first query the
2897 // torrent for it
2898 const int block_size = m_request_large_blocks
2899 ? t->torrent_file().piece_length() : t->block_size();
2900 TORRENT_ASSERT(block_size > 0);
2902 if (m_snubbed)
2904 m_desired_queue_size = 1;
2906 else
2908 m_desired_queue_size = static_cast<int>(queue_time
2909 * statistics().download_rate() / block_size);
2910 if (m_desired_queue_size > m_max_out_request_queue)
2911 m_desired_queue_size = m_max_out_request_queue;
2912 if (m_desired_queue_size < min_request_queue)
2913 m_desired_queue_size = min_request_queue;
2915 if (m_desired_queue_size == m_max_out_request_queue
2916 && t->alerts().should_post<performance_alert>())
2918 t->alerts().post_alert(performance_alert(t->get_handle()
2919 , performance_alert::outstanding_request_limit_reached));
2923 if (!m_download_queue.empty()
2924 && now - m_last_piece > seconds(m_ses.settings().piece_timeout
2925 + m_timeout_extend))
2927 // this peer isn't sending the pieces we've
2928 // requested (this has been observed by BitComet)
2929 // in this case we'll clear our download queue and
2930 // re-request the blocks.
2931 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2932 (*m_logger) << time_now_string()
2933 << " *** PIECE_REQUESTS TIMED OUT [ " << (int)m_download_queue.size()
2934 << " " << total_seconds(now - m_last_piece) << "] ***\n";
2935 #endif
2937 snub_peer();
2940 // If the client sends more data
2941 // we send it data faster, otherwise, slower.
2942 // It will also depend on how much data the
2943 // client has sent us. This is the mean to
2944 // maintain the share ratio given by m_ratio
2945 // with all peers.
2947 if (t->is_finished() || is_choked() || t->ratio() == 0.0f)
2949 // if we have downloaded more than one piece more
2950 // than we have uploaded OR if we are a seed
2951 // have an unlimited upload rate
2952 m_bandwidth_limit[upload_channel].throttle(m_upload_limit);
2954 else
2956 size_type bias = 0x10000 + 2 * t->block_size() + m_free_upload;
2958 double break_even_time = 15; // seconds.
2959 size_type have_uploaded = m_statistics.total_payload_upload();
2960 size_type have_downloaded = m_statistics.total_payload_download();
2961 double download_speed = m_statistics.download_rate();
2963 size_type soon_downloaded =
2964 have_downloaded + (size_type)(download_speed * break_even_time*1.5);
2966 if (t->ratio() != 1.f)
2967 soon_downloaded = (size_type)(soon_downloaded*(double)t->ratio());
2969 double upload_speed_limit = (std::min)((soon_downloaded - have_uploaded
2970 + bias) / break_even_time, double(m_upload_limit));
2972 upload_speed_limit = (std::min)(upload_speed_limit,
2973 (double)(std::numeric_limits<int>::max)());
2975 m_bandwidth_limit[upload_channel].throttle(
2976 (std::min)((std::max)((int)upload_speed_limit, 20)
2977 , m_upload_limit));
2980 // update once every minute
2981 if (now - m_remote_dl_update >= seconds(60))
2983 float factor = 0.6666666666667f;
2985 if (m_remote_dl_rate == 0) factor = 0.0f;
2987 m_remote_dl_rate = int((m_remote_dl_rate * factor) +
2988 ((m_remote_bytes_dled * (1.0f-factor)) / 60.f));
2990 m_remote_bytes_dled = 0;
2991 m_remote_dl_update = now;
2994 fill_send_buffer();
2997 void peer_connection::snub_peer()
2999 INVARIANT_CHECK;
3001 boost::shared_ptr<torrent> t = m_torrent.lock();
3002 TORRENT_ASSERT(t);
3004 if (!m_snubbed)
3006 m_snubbed = true;
3007 if (m_ses.m_alerts.should_post<peer_snubbed_alert>())
3009 m_ses.m_alerts.post_alert(peer_snubbed_alert(t->get_handle()
3010 , m_remote, m_peer_id));
3013 m_desired_queue_size = 1;
3015 if (on_parole())
3017 m_timeout_extend += m_ses.settings().request_timeout;
3018 return;
3020 if (!t->has_picker()) return;
3021 piece_picker& picker = t->picker();
3023 piece_block r(-1, -1);
3024 // time out the last request in the queue
3025 if (!m_request_queue.empty())
3027 r = m_request_queue.back();
3028 m_request_queue.pop_back();
3030 else
3032 TORRENT_ASSERT(!m_download_queue.empty());
3033 r = m_download_queue.back().block;
3035 // only time out a request if it blocks the piece
3036 // from being completed (i.e. no free blocks to
3037 // request from it)
3038 piece_picker::downloading_piece p;
3039 picker.piece_info(r.piece_index, p);
3040 int free_blocks = picker.blocks_in_piece(r.piece_index)
3041 - p.finished - p.writing - p.requested;
3042 if (free_blocks > 0)
3044 m_timeout_extend += m_ses.settings().request_timeout;
3045 return;
3048 if (m_ses.m_alerts.should_post<block_timeout_alert>())
3050 m_ses.m_alerts.post_alert(block_timeout_alert(t->get_handle()
3051 , remote(), pid(), r.block_index, r.piece_index));
3053 m_download_queue.pop_back();
3055 if (!m_download_queue.empty() || !m_request_queue.empty())
3056 m_timeout_extend += m_ses.settings().request_timeout;
3058 m_desired_queue_size = 2;
3059 request_a_block(*t, *this);
3060 m_desired_queue_size = 1;
3062 // abort the block after the new one has
3063 // been requested in order to prevent it from
3064 // picking the same block again, stalling the
3065 // same piece indefinitely.
3066 if (r != piece_block(-1, -1))
3067 picker.abort_download(r);
3069 send_block_requests();
3072 void peer_connection::fill_send_buffer()
3074 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3075 INVARIANT_CHECK;
3076 #endif
3078 boost::shared_ptr<torrent> t = m_torrent.lock();
3079 if (!t) return;
3081 // only add new piece-chunks if the send buffer is small enough
3082 // otherwise there will be no end to how large it will be!
3084 int buffer_size_watermark = int(m_statistics.upload_rate()) / 2;
3085 if (buffer_size_watermark < 512) buffer_size_watermark = 512;
3086 else if (buffer_size_watermark > m_ses.settings().send_buffer_watermark)
3087 buffer_size_watermark = m_ses.settings().send_buffer_watermark;
3089 while (!m_requests.empty()
3090 && (send_buffer_size() + m_reading_bytes < buffer_size_watermark))
3092 TORRENT_ASSERT(t->ready_for_connections());
3093 peer_request& r = m_requests.front();
3095 TORRENT_ASSERT(r.piece >= 0);
3096 TORRENT_ASSERT(r.piece < (int)m_have_piece.size());
3097 TORRENT_ASSERT(t->have_piece(r.piece));
3098 TORRENT_ASSERT(r.start + r.length <= t->torrent_file().piece_size(r.piece));
3099 TORRENT_ASSERT(r.length > 0 && r.start >= 0);
3101 t->filesystem().async_read(r, bind(&peer_connection::on_disk_read_complete
3102 , self(), _1, _2, r));
3103 m_reading_bytes += r.length;
3105 m_requests.erase(m_requests.begin());
3109 void peer_connection::on_disk_read_complete(int ret, disk_io_job const& j, peer_request r)
3111 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3113 m_reading_bytes -= r.length;
3115 disk_buffer_holder buffer(m_ses, j.buffer);
3117 if (ret != r.length || m_torrent.expired())
3119 boost::shared_ptr<torrent> t = m_torrent.lock();
3120 if (!t)
3122 disconnect(j.str.c_str());
3123 return;
3126 if (t->alerts().should_post<file_error_alert>())
3127 t->alerts().post_alert(file_error_alert(j.error_file, t->get_handle(), j.str));
3128 t->set_error(j.str);
3129 t->pause();
3130 return;
3133 #ifdef TORRENT_VERBOSE_LOGGING
3134 (*m_logger) << time_now_string()
3135 << " ==> PIECE [ piece: " << r.piece << " | s: " << r.start
3136 << " | l: " << r.length << " ]\n";
3137 #endif
3139 write_piece(r, buffer);
3140 setup_send();
3143 void peer_connection::assign_bandwidth(int channel, int amount)
3145 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3147 #ifdef TORRENT_VERBOSE_LOGGING
3148 (*m_logger) << "bandwidth [ " << channel << " ] + " << amount << "\n";
3149 #endif
3151 m_bandwidth_limit[channel].assign(amount);
3152 TORRENT_ASSERT(m_channel_state[channel] == peer_info::bw_global);
3153 m_channel_state[channel] = peer_info::bw_idle;
3154 if (channel == upload_channel)
3156 setup_send();
3158 else if (channel == download_channel)
3160 setup_receive();
3164 void peer_connection::expire_bandwidth(int channel, int amount)
3166 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3168 m_bandwidth_limit[channel].expire(amount);
3169 if (channel == upload_channel)
3171 setup_send();
3173 else if (channel == download_channel)
3175 setup_receive();
3179 void peer_connection::setup_send()
3181 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3183 if (m_channel_state[upload_channel] != peer_info::bw_idle) return;
3185 shared_ptr<torrent> t = m_torrent.lock();
3187 if (m_bandwidth_limit[upload_channel].quota_left() == 0
3188 && !m_send_buffer.empty()
3189 && !m_connecting
3190 && t
3191 && !m_ignore_bandwidth_limits)
3193 // in this case, we have data to send, but no
3194 // bandwidth. So, we simply request bandwidth
3195 // from the torrent
3196 TORRENT_ASSERT(t);
3197 if (m_bandwidth_limit[upload_channel].max_assignable() > 0)
3199 int priority = is_interesting() * 2 + m_requests_in_buffer.size();
3200 // peers that we are not interested in are non-prioritized
3201 m_channel_state[upload_channel] = peer_info::bw_torrent;
3202 t->request_bandwidth(upload_channel, self()
3203 , m_send_buffer.size(), priority);
3204 #ifdef TORRENT_VERBOSE_LOGGING
3205 (*m_logger) << time_now_string() << " *** REQUEST_BANDWIDTH [ upload prio: "
3206 << priority << "]\n";
3207 #endif
3210 return;
3213 if (!can_write())
3215 #ifdef TORRENT_VERBOSE_LOGGING
3216 (*m_logger) << time_now_string() << " *** CANNOT WRITE ["
3217 " quota: " << m_bandwidth_limit[download_channel].quota_left() <<
3218 " ignore: " << (m_ignore_bandwidth_limits?"yes":"no") <<
3219 " buf: " << m_send_buffer.size() <<
3220 " connecting: " << (m_connecting?"yes":"no") <<
3221 " ]\n";
3222 #endif
3223 return;
3226 // send the actual buffer
3227 if (!m_send_buffer.empty())
3229 int amount_to_send = m_send_buffer.size();
3230 int quota_left = m_bandwidth_limit[upload_channel].quota_left();
3231 if (!m_ignore_bandwidth_limits && amount_to_send > quota_left)
3232 amount_to_send = quota_left;
3234 TORRENT_ASSERT(amount_to_send > 0);
3236 #ifdef TORRENT_VERBOSE_LOGGING
3237 (*m_logger) << time_now_string() << " *** ASYNC_WRITE [ bytes: " << amount_to_send << " ]\n";
3238 #endif
3239 std::list<asio::const_buffer> const& vec = m_send_buffer.build_iovec(amount_to_send);
3240 m_socket->async_write_some(vec, bind(&peer_connection::on_send_data, self(), _1, _2));
3242 m_channel_state[upload_channel] = peer_info::bw_network;
3246 void peer_connection::setup_receive()
3248 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3250 INVARIANT_CHECK;
3252 if (m_channel_state[download_channel] != peer_info::bw_idle) return;
3254 shared_ptr<torrent> t = m_torrent.lock();
3256 if (m_bandwidth_limit[download_channel].quota_left() == 0
3257 && !m_connecting
3258 && t
3259 && !m_ignore_bandwidth_limits)
3261 if (m_bandwidth_limit[download_channel].max_assignable() > 0)
3263 #ifdef TORRENT_VERBOSE_LOGGING
3264 (*m_logger) << time_now_string() << " *** REQUEST_BANDWIDTH [ download ]\n";
3265 #endif
3266 TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_idle);
3267 m_channel_state[download_channel] = peer_info::bw_torrent;
3268 t->request_bandwidth(download_channel, self()
3269 , m_download_queue.size() * 16 * 1024 + 30, m_priority);
3271 return;
3274 if (!can_read())
3276 #ifdef TORRENT_VERBOSE_LOGGING
3277 (*m_logger) << time_now_string() << " *** CANNOT READ ["
3278 " quota: " << m_bandwidth_limit[download_channel].quota_left() <<
3279 " ignore: " << (m_ignore_bandwidth_limits?"yes":"no") <<
3280 " outstanding: " << m_outstanding_writing_bytes <<
3281 " outstanding-limit: " << m_ses.settings().max_outstanding_disk_bytes_per_connection <<
3282 " ]\n";
3283 #endif
3284 return;
3287 TORRENT_ASSERT(m_packet_size > 0);
3288 int max_receive = m_packet_size - m_recv_pos;
3289 int quota_left = m_bandwidth_limit[download_channel].quota_left();
3290 if (!m_ignore_bandwidth_limits && max_receive > quota_left)
3291 max_receive = quota_left;
3293 if (max_receive == 0) return;
3295 TORRENT_ASSERT(m_recv_pos >= 0);
3296 TORRENT_ASSERT(m_packet_size > 0);
3297 TORRENT_ASSERT(can_read());
3298 #ifdef TORRENT_VERBOSE_LOGGING
3299 (*m_logger) << time_now_string() << " *** ASYNC_READ [ max: " << max_receive << " bytes ]\n";
3300 #endif
3302 int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size;
3304 if (int(m_recv_buffer.size()) < regular_buffer_size)
3305 m_recv_buffer.resize(regular_buffer_size);
3307 if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos + max_receive)
3309 // only receive into regular buffer
3310 TORRENT_ASSERT(m_recv_pos + max_receive <= int(m_recv_buffer.size()));
3311 m_socket->async_read_some(asio::buffer(&m_recv_buffer[m_recv_pos]
3312 , max_receive), bind(&peer_connection::on_receive_data, self(), _1, _2));
3314 else if (m_recv_pos >= regular_buffer_size)
3316 // only receive into disk buffer
3317 TORRENT_ASSERT(m_recv_pos - regular_buffer_size >= 0);
3318 TORRENT_ASSERT(m_recv_pos - regular_buffer_size + max_receive <= m_disk_recv_buffer_size);
3319 m_socket->async_read_some(asio::buffer(m_disk_recv_buffer.get() + m_recv_pos - regular_buffer_size
3320 , max_receive)
3321 , bind(&peer_connection::on_receive_data, self(), _1, _2));
3323 else
3325 // receive into both regular and disk buffer
3326 TORRENT_ASSERT(max_receive + m_recv_pos > regular_buffer_size);
3327 TORRENT_ASSERT(m_recv_pos < regular_buffer_size);
3328 TORRENT_ASSERT(max_receive - regular_buffer_size
3329 + m_recv_pos <= m_disk_recv_buffer_size);
3331 boost::array<asio::mutable_buffer, 2> vec;
3332 vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos]
3333 , regular_buffer_size - m_recv_pos);
3334 vec[1] = asio::buffer(m_disk_recv_buffer.get()
3335 , max_receive - regular_buffer_size + m_recv_pos);
3336 m_socket->async_read_some(vec, bind(&peer_connection::on_receive_data
3337 , self(), _1, _2));
3339 m_channel_state[download_channel] = peer_info::bw_network;
3342 #ifndef TORRENT_DISABLE_ENCRYPTION
3344 // returns the last 'bytes' from the receive buffer
3345 std::pair<buffer::interval, buffer::interval> peer_connection::wr_recv_buffers(int bytes)
3347 TORRENT_ASSERT(bytes <= m_recv_pos);
3349 std::pair<buffer::interval, buffer::interval> vec;
3350 int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size;
3351 TORRENT_ASSERT(regular_buffer_size >= 0);
3352 if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos)
3354 vec.first = buffer::interval(&m_recv_buffer[0]
3355 + m_recv_pos - bytes, &m_recv_buffer[0] + m_recv_pos);
3356 vec.second = buffer::interval(0,0);
3358 else if (m_recv_pos - bytes >= regular_buffer_size)
3360 vec.first = buffer::interval(m_disk_recv_buffer.get() + m_recv_pos
3361 - regular_buffer_size - bytes, m_disk_recv_buffer.get() + m_recv_pos
3362 - regular_buffer_size);
3363 vec.second = buffer::interval(0,0);
3365 else
3367 TORRENT_ASSERT(m_recv_pos - bytes < regular_buffer_size);
3368 TORRENT_ASSERT(m_recv_pos > regular_buffer_size);
3369 vec.first = buffer::interval(&m_recv_buffer[0] + m_recv_pos - bytes
3370 , &m_recv_buffer[0] + regular_buffer_size);
3371 vec.second = buffer::interval(m_disk_recv_buffer.get()
3372 , m_disk_recv_buffer.get() + m_recv_pos - regular_buffer_size);
3374 TORRENT_ASSERT(vec.first.left() + vec.second.left() == bytes);
3375 return vec;
3377 #endif
3379 void peer_connection::reset_recv_buffer(int packet_size)
3381 TORRENT_ASSERT(packet_size > 0);
3382 if (m_recv_pos > m_packet_size)
3384 cut_receive_buffer(m_packet_size, packet_size);
3385 return;
3387 m_recv_pos = 0;
3388 m_packet_size = packet_size;
3391 void peer_connection::send_buffer(char const* buf, int size, int flags)
3393 if (flags == message_type_request)
3394 m_requests_in_buffer.push_back(m_send_buffer.size() + size);
3396 int free_space = m_send_buffer.space_in_last_buffer();
3397 if (free_space > size) free_space = size;
3398 if (free_space > 0)
3400 m_send_buffer.append(buf, free_space);
3401 size -= free_space;
3402 buf += free_space;
3403 #ifdef TORRENT_STATS
3404 m_ses.m_buffer_usage_logger << log_time() << " send_buffer: "
3405 << free_space << std::endl;
3406 m_ses.log_buffer_usage();
3407 #endif
3409 if (size <= 0) return;
3411 std::pair<char*, int> buffer = m_ses.allocate_buffer(size);
3412 if (buffer.first == 0)
3414 disconnect("out of memory");
3415 return;
3417 TORRENT_ASSERT(buffer.second >= size);
3418 std::memcpy(buffer.first, buf, size);
3419 m_send_buffer.append_buffer(buffer.first, buffer.second, size
3420 , bind(&session_impl::free_buffer, boost::ref(m_ses), _1, buffer.second));
3421 #ifdef TORRENT_STATS
3422 m_ses.m_buffer_usage_logger << log_time() << " send_buffer_alloc: " << size << std::endl;
3423 m_ses.log_buffer_usage();
3424 #endif
3425 setup_send();
3428 // TODO: change this interface to automatically call setup_send() when the
3429 // return value is destructed
3430 buffer::interval peer_connection::allocate_send_buffer(int size)
3432 TORRENT_ASSERT(size > 0);
3433 char* insert = m_send_buffer.allocate_appendix(size);
3434 if (insert == 0)
3436 std::pair<char*, int> buffer = m_ses.allocate_buffer(size);
3437 if (buffer.first == 0)
3439 disconnect("out of memory");
3440 return buffer::interval(0, 0);
3442 TORRENT_ASSERT(buffer.second >= size);
3443 m_send_buffer.append_buffer(buffer.first, buffer.second, size
3444 , bind(&session_impl::free_buffer, boost::ref(m_ses), _1, buffer.second));
3445 buffer::interval ret(buffer.first, buffer.first + size);
3446 #ifdef TORRENT_STATS
3447 m_ses.m_buffer_usage_logger << log_time() << " allocate_buffer_alloc: " << size << std::endl;
3448 m_ses.log_buffer_usage();
3449 #endif
3450 return ret;
3452 else
3454 #ifdef TORRENT_STATS
3455 m_ses.m_buffer_usage_logger << log_time() << " allocate_buffer: " << size << std::endl;
3456 m_ses.log_buffer_usage();
3457 #endif
3458 buffer::interval ret(insert, insert + size);
3459 return ret;
// Scope guard that assigns 0 to a referenced variable.
//
// If constructed with cond == true, the referenced value is set to 0 either
// when fire() is called or, at the latest, when the guard is destroyed.
// Once fire() has run the guard is disarmed, so the destructor will not
// touch the value again. With cond == false the guard never does anything.
template<class T>
struct set_to_zero
{
	set_to_zero(T& v, bool cond)
		: m_val(v)
		, m_cond(cond)
	{}

	// zero the value now (if still armed) and disarm the guard
	void fire()
	{
		if (m_cond)
		{
			m_cond = false;
			m_val = 0;
		}
	}

	~set_to_zero() { fire(); }

private:
	T& m_val;
	bool m_cond;
};
3474 // --------------------------
3475 // RECEIVE DATA
3476 // --------------------------
3478 // handler bound to the socket's async_read_some() calls. On errors the peer is disconnected via disconnect() and the function returns; no throw is visible in this function
//
// error              result of the read that just completed
// bytes_transferred  number of bytes that read placed in the receive buffers
//
// After processing the completed read, the function keeps issuing
// synchronous read_some() calls for as long as they return data, and
// finally calls setup_receive().
//
// NOTE(review): in this listing brace-only lines and what is presumably the
// 'do' keyword opening the loop closed by 'while (bytes_transferred > 0);'
// are not visible - confirm the exact loop extent against the real file.
3479 void peer_connection::on_receive_data(const error_code& error
3480 , std::size_t bytes_transferred)
// the session mutex is held for the whole handler
3482 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3484 INVARIANT_CHECK;
// the download channel leaves the "waiting on network" state
3486 TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_network);
3487 m_channel_state[download_channel] = peer_info::bw_idle;
// a failed read: forward the error to on_receive(), then disconnect
3489 if (error)
3491 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
3492 (*m_logger) << time_now_string() << " **ERROR**: "
3493 << error.message() << "[in peer_connection::on_receive_data]\n";
3494 #endif
3495 on_receive(error, bytes_transferred);
3496 disconnect(error.message().c_str());
3497 return;
3500 int max_receive = 0;
3503 #ifdef TORRENT_VERBOSE_LOGGING
3504 (*m_logger) << "read " << bytes_transferred << " bytes\n";
3505 #endif
3506 // correct the dl quota usage, if not all of the buffer was actually read
3507 if (!m_ignore_bandwidth_limits)
3508 m_bandwidth_limit[download_channel].use_quota(bytes_transferred);
3510 if (m_disconnecting) return;
3512 TORRENT_ASSERT(m_packet_size > 0);
3513 TORRENT_ASSERT(bytes_transferred > 0);
// account for the newly received bytes
3515 m_last_receive = time_now();
3516 m_recv_pos += bytes_transferred;
3517 TORRENT_ASSERT(m_recv_pos <= int(m_recv_buffer.size()
3518 + m_disk_recv_buffer_size));
// debug builds verify that on_receive() attributes exactly
// bytes_transferred bytes to the payload + protocol download statistics
3520 #ifndef NDEBUG
3521 size_type cur_payload_dl = m_statistics.last_payload_downloaded();
3522 size_type cur_protocol_dl = m_statistics.last_protocol_downloaded();
3523 #endif
3524 on_receive(error, bytes_transferred);
3525 #ifndef NDEBUG
3526 TORRENT_ASSERT(m_statistics.last_payload_downloaded() - cur_payload_dl >= 0);
3527 TORRENT_ASSERT(m_statistics.last_protocol_downloaded() - cur_protocol_dl >= 0);
3528 size_type stats_diff = m_statistics.last_payload_downloaded() - cur_payload_dl +
3529 m_statistics.last_protocol_downloaded() - cur_protocol_dl;
3530 TORRENT_ASSERT(stats_diff == bytes_transferred);
3531 #endif
3533 TORRENT_ASSERT(m_packet_size > 0);
// when the peer is choked, nothing is buffered and the receive buffer's
// capacity exceeds the packet size by more than 128 bytes, replace it with
// a buffer of exactly m_packet_size
3535 if (m_peer_choked
3536 && m_recv_pos == 0
3537 && (m_recv_buffer.capacity() - m_packet_size) > 128)
3539 buffer(m_packet_size).swap(m_recv_buffer);
// try to read the rest of the current packet, capped by the remaining
// download quota unless bandwidth limits are ignored
3542 max_receive = m_packet_size - m_recv_pos;
3543 int quota_left = m_bandwidth_limit[download_channel].quota_left();
3544 if (!m_ignore_bandwidth_limits && max_receive > quota_left)
3545 max_receive = quota_left;
3547 if (max_receive == 0) break;
// the part of the packet that goes into m_recv_buffer; the remainder
// (m_disk_recv_buffer_size bytes) goes into m_disk_recv_buffer
3549 int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size;
3551 if (int(m_recv_buffer.size()) < regular_buffer_size)
3552 m_recv_buffer.resize(regular_buffer_size);
3554 error_code ec;
3555 if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos + max_receive)
3557 // only receive into regular buffer
3558 TORRENT_ASSERT(m_recv_pos + max_receive <= int(m_recv_buffer.size()));
3559 bytes_transferred = m_socket->read_some(asio::buffer(&m_recv_buffer[m_recv_pos]
3560 , max_receive), ec);
3562 else if (m_recv_pos >= regular_buffer_size)
3564 // only receive into disk buffer
3565 TORRENT_ASSERT(m_recv_pos - regular_buffer_size >= 0);
3566 TORRENT_ASSERT(m_recv_pos - regular_buffer_size + max_receive <= m_disk_recv_buffer_size);
3567 bytes_transferred = m_socket->read_some(asio::buffer(m_disk_recv_buffer.get()
3568 + m_recv_pos - regular_buffer_size, (std::min)(m_packet_size
3569 - m_recv_pos, max_receive)), ec);
3571 else
3573 // receive into both regular and disk buffer
3574 TORRENT_ASSERT(max_receive + m_recv_pos > regular_buffer_size);
3575 TORRENT_ASSERT(m_recv_pos < regular_buffer_size);
3576 TORRENT_ASSERT(max_receive - regular_buffer_size
3577 + m_recv_pos <= m_disk_recv_buffer_size);
// scatter read: first the tail of the regular buffer, then the
// beginning of the disk buffer
3579 boost::array<asio::mutable_buffer, 2> vec;
3580 vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos]
3581 , regular_buffer_size - m_recv_pos);
3582 vec[1] = asio::buffer(m_disk_recv_buffer.get()
3583 , (std::min)(m_disk_recv_buffer_size
3584 , max_receive - regular_buffer_size + m_recv_pos));
3585 bytes_transferred = m_socket->read_some(vec, ec);
// any error other than would_block disconnects the peer; would_block
// just ends the synchronous reading
3587 if (ec && ec != asio::error::would_block)
3589 disconnect(ec.message().c_str());
3590 return;
3592 if (ec == asio::error::would_block) break;
3594 while (bytes_transferred > 0);
// hand over to setup_receive() for whatever is still outstanding
3596 setup_receive();
3599 bool peer_connection::can_write() const
3601 // if we have requests or pending data to be sent or announcements to be made
3602 // we want to send data
3603 return !m_send_buffer.empty()
3604 && (m_bandwidth_limit[upload_channel].quota_left() > 0
3605 || m_ignore_bandwidth_limits)
3606 && !m_connecting;
3609 bool peer_connection::can_read() const
3611 bool ret = (m_bandwidth_limit[download_channel].quota_left() > 0
3612 || m_ignore_bandwidth_limits)
3613 && !m_connecting
3614 && m_outstanding_writing_bytes <
3615 m_ses.settings().max_outstanding_disk_bytes_per_connection;
3617 return ret;
3620 void peer_connection::connect(int ticket)
3622 #ifndef NDEBUG
3623 // in case we disconnect here, we need to
3624 // keep the connection alive until the
3625 // exit invariant check is run
3626 boost::intrusive_ptr<peer_connection> me(self());
3627 #endif
3628 INVARIANT_CHECK;
3630 error_code ec;
3631 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
3632 (*m_ses.m_logger) << time_now_string() << " CONNECTING: " << m_remote.address().to_string(ec)
3633 << ":" << m_remote.port() << "\n";
3634 #endif
3636 m_connection_ticket = ticket;
3637 boost::shared_ptr<torrent> t = m_torrent.lock();
3639 m_queued = false;
3640 TORRENT_ASSERT(m_connecting);
3642 if (!t)
3644 disconnect("torrent aborted");
3645 return;
3648 m_socket->open(t->get_interface().protocol(), ec);
3649 if (ec)
3651 disconnect(ec.message().c_str());
3652 return;
3655 // set the socket to non-blocking, so that we can
3656 // read the entire buffer on each read event we get
3657 tcp::socket::non_blocking_io ioc(true);
3658 m_socket->io_control(ioc, ec);
3659 if (ec)
3661 disconnect(ec.message().c_str());
3662 return;
3665 tcp::endpoint bind_interface = t->get_interface();
3667 std::pair<int, int> const& out_ports = m_ses.settings().outgoing_ports;
3668 if (out_ports.first > 0 && out_ports.second >= out_ports.first)
3670 m_socket->set_option(socket_acceptor::reuse_address(true), ec);
3671 if (ec)
3673 disconnect(ec.message().c_str());
3674 return;
3676 bind_interface.port(m_ses.next_port());
3679 m_socket->bind(bind_interface, ec);
3680 if (ec)
3682 disconnect(ec.message().c_str());
3683 return;
3685 m_socket->async_connect(m_remote
3686 , bind(&peer_connection::on_connection_complete, self(), _1));
3687 m_connect = time_now();
3688 m_statistics.sent_syn();
3690 if (t->alerts().should_post<peer_connect_alert>())
3692 t->alerts().post_alert(peer_connect_alert(
3693 t->get_handle(), remote(), pid()));
3697 void peer_connection::on_connection_complete(error_code const& e)
3699 ptime completed = time_now();
3701 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3703 INVARIANT_CHECK;
3705 m_rtt = total_milliseconds(completed - m_connect);
3707 if (m_disconnecting) return;
3709 m_connecting = false;
3710 m_ses.m_half_open.done(m_connection_ticket);
3712 if (e)
3714 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
3715 (*m_ses.m_logger) << time_now_string() << " CONNECTION FAILED: " << m_remote.address().to_string()
3716 << ": " << e.message() << "\n";
3717 #endif
3718 disconnect(e.message().c_str(), 1);
3719 return;
3722 if (m_disconnecting) return;
3723 m_last_receive = time_now();
3725 // this means the connection just succeeded
3727 m_statistics.received_synack();
3729 TORRENT_ASSERT(m_socket);
3730 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING
3731 (*m_ses.m_logger) << time_now_string() << " COMPLETED: " << m_remote.address().to_string()
3732 << " rtt = " << m_rtt << "\n";
3733 #endif
3735 error_code ec;
3736 if (m_remote == m_socket->local_endpoint(ec))
3738 // if the remote endpoint is the same as the local endpoint, we're connected
3739 // to ourselves
3740 disconnect("connected to ourselves", 1);
3741 return;
3744 if (m_remote.address().is_v4())
3746 error_code ec;
3747 m_socket->set_option(type_of_service(m_ses.settings().peer_tos), ec);
3750 on_connected();
3751 setup_send();
3752 setup_receive();
3755 // --------------------------
3756 // SEND DATA
3757 // --------------------------
3759 // throws exception when the client should be disconnected
3760 void peer_connection::on_send_data(error_code const& error
3761 , std::size_t bytes_transferred)
3763 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3765 INVARIANT_CHECK;
3767 TORRENT_ASSERT(m_channel_state[upload_channel] == peer_info::bw_network);
3769 m_send_buffer.pop_front(bytes_transferred);
3771 for (std::vector<int>::iterator i = m_requests_in_buffer.begin()
3772 , end(m_requests_in_buffer.end()); i != end; ++i)
3773 *i -= bytes_transferred;
3775 while (!m_requests_in_buffer.empty()
3776 && m_requests_in_buffer.front() <= 0)
3777 m_requests_in_buffer.erase(m_requests_in_buffer.begin());
3779 m_channel_state[upload_channel] = peer_info::bw_idle;
3781 if (!m_ignore_bandwidth_limits)
3782 m_bandwidth_limit[upload_channel].use_quota(bytes_transferred);
3784 #ifdef TORRENT_VERBOSE_LOGGING
3785 (*m_logger) << "wrote " << bytes_transferred << " bytes\n";
3786 #endif
3788 if (error)
3790 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
3791 (*m_logger) << "**ERROR**: " << error.message() << " [in peer_connection::on_send_data]\n";
3792 #endif
3793 disconnect(error.message().c_str());
3794 return;
3796 if (m_disconnecting) return;
3798 TORRENT_ASSERT(!m_connecting);
3799 TORRENT_ASSERT(bytes_transferred > 0);
3801 m_last_sent = time_now();
3803 #ifndef NDEBUG
3804 size_type cur_payload_ul = m_statistics.last_payload_uploaded();
3805 size_type cur_protocol_ul = m_statistics.last_protocol_uploaded();
3806 #endif
3807 on_sent(error, bytes_transferred);
3808 #ifndef NDEBUG
3809 TORRENT_ASSERT(m_statistics.last_payload_uploaded() - cur_payload_ul >= 0);
3810 TORRENT_ASSERT(m_statistics.last_protocol_uploaded() - cur_protocol_ul >= 0);
3811 size_type stats_diff = m_statistics.last_payload_uploaded() - cur_payload_ul
3812 + m_statistics.last_protocol_uploaded() - cur_protocol_ul;
3813 TORRENT_ASSERT(stats_diff == bytes_transferred);
3814 #endif
3816 fill_send_buffer();
3818 setup_send();
3821 #ifndef NDEBUG
// Debug-only consistency check, compiled only when NDEBUG is not defined.
// Asserts relationships between this connection's own state, the torrent it
// belongs to (if any), the session and the torrent's piece picker.
//
// NOTE(review): in this listing brace-only lines are not visible, and some
// sections below reference names that are never declared in this function
// (e.g. piece_failed). That suggests those sections are commented out in the
// real file with the comment delimiters missing here - confirm against the
// real file before relying on them.
3822 void peer_connection::check_invariant() const
// a disk receive buffer exists exactly when its size is non-zero
3824 TORRENT_ASSERT(bool(m_disk_recv_buffer) == (m_disk_recv_buffer_size > 0));
3826 boost::shared_ptr<torrent> t = m_torrent.lock();
// a disconnecting peer no longer has a torrent; otherwise, once constructed,
// the session must know about this peer
3827 if (m_disconnecting)
3829 TORRENT_ASSERT(!t);
3830 TORRENT_ASSERT(m_disconnect_started);
3832 else if (!m_in_constructor)
3834 TORRENT_ASSERT(m_ses.has_peer((peer_connection*)this));
// NOTE(review): the bandwidth-history check below is described as unreliable
// by its own comment; presumably it is disabled in the real file - confirm
3838 // this assertion correct most of the time, but sometimes right when the
3839 // limit is changed it might break
3840 for (int i = 0; i < 2; ++i)
3842 // this peer is in the bandwidth history iff max_assignable < limit
3843 TORRENT_ASSERT((m_bandwidth_limit[i].max_assignable() < m_bandwidth_limit[i].throttle())
3844 == m_ses.m_bandwidth_manager[i]->is_in_history(this)
3845 || m_bandwidth_limit[i].throttle() == bandwidth_limit::inf);
// a channel in the bw_torrent or bw_global state must have no quota left
3849 if (m_channel_state[download_channel] == peer_info::bw_torrent
3850 || m_channel_state[download_channel] == peer_info::bw_global)
3851 TORRENT_ASSERT(m_bandwidth_limit[download_channel].quota_left() == 0);
3852 if (m_channel_state[upload_channel] == peer_info::bw_torrent
3853 || m_channel_state[upload_channel] == peer_info::bw_global)
3854 TORRENT_ASSERT(m_bandwidth_limit[upload_channel].quota_left() == 0);
// no block may appear more than once across the download queue and the
// request queue: the set of distinct blocks must be as large as both
// queues together
3856 std::set<piece_block> unique;
3857 std::transform(m_download_queue.begin(), m_download_queue.end()
3858 , std::inserter(unique, unique.begin()), boost::bind(&pending_block::block, _1));
3859 std::copy(m_request_queue.begin(), m_request_queue.end(), std::inserter(unique, unique.begin()));
3860 TORRENT_ASSERT(unique.size() == m_download_queue.size() + m_request_queue.size());
// checks on the policy's peer entry for this connection
3861 if (m_peer_info)
3863 TORRENT_ASSERT(m_peer_info->prev_amount_upload == 0);
3864 TORRENT_ASSERT(m_peer_info->prev_amount_download == 0);
3865 TORRENT_ASSERT(m_peer_info->connection == this
3866 || m_peer_info->connection == 0);
3868 if (m_peer_info->optimistically_unchoked)
3869 TORRENT_ASSERT(!is_choked());
// the cached piece count must match the bitfield
3872 TORRENT_ASSERT(m_have_piece.count() == m_num_pieces);
// everything below needs the torrent; without one there is nothing more to
// verify (apart from the optional expensive check)
3874 if (!t)
3876 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3877 // since this connection doesn't have a torrent reference
3878 // no torrent should have a reference to this connection either
3879 for (aux::session_impl::torrent_map::const_iterator i = m_ses.m_torrents.begin()
3880 , end(m_ses.m_torrents.end()); i != end; ++i)
3881 TORRENT_ASSERT(!i->second->has_peer((peer_connection*)this));
3882 #endif
3883 return;
// the peer's bitfield has one bit per piece of the torrent
3886 if (t->ready_for_connections() && m_initialized)
3887 TORRENT_ASSERT(t->torrent_file().num_pieces() == int(m_have_piece.size()));
3889 if (m_ses.settings().close_redundant_connections)
3891 // make sure upload only peers are disconnected
3892 if (t->is_finished() && m_upload_only)
3893 TORRENT_ASSERT(m_disconnect_started);
3894 if (m_upload_only
3895 && !m_interesting
3896 && m_bitfield_received
3897 && t->are_files_checked())
3898 TORRENT_ASSERT(m_disconnect_started);
3901 if (!m_disconnect_started && m_initialized)
3903 // none of this matters if we're disconnecting anyway
3904 if (t->is_finished())
3905 TORRENT_ASSERT(!m_interesting);
3906 if (is_seed())
3907 TORRENT_ASSERT(m_upload_only);
// count, over all peers of the torrent, how many times each block is
// requested or being downloaded, and compare against the picker's own
// per-block peer count for blocks that are not yet downloaded
3910 if (t->has_picker())
3912 std::map<piece_block, int> num_requests;
3913 for (torrent::const_peer_iterator i = t->begin(); i != t->end(); ++i)
3915 // make sure this peer is not a dangling pointer
3916 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3917 TORRENT_ASSERT(m_ses.has_peer(*i));
3918 #endif
3919 peer_connection const& p = *(*i);
3920 for (std::deque<piece_block>::const_iterator i = p.request_queue().begin()
3921 , end(p.request_queue().end()); i != end; ++i)
3922 ++num_requests[*i];
3923 for (std::deque<pending_block>::const_iterator i = p.download_queue().begin()
3924 , end(p.download_queue().end()); i != end; ++i)
3925 ++num_requests[i->block];
3927 for (std::map<piece_block, int>::iterator i = num_requests.begin()
3928 , end(num_requests.end()); i != end; ++i)
3930 if (!t->picker().is_downloaded(i->first))
3931 TORRENT_ASSERT(t->picker().num_peers(i->first) == i->second);
// expensive: the policy's peer list must contain our m_peer_info entry
3934 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3935 if (m_peer_info)
3937 policy::const_iterator i = t->get_policy().begin_peer();
3938 policy::const_iterator end = t->get_policy().end_peer();
3939 for (; i != end; ++i)
3941 if (&i->second == m_peer_info) break;
3943 TORRENT_ASSERT(i != end);
3945 #endif
3946 if (t->has_picker() && !t->is_aborted())
3948 // make sure that pieces that have completed the download
3949 // of all their blocks are in the disk io thread's queue
3950 // to be checked.
3951 const std::vector<piece_picker::downloading_piece>& dl_queue
3952 = t->picker().get_download_queue();
3953 for (std::vector<piece_picker::downloading_piece>::const_iterator i =
3954 dl_queue.begin(); i != dl_queue.end(); ++i)
3956 const int blocks_per_piece = t->picker().blocks_in_piece(i->index);
// a piece counts as complete when every one of its blocks is in
// state_finished
3958 bool complete = true;
3959 for (int j = 0; j < blocks_per_piece; ++j)
3961 if (i->info[j].state == piece_picker::block_info::state_finished)
3962 continue;
3963 complete = false;
3964 break;
// NOTE(review): piece_failed is not declared anywhere in this function, so
// the check below is presumably commented out in the real file - confirm
3967 // this invariant is not valid anymore since the completion event
3968 // might be queued in the io service
3969 if (complete && !piece_failed)
3971 disk_io_job ret = m_ses.m_disk_thread.find_job(
3972 &t->filesystem(), -1, i->index);
3973 TORRENT_ASSERT(ret.action == disk_io_job::hash || ret.action == disk_io_job::write);
3974 TORRENT_ASSERT(ret.piece == i->index);
// NOTE(review): the '||' that presumably joins the two std::find()
// conditions below is not visible in this listing; this whole section looks
// like it is disabled in the real file - confirm
3980 // extremely expensive invariant check
3982 if (!t->is_seed())
3984 piece_picker& p = t->picker();
3985 const std::vector<piece_picker::downloading_piece>& dlq = p.get_download_queue();
3986 const int blocks_per_piece = static_cast<int>(
3987 t->torrent_file().piece_length() / t->block_size());
3989 for (std::vector<piece_picker::downloading_piece>::const_iterator i =
3990 dlq.begin(); i != dlq.end(); ++i)
3992 for (int j = 0; j < blocks_per_piece; ++j)
3994 if (std::find(m_request_queue.begin(), m_request_queue.end()
3995 , piece_block(i->index, j)) != m_request_queue.end()
3997 std::find(m_download_queue.begin(), m_download_queue.end()
3998 , piece_block(i->index, j)) != m_download_queue.end())
4000 TORRENT_ASSERT(i->info[j].peer == m_remote);
4002 else
4004 TORRENT_ASSERT(i->info[j].peer != m_remote || i->info[j].finished);
4011 #endif
4013 peer_connection::peer_speed_t peer_connection::peer_speed()
4015 shared_ptr<torrent> t = m_torrent.lock();
4016 TORRENT_ASSERT(t);
4018 int download_rate = int(statistics().download_payload_rate());
4019 int torrent_download_rate = int(t->statistics().download_payload_rate());
4021 if (download_rate > 512 && download_rate > torrent_download_rate / 16)
4022 m_speed = fast;
4023 else if (download_rate > 4096 && download_rate > torrent_download_rate / 64)
4024 m_speed = medium;
4025 else if (download_rate < torrent_download_rate / 15 && m_speed == fast)
4026 m_speed = medium;
4027 else
4028 m_speed = slow;
4030 return m_speed;
4033 void peer_connection::keep_alive()
4035 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
4036 INVARIANT_CHECK;
4037 #endif
4039 time_duration d;
4040 d = time_now() - m_last_sent;
4041 if (total_seconds(d) < m_timeout / 2) return;
4043 if (m_connecting) return;
4044 if (in_handshake()) return;
4046 // if the last send has not completed yet, do not send a keep
4047 // alive
4048 if (m_channel_state[upload_channel] != peer_info::bw_idle) return;
4050 #ifdef TORRENT_VERBOSE_LOGGING
4051 (*m_logger) << time_now_string() << " ==> KEEPALIVE\n";
4052 #endif
4054 m_last_sent = time_now();
4055 write_keepalive();
4058 bool peer_connection::is_seed() const
4060 // if m_num_pieces == 0, we probably don't have the
4061 // metadata yet.
4062 boost::shared_ptr<torrent> t = m_torrent.lock();
4063 return m_num_pieces == (int)m_have_piece.size() && m_num_pieces > 0 && t && t->valid_metadata();