/*

Copyright (c) 2007, Arvid Norberg
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:

    * Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.
    * Redistributions in binary form must reproduce the above copyright
      notice, this list of conditions and the following disclaimer in
      the documentation and/or other materials provided with the distribution.
    * Neither the name of the author nor the names of its
      contributors may be used to endorse or promote products derived
      from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.

*/
#include "libtorrent/storage.hpp"
#include <deque>
#include "libtorrent/disk_io_thread.hpp"
#include "libtorrent/disk_buffer_holder.hpp"
#include <boost/scoped_array.hpp>

#ifdef _WIN32
#include <malloc.h>
#ifndef alloca
#define alloca(s) _alloca(s)
#endif
#endif

#ifdef TORRENT_DISK_STATS
#include "libtorrent/time.hpp"
#endif

namespace libtorrent
{

	disk_io_thread::disk_io_thread(asio::io_service& ios, int block_size)
		: m_abort(false)
		, m_queue_buffer_size(0)
		, m_cache_size(512) // 512 * 16kB = 8MB
		, m_cache_expiry(60) // 1 minute
		, m_coalesce_writes(true)
		, m_coalesce_reads(true)
		, m_use_read_cache(true)
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
		, m_pool(block_size)
#endif
		, m_block_size(block_size)
		, m_ios(ios)
		, m_disk_io_thread(boost::ref(*this))
	{
#ifdef TORRENT_STATS
		m_allocations = 0;
#endif
#ifdef TORRENT_DISK_STATS
		m_log.open("disk_io_thread.log", std::ios::trunc);
#endif
	}
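
	// Note on the defaults above (illustrative arithmetic, not from the
	// original source): the cache is measured in blocks of m_block_size
	// bytes. With the typical 16 kiB block size, m_cache_size = 512 blocks
	// corresponds to 512 * 16 kiB = 8 MiB of cached data, and
	// m_cache_expiry = 60 means idle write-cache pieces are flushed after
	// one minute.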

	disk_io_thread::~disk_io_thread()
	{
		TORRENT_ASSERT(m_abort == true);
	}

	void disk_io_thread::join()
	{
		mutex_t::scoped_lock l(m_queue_mutex);
		disk_io_job j;
		j.action = disk_io_job::abort_thread;
		m_jobs.insert(m_jobs.begin(), j);
		m_signal.notify_all();
		l.unlock();

		m_disk_io_thread.join();
	}

	void disk_io_thread::get_cache_info(sha1_hash const& ih, std::vector<cached_piece_info>& ret) const
	{
		mutex_t::scoped_lock l(m_piece_mutex);
		ret.clear();
		ret.reserve(m_pieces.size());
		for (cache_t::const_iterator i = m_pieces.begin()
			, end(m_pieces.end()); i != end; ++i)
		{
			torrent_info const& ti = *i->storage->info();
			if (ti.info_hash() != ih) continue;
			cached_piece_info info;
			info.piece = i->piece;
			info.last_use = i->last_use;
			info.kind = cached_piece_info::write_cache;
			int blocks_in_piece = (ti.piece_size(i->piece) + (m_block_size) - 1) / m_block_size;
			info.blocks.resize(blocks_in_piece);
			for (int b = 0; b < blocks_in_piece; ++b)
				if (i->blocks[b]) info.blocks[b] = true;
			ret.push_back(info);
		}
		for (cache_t::const_iterator i = m_read_pieces.begin()
			, end(m_read_pieces.end()); i != end; ++i)
		{
			torrent_info const& ti = *i->storage->info();
			if (ti.info_hash() != ih) continue;
			cached_piece_info info;
			info.piece = i->piece;
			info.last_use = i->last_use;
			info.kind = cached_piece_info::read_cache;
			int blocks_in_piece = (ti.piece_size(i->piece) + (m_block_size) - 1) / m_block_size;
			info.blocks.resize(blocks_in_piece);
			for (int b = 0; b < blocks_in_piece; ++b)
				if (i->blocks[b]) info.blocks[b] = true;
			ret.push_back(info);
		}
	}

	cache_status disk_io_thread::status() const
	{
		mutex_t::scoped_lock l(m_piece_mutex);
		return m_cache_stats;
	}

	void disk_io_thread::set_cache_size(int s)
	{
		mutex_t::scoped_lock l(m_piece_mutex);
		TORRENT_ASSERT(s >= 0);
		m_cache_size = s;
	}

	void disk_io_thread::set_cache_expiry(int ex)
	{
		mutex_t::scoped_lock l(m_piece_mutex);
		TORRENT_ASSERT(ex > 0);
		m_cache_expiry = ex;
	}

	// aborts read operations
	void disk_io_thread::stop(boost::intrusive_ptr<piece_manager> s)
	{
		mutex_t::scoped_lock l(m_queue_mutex);
		// read jobs are aborted, write and move jobs are synchronized
		for (std::list<disk_io_job>::iterator i = m_jobs.begin();
			i != m_jobs.end();)
		{
			if (i->storage != s)
			{
				++i;
				continue;
			}
			if (i->action == disk_io_job::read)
			{
				if (i->callback) m_ios.post(bind(i->callback, -1, *i));
				m_jobs.erase(i++);
				continue;
			}
			if (i->action == disk_io_job::check_files)
			{
				if (i->callback) m_ios.post(bind(i->callback
					, piece_manager::disk_check_aborted, *i));
				m_jobs.erase(i++);
				continue;
			}
			++i;
		}
		m_signal.notify_all();
	}

	bool range_overlap(int start1, int length1, int start2, int length2)
	{
		return (start1 <= start2 && start1 + length1 > start2)
			|| (start2 <= start1 && start2 + length2 > start1);
	}
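
	// Illustrative example (not part of the original source): with 16 kiB
	// blocks, range_overlap(0, 16384, 8192, 16384) is true because the
	// second range starts inside the first, while two back-to-back blocks,
	// range_overlap(0, 16384, 16384, 16384), do not overlap.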

	namespace
	{
		// The semantic of this operator is:
		// should lhs come before rhs in the job queue
		bool operator<(disk_io_job const& lhs, disk_io_job const& rhs)
		{
			// NOTE: comparison inverted to make higher priority
			// skip _in_front_of_ lower priority
			if (lhs.priority > rhs.priority) return true;
			if (lhs.priority < rhs.priority) return false;

			if (lhs.storage.get() < rhs.storage.get()) return true;
			if (lhs.storage.get() > rhs.storage.get()) return false;
			if (lhs.piece < rhs.piece) return true;
			if (lhs.piece > rhs.piece) return false;
			if (lhs.offset < rhs.offset) return true;
//			if (lhs.offset > rhs.offset) return false;
			return false;
		}
	}
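
	// Example of the resulting order (illustrative, not from the original
	// source): two equal-priority jobs on the same storage, one for piece 2
	// and one for piece 5, compare so that the piece-2 job comes first.
	// add_job() relies on this to keep the queue roughly sorted by position
	// on disk, sweeping in one direction like an elevator.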

	disk_io_thread::cache_t::iterator disk_io_thread::find_cached_piece(
		disk_io_thread::cache_t& cache
		, disk_io_job const& j, mutex_t::scoped_lock& l)
	{
		for (cache_t::iterator i = cache.begin()
			, end(cache.end()); i != end; ++i)
		{
			if (i->storage != j.storage || i->piece != j.piece) continue;
			return i;
		}
		return cache.end();
	}

	void disk_io_thread::flush_expired_pieces()
	{
		ptime now = time_now();

		mutex_t::scoped_lock l(m_piece_mutex);

		INVARIANT_CHECK;
		for (;;)
		{
			cache_t::iterator i = std::min_element(
				m_pieces.begin(), m_pieces.end()
				, bind(&cached_piece_entry::last_use, _1)
				< bind(&cached_piece_entry::last_use, _2));
			if (i == m_pieces.end()) return;
			int age = total_seconds(now - i->last_use);
			if (age < m_cache_expiry) return;
			flush_and_remove(i, l);
		}
	}

	void disk_io_thread::free_piece(cached_piece_entry& p, mutex_t::scoped_lock& l)
	{
		int piece_size = p.storage->info()->piece_size(p.piece);
		int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;

		for (int i = 0; i < blocks_in_piece; ++i)
		{
			if (p.blocks[i] == 0) continue;
			free_buffer(p.blocks[i]);
			p.blocks[i] = 0;
			--p.num_blocks;
			--m_cache_stats.cache_size;
			--m_cache_stats.read_cache_size;
		}
	}

	bool disk_io_thread::clear_oldest_read_piece(
		cache_t::iterator ignore
		, mutex_t::scoped_lock& l)
	{
		INVARIANT_CHECK;

		cache_t::iterator i = std::min_element(
			m_read_pieces.begin(), m_read_pieces.end()
			, bind(&cached_piece_entry::last_use, _1)
			< bind(&cached_piece_entry::last_use, _2));
		if (i != m_read_pieces.end() && i != ignore)
		{
			// don't replace an entry that is less than one second old
			if (time_now() - i->last_use < seconds(1)) return false;
			free_piece(*i, l);
			m_read_pieces.erase(i);
			return true;
		}
		return false;
	}

	void disk_io_thread::flush_oldest_piece(mutex_t::scoped_lock& l)
	{
		INVARIANT_CHECK;
		// first look if there are any read cache entries that can
		// be cleared
		if (clear_oldest_read_piece(m_read_pieces.end(), l)) return;

		cache_t::iterator i = std::min_element(
			m_pieces.begin(), m_pieces.end()
			, bind(&cached_piece_entry::last_use, _1)
			< bind(&cached_piece_entry::last_use, _2));
		if (i == m_pieces.end()) return;
		flush_and_remove(i, l);
	}

	void disk_io_thread::flush_and_remove(disk_io_thread::cache_t::iterator e
		, mutex_t::scoped_lock& l)
	{
		flush(e, l);
		m_pieces.erase(e);
	}

	void disk_io_thread::flush(disk_io_thread::cache_t::iterator e
		, mutex_t::scoped_lock& l)
	{
		INVARIANT_CHECK;
		cached_piece_entry& p = *e;
		int piece_size = p.storage->info()->piece_size(p.piece);
#ifdef TORRENT_DISK_STATS
		m_log << log_time() << " flushing " << piece_size << std::endl;
#endif
		TORRENT_ASSERT(piece_size > 0);
		boost::scoped_array<char> buf;
		if (m_coalesce_writes) buf.reset(new (std::nothrow) char[piece_size]);

		int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
		int buffer_size = 0;
		int offset = 0;
		for (int i = 0; i <= blocks_in_piece; ++i)
		{
			if (i == blocks_in_piece || p.blocks[i] == 0)
			{
				if (buffer_size == 0) continue;
				TORRENT_ASSERT(buf);

				TORRENT_ASSERT(buffer_size <= i * m_block_size);
				l.unlock();
				p.storage->write_impl(buf.get(), p.piece, (std::min)(
					i * m_block_size, piece_size) - buffer_size, buffer_size);
				l.lock();
				++m_cache_stats.writes;
//				std::cerr << " flushing p: " << p.piece << " bytes: " << buffer_size << std::endl;
				buffer_size = 0;
				offset = 0;
				continue;
			}
			int block_size = (std::min)(piece_size - i * m_block_size, m_block_size);
			TORRENT_ASSERT(offset + block_size <= piece_size);
			TORRENT_ASSERT(offset + block_size > 0);
			if (!buf)
			{
				l.unlock();
				p.storage->write_impl(p.blocks[i], p.piece, i * m_block_size, block_size);
				l.lock();
				++m_cache_stats.writes;
			}
			else
			{
				std::memcpy(buf.get() + offset, p.blocks[i], block_size);
				offset += m_block_size;
				buffer_size += block_size;
			}
			free_buffer(p.blocks[i]);
			p.blocks[i] = 0;
			TORRENT_ASSERT(p.num_blocks > 0);
			--p.num_blocks;
			++m_cache_stats.blocks_written;
			--m_cache_stats.cache_size;
		}
		TORRENT_ASSERT(buffer_size == 0);
//		std::cerr << " flushing p: " << p.piece << " cached_blocks: " << m_cache_stats.cache_size << std::endl;
#ifndef NDEBUG
		for (int i = 0; i < blocks_in_piece; ++i)
			TORRENT_ASSERT(p.blocks[i] == 0);
#endif
	}
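
	// Illustrative note (not from the original source): with
	// m_coalesce_writes enabled, a piece whose blocks 0..3 are all dirty is
	// flushed with a single write_impl() call of 4 * 16 kiB = 64 kiB
	// (assuming the default block size), instead of four separate 16 kiB
	// writes. A hole in the dirty blocks simply splits the piece into
	// multiple contiguous runs, each written with its own write_impl() call.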

	void disk_io_thread::cache_block(disk_io_job& j, mutex_t::scoped_lock& l)
	{
		INVARIANT_CHECK;
		TORRENT_ASSERT(find_cached_piece(m_pieces, j, l) == m_pieces.end());
		cached_piece_entry p;

		int piece_size = j.storage->info()->piece_size(j.piece);
		int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;

		p.piece = j.piece;
		p.storage = j.storage;
		p.last_use = time_now();
		p.num_blocks = 1;
		p.blocks.reset(new char*[blocks_in_piece]);
		std::memset(&p.blocks[0], 0, blocks_in_piece * sizeof(char*));
		int block = j.offset / m_block_size;
//		std::cerr << " adding cache entry for p: " << j.piece << " block: " << block << " cached_blocks: " << m_cache_stats.cache_size << std::endl;
		p.blocks[block] = j.buffer;
		++m_cache_stats.cache_size;
		m_pieces.push_back(p);
	}

	// fills a piece with data from disk, returns the total number of bytes
	// read or -1 if there was an error
	int disk_io_thread::read_into_piece(cached_piece_entry& p, int start_block, mutex_t::scoped_lock& l)
	{
		int piece_size = p.storage->info()->piece_size(p.piece);
		int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;

		int end_block = start_block;
		for (int i = start_block; i < blocks_in_piece
			&& m_cache_stats.cache_size < m_cache_size; ++i)
		{
			// this is a block that is already allocated
			// stop allocating and don't read more than
			// what we've allocated now
			if (p.blocks[i]) break;
			p.blocks[i] = allocate_buffer();

			// the allocation failed, break
			if (p.blocks[i] == 0) break;
			++p.num_blocks;
			++m_cache_stats.cache_size;
			++m_cache_stats.read_cache_size;
			++end_block;
		}

		if (end_block == start_block) return -2;

		int buffer_size = piece_size - (end_block - 1) * m_block_size + (end_block - start_block - 1) * m_block_size;
		TORRENT_ASSERT(buffer_size <= piece_size);
		TORRENT_ASSERT(buffer_size + start_block * m_block_size <= piece_size);
		boost::scoped_array<char> buf;
		if (m_coalesce_reads) buf.reset(new (std::nothrow) char[buffer_size]);
		int ret = 0;
		if (buf)
		{
			l.unlock();
			ret += p.storage->read_impl(buf.get(), p.piece, start_block * m_block_size, buffer_size);
			l.lock();
			if (p.storage->error()) { return -1; }
			++m_cache_stats.reads;
		}

		int piece_offset = start_block * m_block_size;
		int offset = 0;
		for (int i = start_block; i < end_block; ++i)
		{
			int block_size = (std::min)(piece_size - piece_offset, m_block_size);
			if (p.blocks[i] == 0) break;
			TORRENT_ASSERT(offset <= buffer_size);
			TORRENT_ASSERT(piece_offset <= piece_size);
			if (buf)
			{
				std::memcpy(p.blocks[i], buf.get() + offset, block_size);
			}
			else
			{
				l.unlock();
				ret += p.storage->read_impl(p.blocks[i], p.piece, piece_offset, block_size);
				l.lock();
				if (p.storage->error()) { return -1; }
				++m_cache_stats.reads;
			}
			offset += m_block_size;
			piece_offset += m_block_size;
		}
		TORRENT_ASSERT(ret <= buffer_size);
		return (ret != buffer_size) ? -1 : ret;
	}

	bool disk_io_thread::make_room(int num_blocks
		, cache_t::iterator ignore
		, mutex_t::scoped_lock& l)
	{
		if (m_cache_size - m_cache_stats.cache_size < num_blocks)
		{
			// there's not enough room in the cache, clear a piece
			// from the read cache
			if (!clear_oldest_read_piece(ignore, l)) return false;
		}

		return m_cache_size - m_cache_stats.cache_size >= num_blocks;
	}

	// returns -1 on read error, -2 if there isn't any space in the cache
	// or the number of bytes read
	int disk_io_thread::cache_read_block(disk_io_job const& j, mutex_t::scoped_lock& l)
	{
		INVARIANT_CHECK;

		int piece_size = j.storage->info()->piece_size(j.piece);
		int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;

		int start_block = j.offset / m_block_size;

		if (!make_room(blocks_in_piece - start_block
			, m_read_pieces.end(), l)) return -2;

		cached_piece_entry p;
		p.piece = j.piece;
		p.storage = j.storage;
		p.last_use = time_now();
		p.num_blocks = 0;
		p.blocks.reset(new char*[blocks_in_piece]);
		std::memset(&p.blocks[0], 0, blocks_in_piece * sizeof(char*));
		int ret = read_into_piece(p, start_block, l);

		if (ret == -1)
			free_piece(p, l);
		else
			m_read_pieces.push_back(p);

		return ret;
	}

#ifndef NDEBUG
	void disk_io_thread::check_invariant() const
	{
		int cached_write_blocks = 0;
		for (cache_t::const_iterator i = m_pieces.begin()
			, end(m_pieces.end()); i != end; ++i)
		{
			cached_piece_entry const& p = *i;
			TORRENT_ASSERT(p.blocks);

			if (!p.storage) continue;
			int piece_size = p.storage->info()->piece_size(p.piece);
			int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
			int blocks = 0;
			for (int k = 0; k < blocks_in_piece; ++k)
			{
				if (p.blocks[k])
				{
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
					TORRENT_ASSERT(is_disk_buffer(p.blocks[k]));
#endif
					++blocks;
				}
			}
//			TORRENT_ASSERT(blocks == p.num_blocks);
			cached_write_blocks += blocks;
		}

		int cached_read_blocks = 0;
		for (cache_t::const_iterator i = m_read_pieces.begin()
			, end(m_read_pieces.end()); i != end; ++i)
		{
			cached_piece_entry const& p = *i;
			TORRENT_ASSERT(p.blocks);

			int piece_size = p.storage->info()->piece_size(p.piece);
			int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
			int blocks = 0;
			for (int k = 0; k < blocks_in_piece; ++k)
			{
				if (p.blocks[k])
				{
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
					TORRENT_ASSERT(is_disk_buffer(p.blocks[k]));
#endif
					++blocks;
				}
			}
//			TORRENT_ASSERT(blocks == p.num_blocks);
			cached_read_blocks += blocks;
		}

		TORRENT_ASSERT(cached_read_blocks + cached_write_blocks == m_cache_stats.cache_size);
		TORRENT_ASSERT(cached_read_blocks == m_cache_stats.read_cache_size);

		// when writing, there may be a one block difference, right before an old piece
		// is flushed
		TORRENT_ASSERT(m_cache_stats.cache_size <= m_cache_size + 1);
	}
#endif

	int disk_io_thread::try_read_from_cache(disk_io_job const& j)
	{
		TORRENT_ASSERT(j.buffer);

		mutex_t::scoped_lock l(m_piece_mutex);
		if (!m_use_read_cache) return -2;

		cache_t::iterator p
			= find_cached_piece(m_read_pieces, j, l);

		bool hit = true;
		int ret = 0;

		// if the piece cannot be found in the cache,
		// read the whole piece starting at the block
		// we got a request for.
		if (p == m_read_pieces.end())
		{
			ret = cache_read_block(j, l);
			hit = false;
			if (ret < 0) return ret;
			p = m_read_pieces.end();
			--p;
			TORRENT_ASSERT(!m_read_pieces.empty());
			TORRENT_ASSERT(p->piece == j.piece);
			TORRENT_ASSERT(p->storage == j.storage);
		}

		if (p != m_read_pieces.end())
		{
			// copy from the cache and update the last use timestamp
			int block = j.offset / m_block_size;
			int block_offset = j.offset % m_block_size;
			int buffer_offset = 0;
			int size = j.buffer_size;
			if (p->blocks[block] == 0)
			{
				int piece_size = j.storage->info()->piece_size(j.piece);
				int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
				int end_block = block;
				while (end_block < blocks_in_piece && p->blocks[end_block] == 0) ++end_block;
				if (!make_room(end_block - block, p, l)) return -2;
				ret = read_into_piece(*p, block, l);
				hit = false;
				if (ret < 0) return ret;
				TORRENT_ASSERT(p->blocks[block]);
			}

			p->last_use = time_now();
			while (size > 0)
			{
				TORRENT_ASSERT(p->blocks[block]);
				int to_copy = (std::min)(m_block_size
					- block_offset, size);
				std::memcpy(j.buffer + buffer_offset
					, p->blocks[block] + block_offset
					, to_copy);
				size -= to_copy;
				block_offset = 0;
				buffer_offset += to_copy;
				++block;
			}
			ret = j.buffer_size;
			++m_cache_stats.blocks_read;
			if (hit) ++m_cache_stats.blocks_read_hit;
		}
		return ret;
	}
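
	// Return value convention used by try_read_from_cache() and
	// cache_read_block(), summarized here for clarity (this note is not part
	// of the original source):
	//   >= 0  number of bytes copied into j.buffer
	//   -1    a disk read error occurred
	//   -2    the read cache is disabled or there is no room for the piece
	// For example, a 16 kiB request that hits an already cached piece
	// returns 16384 and bumps blocks_read_hit.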

	void disk_io_thread::add_job(disk_io_job const& j
		, boost::function<void(int, disk_io_job const&)> const& f)
	{
		TORRENT_ASSERT(!j.callback);
		TORRENT_ASSERT(j.storage);
		TORRENT_ASSERT(j.buffer_size <= m_block_size);
		mutex_t::scoped_lock l(m_queue_mutex);
#ifndef NDEBUG
		if (j.action == disk_io_job::write)
		{
			cache_t::iterator p
				= find_cached_piece(m_pieces, j, l);
			if (p != m_pieces.end())
			{
				int block = j.offset / m_block_size;
				char const* buffer = p->blocks[block];
				TORRENT_ASSERT(buffer == 0);
			}
		}
#endif

		std::list<disk_io_job>::reverse_iterator i = m_jobs.rbegin();
		if (j.action == disk_io_job::read)
		{
			// when we're reading, we may not skip
			// ahead of any write operation that overlaps
			// the region we're reading
			for (; i != m_jobs.rend(); ++i)
			{
				// if *i should come before j, stop
				// and insert j before i
				if (*i < j) break;
				// if we come across a write operation that
				// overlaps the region we're reading, we need
				// to stop
				if (i->action == disk_io_job::write
					&& i->storage == j.storage
					&& i->piece == j.piece
					&& range_overlap(i->offset, i->buffer_size
						, j.offset, j.buffer_size))
					break;
			}
		}
		else if (j.action == disk_io_job::write)
		{
			for (; i != m_jobs.rend(); ++i)
			{
				if (*i < j)
				{
					if (i != m_jobs.rbegin()
						&& i.base()->storage.get() != j.storage.get())
						i = m_jobs.rbegin();
					break;
				}
			}
		}

		// if we are placed in front of all other jobs, put it on the back of
		// the queue, to sweep the disk in the same direction, and to avoid
		// starvation. The exception is if the priority is higher than the
		// job at the front of the queue
		if (i == m_jobs.rend() && (m_jobs.empty() || j.priority <= m_jobs.back().priority))
			i = m_jobs.rbegin();

		std::list<disk_io_job>::iterator k = m_jobs.insert(i.base(), j);
		k->callback.swap(const_cast<boost::function<void(int, disk_io_job const&)>&>(f));
		if (j.action == disk_io_job::write)
			m_queue_buffer_size += j.buffer_size;
		TORRENT_ASSERT(j.storage.get());
		m_signal.notify_all();
	}
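
	// Usage sketch (illustrative only; the caller-side variable names are
	// hypothetical). A caller fills in a disk_io_job and passes a completion
	// handler, which the disk thread posts back on the io_service once the
	// job has run:
	//
	//   disk_io_job j;
	//   j.action = disk_io_job::read;
	//   j.storage = pm;              // boost::intrusive_ptr<piece_manager>
	//   j.piece = 3;
	//   j.offset = 0;
	//   j.buffer_size = 16 * 1024;
	//   dio.add_job(j, bind(&on_read_done, _1, _2));
	//
	// on_read_done(int ret, disk_io_job const& j) then receives the number
	// of bytes read (or a negative error code) together with a copy of the
	// job.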

#ifndef NDEBUG
	bool disk_io_thread::is_disk_buffer(char* buffer) const
	{
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
		return true;
#else
		mutex_t::scoped_lock l(m_pool_mutex);
		return m_pool.is_from(buffer);
#endif
	}
#endif

	char* disk_io_thread::allocate_buffer()
	{
		mutex_t::scoped_lock l(m_pool_mutex);
#ifdef TORRENT_STATS
		++m_allocations;
#endif
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
		return (char*)malloc(m_block_size);
#else
		return (char*)m_pool.ordered_malloc();
#endif
	}

	void disk_io_thread::free_buffer(char* buf)
	{
		mutex_t::scoped_lock l(m_pool_mutex);
#ifdef TORRENT_STATS
		--m_allocations;
#endif
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
		free(buf);
#else
		m_pool.ordered_free(buf);
#endif
	}
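
	// Illustrative pairing (not part of the original source): every buffer
	// returned by allocate_buffer() must end up in free_buffer(), either
	// directly or by letting a disk_buffer_holder go out of scope:
	//
	//   char* buf = dio.allocate_buffer();
	//   disk_buffer_holder guard(dio, buf); // frees buf unless released
	//
	// The job loop in operator()() below uses exactly this holder pattern to
	// avoid leaking buffers on early exits.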

	bool disk_io_thread::test_error(disk_io_job& j)
	{
		error_code const& ec = j.storage->error();
		if (ec)
		{
			j.str = ec.message();
			j.error = ec;
			j.error_file = j.storage->error_file();
			j.storage->clear_error();
#ifndef NDEBUG
			std::cout << "ERROR: '" << j.str << "' " << j.error_file << std::endl;
#endif
			return true;
		}
		return false;
	}

	void disk_io_thread::operator()()
	{
		for (;;)
		{
#ifdef TORRENT_DISK_STATS
			m_log << log_time() << " idle" << std::endl;
#endif
			mutex_t::scoped_lock jl(m_queue_mutex);

			while (m_jobs.empty() && !m_abort)
				m_signal.wait(jl);
			if (m_abort && m_jobs.empty())
			{
				jl.unlock();

				mutex_t::scoped_lock l(m_piece_mutex);
				// flush all disk caches
				for (cache_t::iterator i = m_pieces.begin()
					, end(m_pieces.end()); i != end; ++i)
					flush(i, l);
				for (cache_t::iterator i = m_read_pieces.begin()
					, end(m_read_pieces.end()); i != end; ++i)
					free_piece(*i, l);
				m_pieces.clear();
				m_read_pieces.clear();
				return;
			}

			// if there's a buffer in this job, it will be freed
			// when this holder is destructed, unless it has been
			// released.
			disk_buffer_holder holder(*this
				, m_jobs.front().action != disk_io_job::check_fastresume
				? m_jobs.front().buffer : 0);

			boost::function<void(int, disk_io_job const&)> handler;
			handler.swap(m_jobs.front().callback);

			disk_io_job j = m_jobs.front();
			m_jobs.pop_front();
			m_queue_buffer_size -= j.buffer_size;
			jl.unlock();

			flush_expired_pieces();

			int ret = 0;

			TORRENT_ASSERT(j.storage || j.action == disk_io_job::abort_thread);
#ifdef TORRENT_DISK_STATS
			ptime start = time_now();
#endif
#ifndef BOOST_NO_EXCEPTIONS
			try {
#endif

			switch (j.action)
			{
				case disk_io_job::abort_thread:
				{
					mutex_t::scoped_lock jl(m_queue_mutex);
					m_abort = true;

					for (std::list<disk_io_job>::iterator i = m_jobs.begin();
						i != m_jobs.end();)
					{
						if (i->action == disk_io_job::read)
						{
							if (i->callback) m_ios.post(bind(i->callback, -1, *i));
							m_jobs.erase(i++);
							continue;
						}
						if (i->action == disk_io_job::check_files)
						{
							if (i->callback) m_ios.post(bind(i->callback
								, piece_manager::disk_check_aborted, *i));
							m_jobs.erase(i++);
							continue;
						}
						++i;
					}
					break;
				}
				case disk_io_job::read:
				{
					if (test_error(j))
					{
						ret = -1;
						break;
					}
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " read " << j.buffer_size << std::endl;
#endif
					INVARIANT_CHECK;
					TORRENT_ASSERT(j.buffer == 0);
					j.buffer = allocate_buffer();
					TORRENT_ASSERT(j.buffer_size <= m_block_size);
					if (j.buffer == 0)
					{
						ret = -1;
						j.error = error_code(ENOMEM, get_posix_category());
						j.str = j.error.message();
						break;
					}

					disk_buffer_holder read_holder(*this, j.buffer);
					ret = try_read_from_cache(j);

					// -2 means there's no space in the read cache
					// or that the read cache is disabled
					if (ret == -1)
					{
						j.buffer = 0;
						test_error(j);
						break;
					}
					else if (ret == -2)
					{
						ret = j.storage->read_impl(j.buffer, j.piece, j.offset
							, j.buffer_size);
						if (ret < 0)
						{
							test_error(j);
							break;
						}
						++m_cache_stats.blocks_read;
					}
					read_holder.release();
					break;
				}
				case disk_io_job::write:
				{
					if (test_error(j))
					{
						ret = -1;
						break;
					}
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " write " << j.buffer_size << std::endl;
#endif
					mutex_t::scoped_lock l(m_piece_mutex);
					INVARIANT_CHECK;
					cache_t::iterator p
						= find_cached_piece(m_pieces, j, l);
					int block = j.offset / m_block_size;
					TORRENT_ASSERT(j.buffer);
					TORRENT_ASSERT(j.buffer_size <= m_block_size);
					if (p != m_pieces.end())
					{
						TORRENT_ASSERT(p->blocks[block] == 0);
						if (p->blocks[block])
						{
							free_buffer(p->blocks[block]);
							--p->num_blocks;
						}
						p->blocks[block] = j.buffer;
						++m_cache_stats.cache_size;
						++p->num_blocks;
						p->last_use = time_now();
					}
					else
					{
						cache_block(j, l);
					}
					// we've now inserted the buffer
					// in the cache, we should not
					// free it at the end
					holder.release();
					if (m_cache_stats.cache_size >= m_cache_size)
						flush_oldest_piece(l);
					break;
				}
				case disk_io_job::hash:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " hash" << std::endl;
#endif
					mutex_t::scoped_lock l(m_piece_mutex);
					INVARIANT_CHECK;

					cache_t::iterator i
						= find_cached_piece(m_pieces, j, l);
					if (i != m_pieces.end())
					{
						flush_and_remove(i, l);
						if (test_error(j))
						{
							ret = -1;
							j.storage->mark_failed(j.piece);
							break;
						}
					}
					l.unlock();
					sha1_hash h = j.storage->hash_for_piece_impl(j.piece);
					if (test_error(j))
					{
						ret = -1;
						j.storage->mark_failed(j.piece);
						break;
					}
					ret = (j.storage->info()->hash_for_piece(j.piece) == h) ? 0 : -2;
					if (ret == -2) j.storage->mark_failed(j.piece);
					break;
				}
				case disk_io_job::move_storage:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " move" << std::endl;
#endif
					TORRENT_ASSERT(j.buffer == 0);
					ret = j.storage->move_storage_impl(j.str) ? 1 : 0;
					if (ret != 0)
					{
						test_error(j);
						break;
					}
					j.str = j.storage->save_path().string();
					break;
				}
				case disk_io_job::release_files:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " release" << std::endl;
#endif
					TORRENT_ASSERT(j.buffer == 0);

					mutex_t::scoped_lock l(m_piece_mutex);
					INVARIANT_CHECK;

					for (cache_t::iterator i = m_pieces.begin(); i != m_pieces.end();)
					{
						if (i->storage == j.storage)
						{
							flush(i, l);
							i = m_pieces.erase(i);
						}
						else
						{
							++i;
						}
					}
					l.unlock();
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
					{
						mutex_t::scoped_lock l(m_pool_mutex);
						m_pool.release_memory();
					}
#endif
					ret = j.storage->release_files_impl();
					if (ret != 0) test_error(j);
					break;
				}
				case disk_io_job::clear_read_cache:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " clear-cache" << std::endl;
#endif
					TORRENT_ASSERT(j.buffer == 0);

					mutex_t::scoped_lock l(m_piece_mutex);
					INVARIANT_CHECK;

					for (cache_t::iterator i = m_read_pieces.begin();
						i != m_read_pieces.end();)
					{
						if (i->storage == j.storage)
						{
							free_piece(*i, l);
							i = m_read_pieces.erase(i);
						}
						else
						{
							++i;
						}
					}
					l.unlock();
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
					{
						mutex_t::scoped_lock l(m_pool_mutex);
						m_pool.release_memory();
					}
#endif
					ret = 0;
					break;
				}
				case disk_io_job::delete_files:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " delete" << std::endl;
#endif
					TORRENT_ASSERT(j.buffer == 0);

					mutex_t::scoped_lock l(m_piece_mutex);
					INVARIANT_CHECK;

					cache_t::iterator i = std::remove_if(
						m_pieces.begin(), m_pieces.end(), bind(&cached_piece_entry::storage, _1) == j.storage);

					for (cache_t::iterator k = i; k != m_pieces.end(); ++k)
					{
						torrent_info const& ti = *k->storage->info();
						int blocks_in_piece = (ti.piece_size(k->piece) + m_block_size - 1) / m_block_size;
						for (int j = 0; j < blocks_in_piece; ++j)
						{
							if (k->blocks[j] == 0) continue;
							free_buffer(k->blocks[j]);
							k->blocks[j] = 0;
						}
					}
					m_pieces.erase(i, m_pieces.end());
					l.unlock();
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
					{
						mutex_t::scoped_lock l(m_pool_mutex);
						m_pool.release_memory();
					}
#endif
					ret = j.storage->delete_files_impl();
					if (ret != 0) test_error(j);
					break;
				}
				case disk_io_job::check_fastresume:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " check fastresume" << std::endl;
#endif
					lazy_entry const* rd = (lazy_entry const*)j.buffer;
					TORRENT_ASSERT(rd != 0);
					ret = j.storage->check_fastresume(*rd, j.str);
					break;
				}
				case disk_io_job::check_files:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " check files" << std::endl;
#endif
					int piece_size = j.storage->info()->piece_length();
					for (int processed = 0; processed < 4 * 1024 * 1024; processed += piece_size)
					{
						ret = j.storage->check_files(j.piece, j.offset, j.str);

#ifndef BOOST_NO_EXCEPTIONS
						try {
#endif
							TORRENT_ASSERT(handler);
							if (handler && ret == piece_manager::need_full_check)
								m_ios.post(bind(handler, ret, j));
#ifndef BOOST_NO_EXCEPTIONS
						} catch (std::exception&) {}
#endif
						if (ret != piece_manager::need_full_check) break;
					}
					if (test_error(j))
					{
						ret = piece_manager::fatal_disk_error;
						break;
					}
					TORRENT_ASSERT(ret != -2 || !j.str.empty());

					// if the check is not done, add it at the end of the job queue
					if (ret == piece_manager::need_full_check)
					{
						add_job(j, handler);
						continue;
					}
					break;
				}
				case disk_io_job::save_resume_data:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " save resume data" << std::endl;
#endif
					j.resume_data.reset(new entry(entry::dictionary_t));
					j.storage->write_resume_data(*j.resume_data);
					ret = 0;
					break;
				}
				case disk_io_job::rename_file:
				{
#ifdef TORRENT_DISK_STATS
					m_log << log_time() << " rename file" << std::endl;
#endif
					ret = j.storage->rename_file_impl(j.piece, j.str);
					break;
				}
			}
#ifndef BOOST_NO_EXCEPTIONS
			} catch (std::exception& e)
			{
				ret = -1;
				try
				{
					j.str = e.what();
				}
				catch (std::exception&) {}
			}
#endif

//			if (!handler) std::cerr << "DISK THREAD: no callback specified" << std::endl;
//			else std::cerr << "DISK THREAD: invoking callback" << std::endl;
#ifndef BOOST_NO_EXCEPTIONS
			try {
#endif
				TORRENT_ASSERT(ret != -2 || !j.str.empty()
					|| j.action == disk_io_job::hash);
				if (handler) m_ios.post(bind(handler, ret, j));
#ifndef BOOST_NO_EXCEPTIONS
			} catch (std::exception&)
			{
				TORRENT_ASSERT(false);
			}
#endif
		}
		TORRENT_ASSERT(false);
	}
}