/*

Copyright (c) 2007, Arvid Norberg
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:

    * Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.
    * Redistributions in binary form must reproduce the above copyright
      notice, this list of conditions and the following disclaimer in
      the documentation and/or other materials provided with the distribution.
    * Neither the name of the author nor the names of its
      contributors may be used to endorse or promote products derived
      from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.

*/

#include "libtorrent/storage.hpp"
#include <deque>
#include "libtorrent/disk_io_thread.hpp"
#include "libtorrent/disk_buffer_holder.hpp"
#include <boost/scoped_array.hpp>

#ifdef _WIN32
#include <malloc.h>
#ifndef alloca
#define alloca(s) _alloca(s)
#endif
#endif

#ifdef TORRENT_DISK_STATS
#include "libtorrent/time.hpp"
#endif

namespace libtorrent
{

	disk_io_thread::disk_io_thread(asio::io_service& ios, int block_size)
		: m_abort(false)
		, m_queue_buffer_size(0)
		, m_cache_size(512) // 512 * 16kB = 8MB
		, m_cache_expiry(60) // 1 minute
		, m_coalesce_writes(true)
		, m_coalesce_reads(true)
		, m_use_read_cache(true)
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
		, m_pool(block_size)
#endif
		, m_block_size(block_size)
		, m_ios(ios)
		, m_disk_io_thread(boost::ref(*this))
	{
#ifdef TORRENT_STATS
		m_allocations = 0;
#endif
#ifdef TORRENT_DISK_STATS
		m_log.open("disk_io_thread.log", std::ios::trunc);
#endif
	}
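
	// A minimal usage sketch (not part of libtorrent itself; the names outside
	// this file are assumptions): the thread is typically owned by the session,
	// constructed with the io_service that completion handlers are posted to
	// and with the block size used for cache accounting (16 kB here, matching
	// the "512 * 16kB" comment above):
	//
	//   asio::io_service ios;
	//   disk_io_thread dio(ios, 16 * 1024);
	//   // ... add_job() calls from the network thread ...
	//   dio.join(); // posts an abort_thread job and blocks until the thread exits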

	disk_io_thread::~disk_io_thread()
	{
		TORRENT_ASSERT(m_abort == true);
	}

	void disk_io_thread::join()
	{
		mutex_t::scoped_lock l(m_queue_mutex);
		disk_io_job j;
		j.action = disk_io_job::abort_thread;
		m_jobs.insert(m_jobs.begin(), j);
		m_signal.notify_all();
		l.unlock();

		m_disk_io_thread.join();
	}

	void disk_io_thread::get_cache_info(sha1_hash const& ih, std::vector<cached_piece_info>& ret) const
	{
		mutex_t::scoped_lock l(m_piece_mutex);
		ret.clear();
		ret.reserve(m_pieces.size());
		for (cache_t::const_iterator i = m_pieces.begin()
			, end(m_pieces.end()); i != end; ++i)
		{
			torrent_info const& ti = *i->storage->info();
			if (ti.info_hash() != ih) continue;
			cached_piece_info info;
			info.piece = i->piece;
			info.last_use = i->last_use;
			info.kind = cached_piece_info::write_cache;
			int blocks_in_piece = (ti.piece_size(i->piece) + (m_block_size) - 1) / m_block_size;
			info.blocks.resize(blocks_in_piece);
			for (int b = 0; b < blocks_in_piece; ++b)
				if (i->blocks[b]) info.blocks[b] = true;
			ret.push_back(info);
		}
		for (cache_t::const_iterator i = m_read_pieces.begin()
			, end(m_read_pieces.end()); i != end; ++i)
		{
			torrent_info const& ti = *i->storage->info();
			if (ti.info_hash() != ih) continue;
			cached_piece_info info;
			info.piece = i->piece;
			info.last_use = i->last_use;
			info.kind = cached_piece_info::read_cache;
			int blocks_in_piece = (ti.piece_size(i->piece) + (m_block_size) - 1) / m_block_size;
			info.blocks.resize(blocks_in_piece);
			for (int b = 0; b < blocks_in_piece; ++b)
				if (i->blocks[b]) info.blocks[b] = true;
			ret.push_back(info);
		}
	}
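
	// Illustrative only (the surrounding session plumbing is an assumption):
	// a caller that knows a torrent's info-hash can inspect which pieces are
	// currently held in the write and read caches:
	//
	//   std::vector<cached_piece_info> pieces;
	//   dio.get_cache_info(ih, pieces);
	//   for (std::size_t n = 0; n < pieces.size(); ++n)
	//       std::cout << "piece " << pieces[n].piece << " blocks "
	//           << std::count(pieces[n].blocks.begin(), pieces[n].blocks.end(), true)
	//           << (pieces[n].kind == cached_piece_info::write_cache ? " (write)\n" : " (read)\n");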

	cache_status disk_io_thread::status() const
	{
		mutex_t::scoped_lock l(m_piece_mutex);
		return m_cache_stats;
	}

	void disk_io_thread::set_cache_size(int s)
	{
		mutex_t::scoped_lock l(m_piece_mutex);
		TORRENT_ASSERT(s >= 0);
		m_cache_size = s;
	}

	void disk_io_thread::set_cache_expiry(int ex)
	{
		mutex_t::scoped_lock l(m_piece_mutex);
		TORRENT_ASSERT(ex > 0);
		m_cache_expiry = ex;
	}

	// aborts read operations
	void disk_io_thread::stop(boost::intrusive_ptr<piece_manager> s)
	{
		mutex_t::scoped_lock l(m_queue_mutex);
		// read jobs are aborted, write and move jobs are synchronized
		for (std::list<disk_io_job>::iterator i = m_jobs.begin();
			i != m_jobs.end();)
		{
			if (i->storage != s)
			{
				++i;
				continue;
			}
			if (i->action == disk_io_job::read)
			{
				if (i->callback) m_ios.post(bind(i->callback, -1, *i));
				m_jobs.erase(i++);
				continue;
			}
			if (i->action == disk_io_job::check_files)
			{
				if (i->callback) m_ios.post(bind(i->callback
					, piece_manager::disk_check_aborted, *i));
				m_jobs.erase(i++);
				continue;
			}
			++i;
		}
		m_signal.notify_all();
	}

	bool range_overlap(int start1, int length1, int start2, int length2)
	{
		return (start1 <= start2 && start1 + length1 > start2)
			|| (start2 <= start1 && start2 + length2 > start1);
	}

	namespace
	{
		// The semantic of this operator is:
		// should lhs come before rhs in the job queue
		bool operator<(disk_io_job const& lhs, disk_io_job const& rhs)
		{
			// NOTE: comparison inverted to make higher priority
			// skip _in_front_of_ lower priority
			if (lhs.priority > rhs.priority) return true;
			if (lhs.priority < rhs.priority) return false;

			if (lhs.storage.get() < rhs.storage.get()) return true;
			if (lhs.storage.get() > rhs.storage.get()) return false;
			if (lhs.piece < rhs.piece) return true;
			if (lhs.piece > rhs.piece) return false;
			if (lhs.offset < rhs.offset) return true;
			// if (lhs.offset > rhs.offset) return false;
			return false;
		}
	}
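
	// Ordering example (comment only): with equal priorities the comparison
	// above sorts by storage pointer, then piece index, then offset, so for
	// one storage the queue ends up as
	//   {piece 3, offset 0}, {piece 3, offset 16384}, {piece 7, offset 0}
	// while any job with a higher priority value sorts in front of all three.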

	disk_io_thread::cache_t::iterator disk_io_thread::find_cached_piece(
		disk_io_thread::cache_t& cache
		, disk_io_job const& j, mutex_t::scoped_lock& l)
	{
		for (cache_t::iterator i = cache.begin()
			, end(cache.end()); i != end; ++i)
		{
			if (i->storage != j.storage || i->piece != j.piece) continue;
			return i;
		}
		return cache.end();
	}

	void disk_io_thread::flush_expired_pieces()
	{
		ptime now = time_now();

		mutex_t::scoped_lock l(m_piece_mutex);

		INVARIANT_CHECK;
		for (;;)
		{
			cache_t::iterator i = std::min_element(
				m_pieces.begin(), m_pieces.end()
				, bind(&cached_piece_entry::last_use, _1)
				< bind(&cached_piece_entry::last_use, _2));
			if (i == m_pieces.end()) return;
			int age = total_seconds(now - i->last_use);
			if (age < m_cache_expiry) return;
			flush_and_remove(i, l);
		}
	}

	void disk_io_thread::free_piece(cached_piece_entry& p, mutex_t::scoped_lock& l)
	{
		int piece_size = p.storage->info()->piece_size(p.piece);
		int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;

		for (int i = 0; i < blocks_in_piece; ++i)
		{
			if (p.blocks[i] == 0) continue;
			free_buffer(p.blocks[i]);
			p.blocks[i] = 0;
			--p.num_blocks;
			--m_cache_stats.cache_size;
			--m_cache_stats.read_cache_size;
		}
	}

	bool disk_io_thread::clear_oldest_read_piece(
		cache_t::iterator ignore
		, mutex_t::scoped_lock& l)
	{
		INVARIANT_CHECK;

		cache_t::iterator i = std::min_element(
			m_read_pieces.begin(), m_read_pieces.end()
			, bind(&cached_piece_entry::last_use, _1)
			< bind(&cached_piece_entry::last_use, _2));
		if (i != m_read_pieces.end() && i != ignore)
		{
			// don't replace an entry that is less than one second old
			if (time_now() - i->last_use < seconds(1)) return false;
			free_piece(*i, l);
			m_read_pieces.erase(i);
			return true;
		}
		return false;
	}

	void disk_io_thread::flush_oldest_piece(mutex_t::scoped_lock& l)
	{
		INVARIANT_CHECK;
		// first look if there are any read cache entries that can
		// be cleared
		if (clear_oldest_read_piece(m_read_pieces.end(), l)) return;

		cache_t::iterator i = std::min_element(
			m_pieces.begin(), m_pieces.end()
			, bind(&cached_piece_entry::last_use, _1)
			< bind(&cached_piece_entry::last_use, _2));
		if (i == m_pieces.end()) return;
		flush_and_remove(i, l);
	}

	void disk_io_thread::flush_and_remove(disk_io_thread::cache_t::iterator e
		, mutex_t::scoped_lock& l)
	{
		flush(e, l);
		m_pieces.erase(e);
	}

	void disk_io_thread::flush(disk_io_thread::cache_t::iterator e
		, mutex_t::scoped_lock& l)
	{
		INVARIANT_CHECK;
		cached_piece_entry& p = *e;
		int piece_size = p.storage->info()->piece_size(p.piece);
#ifdef TORRENT_DISK_STATS
		m_log << log_time() << " flushing " << piece_size << std::endl;
#endif
		TORRENT_ASSERT(piece_size > 0);
		boost::scoped_array<char> buf;
		if (m_coalesce_writes) buf.reset(new (std::nothrow) char[piece_size]);

		int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
		int buffer_size = 0;
		int offset = 0;
		for (int i = 0; i <= blocks_in_piece; ++i)
		{
			if (i == blocks_in_piece || p.blocks[i] == 0)
			{
				if (buffer_size == 0) continue;
				TORRENT_ASSERT(buf);

				TORRENT_ASSERT(buffer_size <= i * m_block_size);
				l.unlock();
				p.storage->write_impl(buf.get(), p.piece, (std::min)(
					i * m_block_size, piece_size) - buffer_size, buffer_size);
				l.lock();
				++m_cache_stats.writes;
//				std::cerr << " flushing p: " << p.piece << " bytes: " << buffer_size << std::endl;
				buffer_size = 0;
				offset = 0;
				continue;
			}
			int block_size = (std::min)(piece_size - i * m_block_size, m_block_size);
			TORRENT_ASSERT(offset + block_size <= piece_size);
			TORRENT_ASSERT(offset + block_size > 0);
			if (!buf)
			{
				l.unlock();
				p.storage->write_impl(p.blocks[i], p.piece, i * m_block_size, block_size);
				l.lock();
				++m_cache_stats.writes;
			}
			else
			{
				std::memcpy(buf.get() + offset, p.blocks[i], block_size);
				offset += m_block_size;
				buffer_size += block_size;
			}
			free_buffer(p.blocks[i]);
			p.blocks[i] = 0;
			TORRENT_ASSERT(p.num_blocks > 0);
			--p.num_blocks;
			++m_cache_stats.blocks_written;
			--m_cache_stats.cache_size;
		}
		TORRENT_ASSERT(buffer_size == 0);
//		std::cerr << " flushing p: " << p.piece << " cached_blocks: " << m_cache_stats.cache_size << std::endl;
#ifndef NDEBUG
		for (int i = 0; i < blocks_in_piece; ++i)
			TORRENT_ASSERT(p.blocks[i] == 0);
#endif
	}
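
	// Worked example of the coalescing loop above (comment only): if a piece
	// holds cached blocks [0][1][ ][3] and m_coalesce_writes is enabled,
	// blocks 0 and 1 are copied into buf and written with one write_impl()
	// call at offset 0, the hole at block 2 flushes that run, and block 3 is
	// written as a second run at its own offset. With coalescing disabled,
	// each present block is written with its own write_impl() call.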

	void disk_io_thread::cache_block(disk_io_job& j, mutex_t::scoped_lock& l)
	{
		INVARIANT_CHECK;
		TORRENT_ASSERT(find_cached_piece(m_pieces, j, l) == m_pieces.end());
		cached_piece_entry p;

		int piece_size = j.storage->info()->piece_size(j.piece);
		int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;

		p.piece = j.piece;
		p.storage = j.storage;
		p.last_use = time_now();
		p.num_blocks = 1;
		p.blocks.reset(new char*[blocks_in_piece]);
		std::memset(&p.blocks[0], 0, blocks_in_piece * sizeof(char*));
		int block = j.offset / m_block_size;
//		std::cerr << " adding cache entry for p: " << j.piece << " block: " << block << " cached_blocks: " << m_cache_stats.cache_size << std::endl;
		p.blocks[block] = j.buffer;
		++m_cache_stats.cache_size;
		m_pieces.push_back(p);
	}

	// fills a piece with data from disk, returns the total number of bytes
	// read or -1 if there was an error
	int disk_io_thread::read_into_piece(cached_piece_entry& p, int start_block, mutex_t::scoped_lock& l)
	{
		int piece_size = p.storage->info()->piece_size(p.piece);
		int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;

		int end_block = start_block;
		for (int i = start_block; i < blocks_in_piece
			&& m_cache_stats.cache_size < m_cache_size; ++i)
		{
			// this is a block that is already allocated
			// stop allocating and don't read more than
			// what we've allocated now
			if (p.blocks[i]) break;
			p.blocks[i] = allocate_buffer();

			// the allocation failed, break
			if (p.blocks[i] == 0) break;
			++p.num_blocks;
			++m_cache_stats.cache_size;
			++m_cache_stats.read_cache_size;
			++end_block;
		}

		if (end_block == start_block) return -2;

		int buffer_size = piece_size - (end_block - 1) * m_block_size + (end_block - start_block - 1) * m_block_size;
		TORRENT_ASSERT(buffer_size <= piece_size);
		TORRENT_ASSERT(buffer_size + start_block * m_block_size <= piece_size);
		boost::scoped_array<char> buf;
		if (m_coalesce_reads) buf.reset(new (std::nothrow) char[buffer_size]);
		int ret = 0;
		if (buf)
		{
			l.unlock();
			ret += p.storage->read_impl(buf.get(), p.piece, start_block * m_block_size, buffer_size);
			l.lock();
			if (p.storage->error()) { return -1; }
			++m_cache_stats.reads;
		}

		int piece_offset = start_block * m_block_size;
		int offset = 0;
		for (int i = start_block; i < end_block; ++i)
		{
			int block_size = (std::min)(piece_size - piece_offset, m_block_size);
			if (p.blocks[i] == 0) break;
			TORRENT_ASSERT(offset <= buffer_size);
			TORRENT_ASSERT(piece_offset <= piece_size);
			if (buf)
			{
				std::memcpy(p.blocks[i], buf.get() + offset, block_size);
			}
			else
			{
				l.unlock();
				ret += p.storage->read_impl(p.blocks[i], p.piece, piece_offset, block_size);
				l.lock();
				if (p.storage->error()) { return -1; }
				++m_cache_stats.reads;
			}
			offset += m_block_size;
			piece_offset += m_block_size;
		}
		TORRENT_ASSERT(ret <= buffer_size);
		return (ret != buffer_size) ? -1 : ret;
	}
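
	// Worked example for the common case in which the loop above allocates
	// every remaining block (end_block == blocks_in_piece): with a 256 kB
	// piece, 16 kB blocks and start_block == 2,
	//   buffer_size = 262144 - 15 * 16384 + (16 - 2 - 1) * 16384 = 229376
	// which is exactly the span from offset 32768 to the end of the piece,
	// i.e. the range covered by the single coalesced read_impl() call.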

	bool disk_io_thread::make_room(int num_blocks
		, cache_t::iterator ignore
		, mutex_t::scoped_lock& l)
	{
		if (m_cache_size - m_cache_stats.cache_size < num_blocks)
		{
			// there's not enough room in the cache, clear a piece
			// from the read cache
			if (!clear_oldest_read_piece(ignore, l)) return false;
		}

		return m_cache_size - m_cache_stats.cache_size >= num_blocks;
	}

	// returns -1 on read error, -2 if there isn't any space in the cache
	// or the number of bytes read
	int disk_io_thread::cache_read_block(disk_io_job const& j, mutex_t::scoped_lock& l)
	{
		INVARIANT_CHECK;

		int piece_size = j.storage->info()->piece_size(j.piece);
		int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;

		int start_block = j.offset / m_block_size;

		if (!make_room(blocks_in_piece - start_block
			, m_read_pieces.end(), l)) return -2;

		cached_piece_entry p;
		p.piece = j.piece;
		p.storage = j.storage;
		p.last_use = time_now();
		p.num_blocks = 0;
		p.blocks.reset(new char*[blocks_in_piece]);
		std::memset(&p.blocks[0], 0, blocks_in_piece * sizeof(char*));
		int ret = read_into_piece(p, start_block, l);

		if (ret == -1)
			free_piece(p, l);
		else
			m_read_pieces.push_back(p);

		return ret;
	}

#ifndef NDEBUG
	void disk_io_thread::check_invariant() const
	{
		int cached_write_blocks = 0;
		for (cache_t::const_iterator i = m_pieces.begin()
			, end(m_pieces.end()); i != end; ++i)
		{
			cached_piece_entry const& p = *i;
			TORRENT_ASSERT(p.blocks);

			if (!p.storage) continue;
			int piece_size = p.storage->info()->piece_size(p.piece);
			int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
			int blocks = 0;
			for (int k = 0; k < blocks_in_piece; ++k)
			{
				if (p.blocks[k])
				{
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
					TORRENT_ASSERT(is_disk_buffer(p.blocks[k]));
#endif
					++blocks;
				}
			}
//			TORRENT_ASSERT(blocks == p.num_blocks);
			cached_write_blocks += blocks;
		}

		int cached_read_blocks = 0;
		for (cache_t::const_iterator i = m_read_pieces.begin()
			, end(m_read_pieces.end()); i != end; ++i)
		{
			cached_piece_entry const& p = *i;
			TORRENT_ASSERT(p.blocks);

			int piece_size = p.storage->info()->piece_size(p.piece);
			int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
			int blocks = 0;
			for (int k = 0; k < blocks_in_piece; ++k)
			{
				if (p.blocks[k])
				{
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
					TORRENT_ASSERT(is_disk_buffer(p.blocks[k]));
#endif
					++blocks;
				}
			}
//			TORRENT_ASSERT(blocks == p.num_blocks);
			cached_read_blocks += blocks;
		}

		TORRENT_ASSERT(cached_read_blocks + cached_write_blocks == m_cache_stats.cache_size);
		TORRENT_ASSERT(cached_read_blocks == m_cache_stats.read_cache_size);

		// when writing, there may be a one block difference, right before an old piece
		// is flushed
		TORRENT_ASSERT(m_cache_stats.cache_size <= m_cache_size + 1);
	}
#endif

	int disk_io_thread::try_read_from_cache(disk_io_job const& j)
	{
		TORRENT_ASSERT(j.buffer);

		mutex_t::scoped_lock l(m_piece_mutex);
		if (!m_use_read_cache) return -2;

		cache_t::iterator p
			= find_cached_piece(m_read_pieces, j, l);

		bool hit = true;
		int ret = 0;

		// if the piece cannot be found in the cache,
		// read the whole piece starting at the block
		// we got a request for.
		if (p == m_read_pieces.end())
		{
			ret = cache_read_block(j, l);
			hit = false;
			if (ret < 0) return ret;
			p = m_read_pieces.end();
			--p;
			TORRENT_ASSERT(!m_read_pieces.empty());
			TORRENT_ASSERT(p->piece == j.piece);
			TORRENT_ASSERT(p->storage == j.storage);
		}

		if (p != m_read_pieces.end())
		{
			// copy from the cache and update the last use timestamp
			int block = j.offset / m_block_size;
			int block_offset = j.offset % m_block_size;
			int buffer_offset = 0;
			int size = j.buffer_size;
			if (p->blocks[block] == 0)
			{
				int piece_size = j.storage->info()->piece_size(j.piece);
				int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
				int end_block = block;
				while (end_block < blocks_in_piece && p->blocks[end_block] == 0) ++end_block;
				if (!make_room(end_block - block, p, l)) return -2;
				ret = read_into_piece(*p, block, l);
				hit = false;
				if (ret < 0) return ret;
				TORRENT_ASSERT(p->blocks[block]);
			}

			p->last_use = time_now();
			while (size > 0)
			{
				TORRENT_ASSERT(p->blocks[block]);
				int to_copy = (std::min)(m_block_size
					- block_offset, size);
				std::memcpy(j.buffer + buffer_offset
					, p->blocks[block] + block_offset
					, to_copy);
				size -= to_copy;
				block_offset = 0;
				buffer_offset += to_copy;
				++block;
			}
			ret = j.buffer_size;
			++m_cache_stats.blocks_read;
			if (hit) ++m_cache_stats.blocks_read_hit;
		}
		return ret;
	}
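
	// Return convention, illustrated (comment only): a request for one block
	// returns j.buffer_size once the data has been copied out of the read
	// cache (whether it was already cached or had to be read in first), -1 if
	// the underlying read failed, and -2 if the read cache is disabled or no
	// room could be made for the piece; in the -2 case the caller in
	// operator() falls back to a plain read_impl() call.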

	void disk_io_thread::add_job(disk_io_job const& j
		, boost::function<void(int, disk_io_job const&)> const& f)
	{
		TORRENT_ASSERT(!j.callback);
		TORRENT_ASSERT(j.storage);
		TORRENT_ASSERT(j.buffer_size <= m_block_size);
		mutex_t::scoped_lock l(m_queue_mutex);

		std::list<disk_io_job>::reverse_iterator i = m_jobs.rbegin();
		if (j.action == disk_io_job::read)
		{
			// when we're reading, we may not skip
			// ahead of any write operation that overlaps
			// the region we're reading
			for (; i != m_jobs.rend(); ++i)
			{
				// if *i should come before j, stop
				// and insert j before i
				if (*i < j) break;
				// if we come across a write operation that
				// overlaps the region we're reading, we need
				// to stop
				if (i->action == disk_io_job::write
					&& i->storage == j.storage
					&& i->piece == j.piece
					&& range_overlap(i->offset, i->buffer_size
						, j.offset, j.buffer_size))
					break;
			}
		}
		else if (j.action == disk_io_job::write)
		{
			for (; i != m_jobs.rend(); ++i)
			{
				if (*i < j)
				{
					if (i != m_jobs.rbegin()
						&& i.base()->storage.get() != j.storage.get())
						i = m_jobs.rbegin();
					break;
				}
			}
		}

		// if we are placed in front of all other jobs, put it on the back of
		// the queue, to sweep the disk in the same direction, and to avoid
		// starvation. The exception is if the priority is higher than the
		// job at the front of the queue
		if (i == m_jobs.rend() && (m_jobs.empty() || j.priority <= m_jobs.back().priority))
			i = m_jobs.rbegin();

		std::list<disk_io_job>::iterator k = m_jobs.insert(i.base(), j);
		k->callback.swap(const_cast<boost::function<void(int, disk_io_job const&)>&>(f));
		if (j.action == disk_io_job::write)
			m_queue_buffer_size += j.buffer_size;
		TORRENT_ASSERT(j.storage.get());
		m_signal.notify_all();
	}
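
	// A hedged usage sketch (the surrounding peer/torrent plumbing and the
	// on_read handler are assumptions, not part of this file): queuing an
	// asynchronous read of one block and having the completion posted back
	// on the io_service passed to the constructor:
	//
	//   disk_io_job j;
	//   j.action = disk_io_job::read;
	//   j.storage = pm; // boost::intrusive_ptr<piece_manager>
	//   j.piece = 5;
	//   j.offset = 0;
	//   j.buffer_size = 16 * 1024;
	//   dio.add_job(j, bind(&on_read, _1, _2));
	//
	// on_read(int ret, disk_io_job const& j) then runs in the io_service's
	// thread; on success ret is the number of bytes read and j.buffer holds
	// the block.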

#ifndef NDEBUG
	bool disk_io_thread::is_disk_buffer(char* buffer) const
	{
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
		return true;
#else
		mutex_t::scoped_lock l(m_pool_mutex);
		return m_pool.is_from(buffer);
#endif
	}
#endif

	char* disk_io_thread::allocate_buffer()
	{
		mutex_t::scoped_lock l(m_pool_mutex);
#ifdef TORRENT_STATS
		++m_allocations;
#endif
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
		return (char*)malloc(m_block_size);
#else
		return (char*)m_pool.ordered_malloc();
#endif
	}

	void disk_io_thread::free_buffer(char* buf)
	{
		mutex_t::scoped_lock l(m_pool_mutex);
#ifdef TORRENT_STATS
		--m_allocations;
#endif
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
		free(buf);
#else
		m_pool.ordered_free(buf);
#endif
	}
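
	// Note on the two build modes above: with the pool allocator enabled,
	// every m_block_size-byte disk buffer comes out of a single boost pool,
	// which lets is_disk_buffer() verify ownership and lets release_files /
	// delete_files hand memory back via m_pool.release_memory(). With
	// TORRENT_DISABLE_POOL_ALLOCATOR defined, buffers are plain malloc()/
	// free() allocations and is_disk_buffer() simply returns true.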

	bool disk_io_thread::test_error(disk_io_job& j)
	{
		error_code const& ec = j.storage->error();
		if (ec)
		{
			j.str = ec.message();
			j.error = ec;
			j.error_file = j.storage->error_file();
			j.storage->clear_error();
#ifndef NDEBUG
			std::cout << "ERROR: '" << j.str << "' " << j.error_file << std::endl;
#endif
			return true;
		}
		return false;
	}

	void disk_io_thread::operator()()
	{
		for (;;)
		{
#ifdef TORRENT_DISK_STATS
			m_log << log_time() << " idle" << std::endl;
#endif
			mutex_t::scoped_lock jl(m_queue_mutex);

			while (m_jobs.empty() && !m_abort)
				m_signal.wait(jl);
			if (m_abort && m_jobs.empty())
			{
				jl.unlock();

				mutex_t::scoped_lock l(m_piece_mutex);
				// flush all disk caches
				for (cache_t::iterator i = m_pieces.begin()
					, end(m_pieces.end()); i != end; ++i)
					flush(i, l);
				for (cache_t::iterator i = m_read_pieces.begin()
					, end(m_read_pieces.end()); i != end; ++i)
					free_piece(*i, l);
				m_pieces.clear();
				m_read_pieces.clear();
				return;
			}

			// if there's a buffer in this job, it will be freed
			// when this holder is destructed, unless it has been
			// released.
			disk_buffer_holder holder(*this
				, m_jobs.front().action != disk_io_job::check_fastresume
				? m_jobs.front().buffer : 0);

			boost::function<void(int, disk_io_job const&)> handler;
			handler.swap(m_jobs.front().callback);

			disk_io_job j = m_jobs.front();
			m_jobs.pop_front();
			m_queue_buffer_size -= j.buffer_size;
			jl.unlock();

			flush_expired_pieces();

			int ret = 0;

			TORRENT_ASSERT(j.storage || j.action == disk_io_job::abort_thread);
#ifdef TORRENT_DISK_STATS
			ptime start = time_now();
#endif
#ifndef BOOST_NO_EXCEPTIONS
			try {
#endif

			switch (j.action)
			{
				case disk_io_job::abort_thread:
				{
					mutex_t::scoped_lock jl(m_queue_mutex);
					m_abort = true;

					for (std::list<disk_io_job>::iterator i = m_jobs.begin();
						i != m_jobs.end();)
					{
						if (i->action == disk_io_job::read)
						{
							if (i->callback) m_ios.post(bind(i->callback, -1, *i));
							m_jobs.erase(i++);
							continue;
						}
						if (i->action == disk_io_job::check_files)
						{
							if (i->callback) m_ios.post(bind(i->callback
								, piece_manager::disk_check_aborted, *i));
							m_jobs.erase(i++);
							continue;
						}
						++i;
					}
					break;
				}
				case disk_io_job::read:
				{
					if (test_error(j))
					{
						ret = -1;
						break;
					}
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " read " << j.buffer_size << std::endl;
#endif
					INVARIANT_CHECK;
					TORRENT_ASSERT(j.buffer == 0);
					j.buffer = allocate_buffer();
					TORRENT_ASSERT(j.buffer_size <= m_block_size);
					if (j.buffer == 0)
					{
						ret = -1;
						j.error = error_code(ENOMEM, get_posix_category());
						j.str = j.error.message();
						break;
					}

					disk_buffer_holder read_holder(*this, j.buffer);
					ret = try_read_from_cache(j);

					// -2 means there's no space in the read cache
					// or that the read cache is disabled
					if (ret == -1)
					{
						j.buffer = 0;
						test_error(j);
						break;
					}
					else if (ret == -2)
					{
						ret = j.storage->read_impl(j.buffer, j.piece, j.offset
							, j.buffer_size);
						if (ret < 0)
						{
							test_error(j);
							break;
						}
						++m_cache_stats.blocks_read;
					}
					read_holder.release();
					break;
				}
				case disk_io_job::write:
				{
					if (test_error(j))
					{
						ret = -1;
						break;
					}
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " write " << j.buffer_size << std::endl;
#endif
					mutex_t::scoped_lock l(m_piece_mutex);
					INVARIANT_CHECK;
					cache_t::iterator p
						= find_cached_piece(m_pieces, j, l);
					int block = j.offset / m_block_size;
					TORRENT_ASSERT(j.buffer);
					TORRENT_ASSERT(j.buffer_size <= m_block_size);
					if (p != m_pieces.end())
					{
						TORRENT_ASSERT(p->blocks[block] == 0);
						if (p->blocks[block])
						{
							free_buffer(p->blocks[block]);
							--p->num_blocks;
						}
						p->blocks[block] = j.buffer;
						++m_cache_stats.cache_size;
						++p->num_blocks;
						p->last_use = time_now();
					}
					else
					{
						cache_block(j, l);
					}
					// we've now inserted the buffer
					// in the cache, we should not
					// free it at the end
					holder.release();
					if (m_cache_stats.cache_size >= m_cache_size)
						flush_oldest_piece(l);
					break;
				}
				case disk_io_job::hash:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " hash" << std::endl;
#endif
					mutex_t::scoped_lock l(m_piece_mutex);
					INVARIANT_CHECK;

					cache_t::iterator i
						= find_cached_piece(m_pieces, j, l);
					if (i != m_pieces.end())
					{
						flush_and_remove(i, l);
						if (test_error(j))
						{
							ret = -1;
							j.storage->mark_failed(j.piece);
							break;
						}
					}
					l.unlock();
					sha1_hash h = j.storage->hash_for_piece_impl(j.piece);
					if (test_error(j))
					{
						ret = -1;
						j.storage->mark_failed(j.piece);
						break;
					}
					ret = (j.storage->info()->hash_for_piece(j.piece) == h) ? 0 : -2;
					if (ret == -2) j.storage->mark_failed(j.piece);
					break;
				}
				case disk_io_job::move_storage:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " move" << std::endl;
#endif
					TORRENT_ASSERT(j.buffer == 0);
					ret = j.storage->move_storage_impl(j.str) ? 0 : -1;
					if (ret != 0)
					{
						test_error(j);
						break;
					}
					j.str = j.storage->save_path().string();
					break;
				}
				case disk_io_job::release_files:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " release" << std::endl;
#endif
					TORRENT_ASSERT(j.buffer == 0);

					mutex_t::scoped_lock l(m_piece_mutex);
					INVARIANT_CHECK;

					for (cache_t::iterator i = m_pieces.begin(); i != m_pieces.end();)
					{
						if (i->storage == j.storage)
						{
							flush(i, l);
							i = m_pieces.erase(i);
						}
						else
						{
							++i;
						}
					}
					l.unlock();
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
					{
						mutex_t::scoped_lock l(m_pool_mutex);
						m_pool.release_memory();
					}
#endif
					ret = j.storage->release_files_impl();
					if (ret != 0) test_error(j);
					break;
				}
				case disk_io_job::clear_read_cache:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " clear-cache" << std::endl;
#endif
					TORRENT_ASSERT(j.buffer == 0);

					mutex_t::scoped_lock l(m_piece_mutex);
					INVARIANT_CHECK;

					for (cache_t::iterator i = m_read_pieces.begin();
						i != m_read_pieces.end();)
					{
						if (i->storage == j.storage)
						{
							free_piece(*i, l);
							i = m_read_pieces.erase(i);
						}
						else
						{
							++i;
						}
					}
					l.unlock();
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
					{
						mutex_t::scoped_lock l(m_pool_mutex);
						m_pool.release_memory();
					}
#endif
					ret = 0;
					break;
				}
				case disk_io_job::delete_files:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " delete" << std::endl;
#endif
					TORRENT_ASSERT(j.buffer == 0);

					mutex_t::scoped_lock l(m_piece_mutex);
					INVARIANT_CHECK;

					cache_t::iterator i = std::remove_if(
						m_pieces.begin(), m_pieces.end(), bind(&cached_piece_entry::storage, _1) == j.storage);

					for (cache_t::iterator k = i; k != m_pieces.end(); ++k)
					{
						torrent_info const& ti = *k->storage->info();
						int blocks_in_piece = (ti.piece_size(k->piece) + m_block_size - 1) / m_block_size;
						for (int j = 0; j < blocks_in_piece; ++j)
						{
							if (k->blocks[j] == 0) continue;
							free_buffer(k->blocks[j]);
							k->blocks[j] = 0;
							--m_cache_stats.cache_size;
						}
					}
					m_pieces.erase(i, m_pieces.end());
					l.unlock();
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
					{
						mutex_t::scoped_lock l(m_pool_mutex);
						m_pool.release_memory();
					}
#endif
					ret = j.storage->delete_files_impl();
					if (ret != 0) test_error(j);
					break;
				}
				case disk_io_job::check_fastresume:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " check fastresume" << std::endl;
#endif
					lazy_entry const* rd = (lazy_entry const*)j.buffer;
					TORRENT_ASSERT(rd != 0);
					ret = j.storage->check_fastresume(*rd, j.str);
					break;
				}
				case disk_io_job::check_files:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " check files" << std::endl;
#endif
					int piece_size = j.storage->info()->piece_length();
					for (int processed = 0; processed < 4 * 1024 * 1024; processed += piece_size)
					{
						ret = j.storage->check_files(j.piece, j.offset, j.str);

#ifndef BOOST_NO_EXCEPTIONS
						try {
#endif
							TORRENT_ASSERT(handler);
							if (handler && ret == piece_manager::need_full_check)
								m_ios.post(bind(handler, ret, j));
#ifndef BOOST_NO_EXCEPTIONS
						} catch (std::exception&) {}
#endif
						if (ret != piece_manager::need_full_check) break;
					}
					if (test_error(j))
					{
						ret = piece_manager::fatal_disk_error;
						break;
					}
					TORRENT_ASSERT(ret != -2 || !j.str.empty());

					// if the check is not done, add it at the end of the job queue
					if (ret == piece_manager::need_full_check)
					{
						add_job(j, handler);
						continue;
					}
					break;
				}
				case disk_io_job::save_resume_data:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " save resume data" << std::endl;
#endif
					j.resume_data.reset(new entry(entry::dictionary_t));
					j.storage->write_resume_data(*j.resume_data);
					ret = 0;
					break;
				}
				case disk_io_job::rename_file:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " rename file" << std::endl;
#endif
					ret = j.storage->rename_file_impl(j.piece, j.str);
				}
			}
#ifndef BOOST_NO_EXCEPTIONS
			} catch (std::exception& e)
			{
				ret = -1;
				try
				{
					j.str = e.what();
				}
				catch (std::exception&) {}
			}
#endif

//			if (!handler) std::cerr << "DISK THREAD: no callback specified" << std::endl;
//			else std::cerr << "DISK THREAD: invoking callback" << std::endl;
#ifndef BOOST_NO_EXCEPTIONS
			try {
#endif
				TORRENT_ASSERT(ret != -2 || !j.str.empty()
					|| j.action == disk_io_job::hash);
				if (handler) m_ios.post(bind(handler, ret, j));
#ifndef BOOST_NO_EXCEPTIONS
			} catch (std::exception&)
			{
				TORRENT_ASSERT(false);
			}
#endif
		}
		TORRENT_ASSERT(false);
	}
}