3 Copyright (c) 2007, Arvid Norberg
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #include "libtorrent/storage.hpp"
35 #include "libtorrent/disk_io_thread.hpp"
36 #include "libtorrent/disk_buffer_holder.hpp"
37 #include <boost/scoped_array.hpp>
42 #define alloca(s) _alloca(s)
46 #ifdef TORRENT_DISK_STATS
47 #include "libtorrent/time.hpp"
// Constructor: initializes the job-queue and cache parameters and, per the
// visible initializer, starts the worker thread (m_disk_io_thread) running
// this object's operator()().
// NOTE(review): this fragment is whitespace-mangled and several original
// lines are missing (e.g. the initializer before m_queue_buffer_size and the
// pool-allocator members guarded by TORRENT_DISABLE_POOL_ALLOCATOR); the
// code below is kept byte-identical to the fragment.
53 disk_io_thread::disk_io_thread(asio::io_service
& ios
, int block_size
)
55 , m_queue_buffer_size(0)
56 , m_cache_size(512) // 512 * 16kB = 8MB
57 , m_cache_expiry(60) // 1 minute
58 , m_coalesce_writes(true)
59 , m_coalesce_reads(true)
60 , m_use_read_cache(true)
61 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
64 , m_block_size(block_size
)
66 , m_disk_io_thread(boost::ref(*this))
71 #ifdef TORRENT_DISK_STATS
// when disk statistics are enabled, open (truncating) the per-thread log
72 m_log
.open("disk_io_thread.log", std::ios::trunc
);
76 disk_io_thread::~disk_io_thread()
78 TORRENT_ASSERT(m_abort
== true);
81 void disk_io_thread::join()
83 mutex_t::scoped_lock
l(m_queue_mutex
);
85 j
.action
= disk_io_job::abort_thread
;
86 m_jobs
.insert(m_jobs
.begin(), j
);
87 m_signal
.notify_all();
90 m_disk_io_thread
.join();
93 void disk_io_thread::get_cache_info(sha1_hash
const& ih
, std::vector
<cached_piece_info
>& ret
) const
95 mutex_t::scoped_lock
l(m_piece_mutex
);
97 ret
.reserve(m_pieces
.size());
98 for (cache_t::const_iterator i
= m_pieces
.begin()
99 , end(m_pieces
.end()); i
!= end
; ++i
)
101 torrent_info
const& ti
= *i
->storage
->info();
102 if (ti
.info_hash() != ih
) continue;
103 cached_piece_info info
;
104 info
.piece
= i
->piece
;
105 info
.last_use
= i
->last_use
;
106 info
.kind
= cached_piece_info::write_cache
;
107 int blocks_in_piece
= (ti
.piece_size(i
->piece
) + (m_block_size
) - 1) / m_block_size
;
108 info
.blocks
.resize(blocks_in_piece
);
109 for (int b
= 0; b
< blocks_in_piece
; ++b
)
110 if (i
->blocks
[b
]) info
.blocks
[b
] = true;
113 for (cache_t::const_iterator i
= m_read_pieces
.begin()
114 , end(m_read_pieces
.end()); i
!= end
; ++i
)
116 torrent_info
const& ti
= *i
->storage
->info();
117 if (ti
.info_hash() != ih
) continue;
118 cached_piece_info info
;
119 info
.piece
= i
->piece
;
120 info
.last_use
= i
->last_use
;
121 info
.kind
= cached_piece_info::read_cache
;
122 int blocks_in_piece
= (ti
.piece_size(i
->piece
) + (m_block_size
) - 1) / m_block_size
;
123 info
.blocks
.resize(blocks_in_piece
);
124 for (int b
= 0; b
< blocks_in_piece
; ++b
)
125 if (i
->blocks
[b
]) info
.blocks
[b
] = true;
130 cache_status
disk_io_thread::status() const
132 mutex_t::scoped_lock
l(m_piece_mutex
);
133 return m_cache_stats
;
136 void disk_io_thread::set_cache_size(int s
)
138 mutex_t::scoped_lock
l(m_piece_mutex
);
139 TORRENT_ASSERT(s
>= 0);
143 void disk_io_thread::set_cache_expiry(int ex
)
145 mutex_t::scoped_lock
l(m_piece_mutex
);
146 TORRENT_ASSERT(ex
> 0);
// Aborts queued read (and check_files) jobs for the given storage by posting
// their callbacks with an abort result; other job types stay queued so
// writes/moves still complete. Finally wakes the disk thread.
// NOTE(review): the loop's termination condition and the job-erase logic are
// missing from this mangled fragment; code kept byte-identical.
150 // aborts read operations
151 void disk_io_thread::stop(boost::intrusive_ptr
<piece_manager
> s
)
153 mutex_t::scoped_lock
l(m_queue_mutex
);
154 // read jobs are aborted, write and move jobs are synchronized
155 for (std::list
<disk_io_job
>::iterator i
= m_jobs
.begin();
// aborted reads complete through their callback with -1
163 if (i
->action
== disk_io_job::read
)
165 if (i
->callback
) m_ios
.post(bind(i
->callback
, -1, *i
));
// pending file checks report disk_check_aborted to their callback
169 if (i
->action
== disk_io_job::check_files
)
171 if (i
->callback
) m_ios
.post(bind(i
->callback
172 , piece_manager::disk_check_aborted
, *i
));
// wake the disk thread so it observes the updated queue
178 m_signal
.notify_all();
// Returns true if the half-open intervals [start1, start1 + length1) and
// [start2, start2 + length2) intersect.
bool range_overlap(int start1, int length1, int start2, int length2)
{
	bool const first_reaches_second
		= start1 <= start2 && start1 + length1 > start2;
	bool const second_reaches_first
		= start2 <= start1 && start2 + length2 > start1;
	return first_reaches_second || second_reaches_first;
}
189 // The semantic of this operator is:
190 // should lhs come before rhs in the job queue
191 bool operator<(disk_io_job
const& lhs
, disk_io_job
const& rhs
)
193 // NOTE: comparison inverted to make higher priority
194 // skip _in_front_of_ lower priority
195 if (lhs
.priority
> rhs
.priority
) return true;
196 if (lhs
.priority
< rhs
.priority
) return false;
198 if (lhs
.storage
.get() < rhs
.storage
.get()) return true;
199 if (lhs
.storage
.get() > rhs
.storage
.get()) return false;
200 if (lhs
.piece
< rhs
.piece
) return true;
201 if (lhs
.piece
> rhs
.piece
) return false;
202 if (lhs
.offset
< rhs
.offset
) return true;
203 // if (lhs.offset > rhs.offset) return false;
208 disk_io_thread::cache_t::iterator
disk_io_thread::find_cached_piece(
209 disk_io_thread::cache_t
& cache
210 , disk_io_job
const& j
, mutex_t::scoped_lock
& l
)
212 for (cache_t::iterator i
= cache
.begin()
213 , end(cache
.end()); i
!= end
; ++i
)
215 if (i
->storage
!= j
.storage
|| i
->piece
!= j
.piece
) continue;
221 void disk_io_thread::flush_expired_pieces()
223 ptime now
= time_now();
225 mutex_t::scoped_lock
l(m_piece_mutex
);
230 cache_t::iterator i
= std::min_element(
231 m_pieces
.begin(), m_pieces
.end()
232 , bind(&cached_piece_entry::last_use
, _1
)
233 < bind(&cached_piece_entry::last_use
, _2
));
234 if (i
== m_pieces
.end()) return;
235 int age
= total_seconds(now
- i
->last_use
);
236 if (age
< m_cache_expiry
) return;
237 flush_and_remove(i
, l
);
241 void disk_io_thread::free_piece(cached_piece_entry
& p
, mutex_t::scoped_lock
& l
)
243 int piece_size
= p
.storage
->info()->piece_size(p
.piece
);
244 int blocks_in_piece
= (piece_size
+ m_block_size
- 1) / m_block_size
;
246 for (int i
= 0; i
< blocks_in_piece
; ++i
)
248 if (p
.blocks
[i
] == 0) continue;
249 free_buffer(p
.blocks
[i
]);
252 --m_cache_stats
.cache_size
;
253 --m_cache_stats
.read_cache_size
;
257 bool disk_io_thread::clear_oldest_read_piece(
258 cache_t::iterator ignore
259 , mutex_t::scoped_lock
& l
)
263 cache_t::iterator i
= std::min_element(
264 m_read_pieces
.begin(), m_read_pieces
.end()
265 , bind(&cached_piece_entry::last_use
, _1
)
266 < bind(&cached_piece_entry::last_use
, _2
));
267 if (i
!= m_read_pieces
.end() && i
!= ignore
)
269 // don't replace an entry that is less than one second old
270 if (time_now() - i
->last_use
< seconds(1)) return false;
272 m_read_pieces
.erase(i
);
278 void disk_io_thread::flush_oldest_piece(mutex_t::scoped_lock
& l
)
281 // first look if there are any read cache entries that can
283 if (clear_oldest_read_piece(m_read_pieces
.end(), l
)) return;
285 cache_t::iterator i
= std::min_element(
286 m_pieces
.begin(), m_pieces
.end()
287 , bind(&cached_piece_entry::last_use
, _1
)
288 < bind(&cached_piece_entry::last_use
, _2
));
289 if (i
== m_pieces
.end()) return;
290 flush_and_remove(i
, l
);
293 void disk_io_thread::flush_and_remove(disk_io_thread::cache_t::iterator e
294 , mutex_t::scoped_lock
& l
)
// Writes all dirty blocks of cache entry `e` to disk. When
// m_coalesce_writes is set, adjacent blocks are first copied into one
// contiguous scratch buffer and written with a single write_impl call per
// run; otherwise each block is written individually. Each block buffer is
// freed once written. The piece mutex `l` is expected to be held.
// NOTE(review): this mangled fragment is missing lines (the declarations of
// `buffer_size`/`offset` and the if/else skeleton around the coalesced
// path); code kept byte-identical.
300 void disk_io_thread::flush(disk_io_thread::cache_t::iterator e
301 , mutex_t::scoped_lock
& l
)
304 cached_piece_entry
& p
= *e
;
305 int piece_size
= p
.storage
->info()->piece_size(p
.piece
);
306 #ifdef TORRENT_DISK_STATS
307 m_log
<< log_time() << " flushing " << piece_size
<< std::endl
;
309 TORRENT_ASSERT(piece_size
> 0);
// scratch buffer used to coalesce adjacent blocks into one write
310 boost::scoped_array
<char> buf
;
311 if (m_coalesce_writes
) buf
.reset(new (std::nothrow
) char[piece_size
]);
313 int blocks_in_piece
= (piece_size
+ m_block_size
- 1) / m_block_size
;
// iterates one past the last block so a trailing run is flushed too
316 for (int i
= 0; i
<= blocks_in_piece
; ++i
)
// a gap (or the end of the piece) terminates the current contiguous run
318 if (i
== blocks_in_piece
|| p
.blocks
[i
] == 0)
320 if (buffer_size
== 0) continue;
323 TORRENT_ASSERT(buffer_size
<= i
* m_block_size
);
// write the coalesced run that ends just before block i
325 p
.storage
->write_impl(buf
.get(), p
.piece
, (std::min
)(
326 i
* m_block_size
, piece_size
) - buffer_size
, buffer_size
);
328 ++m_cache_stats
.writes
;
329 // std::cerr << " flushing p: " << p.piece << " bytes: " << buffer_size << std::endl;
// the final block of the piece may be shorter than m_block_size
334 int block_size
= (std::min
)(piece_size
- i
* m_block_size
, m_block_size
);
335 TORRENT_ASSERT(offset
+ block_size
<= piece_size
);
336 TORRENT_ASSERT(offset
+ block_size
> 0);
// non-coalescing path: write this single block directly
340 p
.storage
->write_impl(p
.blocks
[i
], p
.piece
, i
* m_block_size
, block_size
);
342 ++m_cache_stats
.writes
;
// coalescing path: append the block to the scratch buffer instead
346 std::memcpy(buf
.get() + offset
, p
.blocks
[i
], block_size
);
347 offset
+= m_block_size
;
348 buffer_size
+= block_size
;
// the block has been written (or copied); release its buffer
350 free_buffer(p
.blocks
[i
]);
352 TORRENT_ASSERT(p
.num_blocks
> 0);
354 ++m_cache_stats
.blocks_written
;
355 --m_cache_stats
.cache_size
;
357 TORRENT_ASSERT(buffer_size
== 0);
358 // std::cerr << " flushing p: " << p.piece << " cached_blocks: " << m_cache_stats.cache_size << std::endl;
// every block buffer must have been released by now
360 for (int i
= 0; i
< blocks_in_piece
; ++i
)
361 TORRENT_ASSERT(p
.blocks
[i
] == 0);
// Creates a new write-cache entry for job `j`'s piece and takes ownership of
// the job's buffer as the cached block at j.offset. The caller must have
// verified (see the assert) that no entry for this piece exists yet. The
// piece mutex `l` is expected to be held.
// NOTE(review): this mangled fragment is missing lines (e.g. the assignment
// of p.piece from j.piece before p.storage); code kept byte-identical.
365 void disk_io_thread::cache_block(disk_io_job
& j
, mutex_t::scoped_lock
& l
)
368 TORRENT_ASSERT(find_cached_piece(m_pieces
, j
, l
) == m_pieces
.end());
369 cached_piece_entry p
;
371 int piece_size
= j
.storage
->info()->piece_size(j
.piece
);
372 int blocks_in_piece
= (piece_size
+ m_block_size
- 1) / m_block_size
;
375 p
.storage
= j
.storage
;
376 p
.last_use
= time_now();
// one slot per block; unused slots are null
378 p
.blocks
.reset(new char*[blocks_in_piece
]);
379 std::memset(&p
.blocks
[0], 0, blocks_in_piece
* sizeof(char*));
380 int block
= j
.offset
/ m_block_size
;
381 // std::cerr << " adding cache entry for p: " << j.piece << " block: " << block << " cached_blocks: " << m_cache_stats.cache_size << std::endl;
// the cache takes ownership of the job's buffer
382 p
.blocks
[block
] = j
.buffer
;
383 ++m_cache_stats
.cache_size
;
384 m_pieces
.push_back(p
);
387 // fills a piece with data from disk, returns the total number of bytes
388 // read or -1 if there was an error
389 int disk_io_thread::read_into_piece(cached_piece_entry
& p
, int start_block
, mutex_t::scoped_lock
& l
)
391 int piece_size
= p
.storage
->info()->piece_size(p
.piece
);
392 int blocks_in_piece
= (piece_size
+ m_block_size
- 1) / m_block_size
;
394 int end_block
= start_block
;
395 for (int i
= start_block
; i
< blocks_in_piece
396 && m_cache_stats
.cache_size
< m_cache_size
; ++i
)
398 // this is a block that is already allocated
399 // stop allocating and don't read more than
400 // what we've allocated now
401 if (p
.blocks
[i
]) break;
402 p
.blocks
[i
] = allocate_buffer();
404 // the allocation failed, break
405 if (p
.blocks
[i
] == 0) break;
407 ++m_cache_stats
.cache_size
;
408 ++m_cache_stats
.read_cache_size
;
412 if (end_block
== start_block
) return -2;
414 int buffer_size
= piece_size
- (end_block
- 1) * m_block_size
+ (end_block
- start_block
- 1) * m_block_size
;
415 TORRENT_ASSERT(buffer_size
<= piece_size
);
416 TORRENT_ASSERT(buffer_size
+ start_block
* m_block_size
<= piece_size
);
417 boost::scoped_array
<char> buf
;
418 if (m_coalesce_reads
) buf
.reset(new (std::nothrow
) char[buffer_size
]);
423 ret
+= p
.storage
->read_impl(buf
.get(), p
.piece
, start_block
* m_block_size
, buffer_size
);
425 if (p
.storage
->error()) { return -1; }
426 ++m_cache_stats
.reads
;
429 int piece_offset
= start_block
* m_block_size
;
431 for (int i
= start_block
; i
< end_block
; ++i
)
433 int block_size
= (std::min
)(piece_size
- piece_offset
, m_block_size
);
434 if (p
.blocks
[i
] == 0) break;
435 TORRENT_ASSERT(offset
<= buffer_size
);
436 TORRENT_ASSERT(piece_offset
<= piece_size
);
439 std::memcpy(p
.blocks
[i
], buf
.get() + offset
, block_size
);
444 ret
+= p
.storage
->read_impl(p
.blocks
[i
], p
.piece
, piece_offset
, block_size
);
445 if (!p
.storage
->error()) { return -1; }
447 ++m_cache_stats
.reads
;
449 offset
+= m_block_size
;
450 piece_offset
+= m_block_size
;
452 TORRENT_ASSERT(ret
<= buffer_size
);
453 return (ret
!= buffer_size
) ? -1 : ret
;
456 bool disk_io_thread::make_room(int num_blocks
457 , cache_t::iterator ignore
458 , mutex_t::scoped_lock
& l
)
460 if (m_cache_size
- m_cache_stats
.cache_size
< num_blocks
)
462 // there's not enough room in the cache, clear a piece
463 // from the read cache
464 if (!clear_oldest_read_piece(ignore
, l
)) return false;
467 return m_cache_size
- m_cache_stats
.cache_size
>= num_blocks
;
// Creates a new read-cache entry for job `j` and fills it from disk starting
// at the block containing j.offset (see read_into_piece).
// NOTE(review): this mangled fragment is missing lines (e.g. the assignment
// of p.piece and the error handling between read_into_piece and push_back);
// code kept byte-identical.
470 // returns -1 on read error, -2 if there isn't any space in the cache
471 // or the number of bytes read
472 int disk_io_thread::cache_read_block(disk_io_job
const& j
, mutex_t::scoped_lock
& l
)
476 int piece_size
= j
.storage
->info()->piece_size(j
.piece
);
477 int blocks_in_piece
= (piece_size
+ m_block_size
- 1) / m_block_size
;
479 int start_block
= j
.offset
/ m_block_size
;
// make sure the whole tail of the piece can be cached, or give up
481 if (!make_room(blocks_in_piece
- start_block
482 , m_read_pieces
.end(), l
)) return -2;
484 cached_piece_entry p
;
486 p
.storage
= j
.storage
;
487 p
.last_use
= time_now();
// one slot per block; unused slots are null
489 p
.blocks
.reset(new char*[blocks_in_piece
]);
490 std::memset(&p
.blocks
[0], 0, blocks_in_piece
* sizeof(char*));
491 int ret
= read_into_piece(p
, start_block
, l
);
496 m_read_pieces
.push_back(p
);
// Debug invariant: counts the blocks held by the write cache and the read
// cache and checks them against m_cache_stats (cache_size, read_cache_size),
// also verifying every non-null block pointer came from the buffer pool.
// NOTE(review): this mangled fragment is missing lines (e.g. the per-piece
// `blocks` counter declaration/increment); code kept byte-identical.
502 void disk_io_thread::check_invariant() const
504 int cached_write_blocks
= 0;
505 for (cache_t::const_iterator i
= m_pieces
.begin()
506 , end(m_pieces
.end()); i
!= end
; ++i
)
508 cached_piece_entry
const& p
= *i
;
509 TORRENT_ASSERT(p
.blocks
);
// entries can have their storage cleared; skip those
511 if (!p
.storage
) continue;
512 int piece_size
= p
.storage
->info()->piece_size(p
.piece
);
513 int blocks_in_piece
= (piece_size
+ m_block_size
- 1) / m_block_size
;
515 for (int k
= 0; k
< blocks_in_piece
; ++k
)
519 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
// every cached block must come from the disk buffer pool
520 TORRENT_ASSERT(is_disk_buffer(p
.blocks
[k
]));
525 // TORRENT_ASSERT(blocks == p.num_blocks);
526 cached_write_blocks
+= blocks
;
529 int cached_read_blocks
= 0;
530 for (cache_t::const_iterator i
= m_read_pieces
.begin()
531 , end(m_read_pieces
.end()); i
!= end
; ++i
)
533 cached_piece_entry
const& p
= *i
;
534 TORRENT_ASSERT(p
.blocks
);
536 int piece_size
= p
.storage
->info()->piece_size(p
.piece
);
537 int blocks_in_piece
= (piece_size
+ m_block_size
- 1) / m_block_size
;
539 for (int k
= 0; k
< blocks_in_piece
; ++k
)
543 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
544 TORRENT_ASSERT(is_disk_buffer(p
.blocks
[k
]));
549 // TORRENT_ASSERT(blocks == p.num_blocks);
550 cached_read_blocks
+= blocks
;
// the counted blocks must match the bookkeeping in m_cache_stats
553 TORRENT_ASSERT(cached_read_blocks
+ cached_write_blocks
== m_cache_stats
.cache_size
);
554 TORRENT_ASSERT(cached_read_blocks
== m_cache_stats
.read_cache_size
);
556 // when writing, there may be a one block difference, right before an old piece
558 TORRENT_ASSERT(m_cache_stats
.cache_size
<= m_cache_size
+ 1);
// Services a read job from the read cache, populating the cache from disk as
// needed, and copies the requested range into j.buffer. Returns -2 when the
// read cache is disabled or full, a negative read_into_piece error, or the
// number of bytes served.
// NOTE(review): this mangled fragment is missing lines (the declarations of
// `ret`/`hit`, the iterator declaration before find_cached_piece, and the
// copy loop's skeleton); code kept byte-identical.
562 int disk_io_thread::try_read_from_cache(disk_io_job
const& j
)
564 TORRENT_ASSERT(j
.buffer
);
566 mutex_t::scoped_lock
l(m_piece_mutex
);
567 if (!m_use_read_cache
) return -2;
570 = find_cached_piece(m_read_pieces
, j
, l
);
575 // if the piece cannot be found in the cache,
576 // read the whole piece starting at the block
577 // we got a request for.
578 if (p
== m_read_pieces
.end())
580 ret
= cache_read_block(j
, l
);
582 if (ret
< 0) return ret
;
// the freshly inserted entry is the last one in the list
583 p
= m_read_pieces
.end();
585 TORRENT_ASSERT(!m_read_pieces
.empty());
586 TORRENT_ASSERT(p
->piece
== j
.piece
);
587 TORRENT_ASSERT(p
->storage
== j
.storage
);
590 if (p
!= m_read_pieces
.end())
592 // copy from the cache and update the last use timestamp
593 int block
= j
.offset
/ m_block_size
;
594 int block_offset
= j
.offset
% m_block_size
;
595 int buffer_offset
= 0;
596 int size
= j
.buffer_size
;
// the requested block was evicted/not read -- refill from disk
597 if (p
->blocks
[block
] == 0)
599 int piece_size
= j
.storage
->info()->piece_size(j
.piece
);
600 int blocks_in_piece
= (piece_size
+ m_block_size
- 1) / m_block_size
;
601 int end_block
= block
;
// count the run of missing blocks we need room for
602 while (end_block
< blocks_in_piece
&& p
->blocks
[end_block
] == 0) ++end_block
;
603 if (!make_room(end_block
- block
, p
, l
)) return -2;
604 ret
= read_into_piece(*p
, block
, l
);
606 if (ret
< 0) return ret
;
607 TORRENT_ASSERT(p
->blocks
[block
]);
610 p
->last_use
= time_now();
613 TORRENT_ASSERT(p
->blocks
[block
]);
// copy at most one block at a time (requests may straddle blocks)
614 int to_copy
= (std::min
)(m_block_size
615 - block_offset
, size
);
616 std::memcpy(j
.buffer
+ buffer_offset
617 , p
->blocks
[block
] + block_offset
621 buffer_offset
+= to_copy
;
// update statistics; `hit` distinguishes pure cache hits
625 ++m_cache_stats
.blocks_read
;
626 if (hit
) ++m_cache_stats
.blocks_read_hit
;
// Enqueues job `j` with completion callback `f`. The job is inserted using
// elevator ordering (operator< on disk_io_job): scanning the queue from the
// back, a read may not pass a write that overlaps the range it reads, and
// placement at the very front is demoted to the back to sweep the disk in
// one direction and avoid starvation. Wakes the disk thread when done.
// NOTE(review): this mangled fragment is missing lines (the loop-break
// conditions and insert-position adjustments); code kept byte-identical.
631 void disk_io_thread::add_job(disk_io_job
const& j
632 , boost::function
<void(int, disk_io_job
const&)> const& f
)
// jobs are handed in without a callback; it is attached after insertion
634 TORRENT_ASSERT(!j
.callback
);
635 TORRENT_ASSERT(j
.storage
);
636 TORRENT_ASSERT(j
.buffer_size
<= m_block_size
);
637 mutex_t::scoped_lock
l(m_queue_mutex
);
// scan from the back of the queue for the insertion point
639 std::list
<disk_io_job
>::reverse_iterator i
= m_jobs
.rbegin();
640 if (j
.action
== disk_io_job::read
)
642 // when we're reading, we may not skip
643 // ahead of any write operation that overlaps
644 // the region we're reading
645 for (; i
!= m_jobs
.rend(); i
++)
647 // if *i should come before j, stop
648 // and insert j before i
650 // if we come across a write operation that
651 // overlaps the region we're reading, we need
653 if (i
->action
== disk_io_job::write
654 && i
->storage
== j
.storage
655 && i
->piece
== j
.piece
656 && range_overlap(i
->offset
, i
->buffer_size
657 , j
.offset
, j
.buffer_size
))
661 else if (j
.action
== disk_io_job::write
)
663 for (; i
!= m_jobs
.rend(); ++i
)
667 if (i
!= m_jobs
.rbegin()
668 && i
.base()->storage
.get() != j
.storage
.get())
675 // if we are placed in front of all other jobs, put it on the back of
676 // the queue, to sweep the disk in the same direction, and to avoid
677 // starvation. The exception is if the priority is higher than the
678 // job at the front of the queue
679 if (i
== m_jobs
.rend() && (m_jobs
.empty() || j
.priority
<= m_jobs
.back().priority
))
682 std::list
<disk_io_job
>::iterator k
= m_jobs
.insert(i
.base(), j
);
// move (not copy) the callback into the queued job
683 k
->callback
.swap(const_cast<boost::function
<void(int, disk_io_job
const&)>&>(f
));
// track the total number of buffered write bytes in the queue
684 if (j
.action
== disk_io_job::write
)
685 m_queue_buffer_size
+= j
.buffer_size
;
686 TORRENT_ASSERT(j
.storage
.get());
687 m_signal
.notify_all();
691 bool disk_io_thread::is_disk_buffer(char* buffer
) const
693 #ifdef TORRENT_DISABLE_POOL_ALLOCATOR
696 mutex_t::scoped_lock
l(m_pool_mutex
);
697 return m_pool
.is_from(buffer
);
// Allocates one m_block_size-sized disk buffer, either from the heap (pool
// allocator disabled) or from the ordered boost pool, under the pool mutex.
// NOTE(review): this mangled fragment is missing lines between the lock and
// the #ifdef (content unknown from here); code kept byte-identical.
702 char* disk_io_thread::allocate_buffer()
704 mutex_t::scoped_lock
l(m_pool_mutex
);
708 #ifdef TORRENT_DISABLE_POOL_ALLOCATOR
709 return (char*)malloc(m_block_size
);
711 return (char*)m_pool
.ordered_malloc();
// Returns a disk buffer previously obtained from allocate_buffer(), under
// the pool mutex.
// NOTE(review): this mangled fragment is missing lines, including the
// malloc-path release that should pair with allocate_buffer()'s malloc
// branch; code kept byte-identical.
715 void disk_io_thread::free_buffer(char* buf
)
717 mutex_t::scoped_lock
l(m_pool_mutex
);
721 #ifdef TORRENT_DISABLE_POOL_ALLOCATOR
724 m_pool
.ordered_free(buf
);
// Checks whether j.storage has a pending error; if so, copies the error
// message and file into the job, clears the storage's error state, and logs
// it to stdout.
// NOTE(review): this mangled fragment is missing lines (the `if (ec)` guard
// and the function's return statements); code kept byte-identical.
728 bool disk_io_thread::test_error(disk_io_job
& j
)
730 error_code
const& ec
= j
.storage
->error();
// propagate the error details into the job for the callback to see
733 j
.str
= ec
.message();
735 j
.error_file
= j
.storage
->error_file();
// reset the storage's error so subsequent jobs start clean
736 j
.storage
->clear_error();
738 std::cout
<< "ERROR: '" << j
.str
<< "' " << j
.error_file
<< std::endl
;
745 void disk_io_thread::operator()()
749 #ifdef TORRENT_DISK_STATS
750 m_log
<< log_time() << " idle" << std::endl
;
752 mutex_t::scoped_lock
jl(m_queue_mutex
);
754 while (m_jobs
.empty() && !m_abort
)
756 if (m_abort
&& m_jobs
.empty())
760 mutex_t::scoped_lock
l(m_piece_mutex
);
761 // flush all disk caches
762 for (cache_t::iterator i
= m_pieces
.begin()
763 , end(m_pieces
.end()); i
!= end
; ++i
)
765 for (cache_t::iterator i
= m_read_pieces
.begin()
766 , end(m_read_pieces
.end()); i
!= end
; ++i
)
769 m_read_pieces
.clear();
773 // if there's a buffer in this job, it will be freed
774 // when this holder is destructed, unless it has been
776 disk_buffer_holder
holder(*this
777 , m_jobs
.front().action
!= disk_io_job::check_fastresume
778 ? m_jobs
.front().buffer
: 0);
780 boost::function
<void(int, disk_io_job
const&)> handler
;
781 handler
.swap(m_jobs
.front().callback
);
783 disk_io_job j
= m_jobs
.front();
785 m_queue_buffer_size
-= j
.buffer_size
;
788 flush_expired_pieces();
792 TORRENT_ASSERT(j
.storage
|| j
.action
== disk_io_job::abort_thread
);
793 #ifdef TORRENT_DISK_STATS
794 ptime start
= time_now();
796 #ifndef BOOST_NO_EXCEPTIONS
802 case disk_io_job::abort_thread
:
804 mutex_t::scoped_lock
jl(m_queue_mutex
);
807 for (std::list
<disk_io_job
>::iterator i
= m_jobs
.begin();
810 if (i
->action
== disk_io_job::read
)
812 if (i
->callback
) m_ios
.post(bind(i
->callback
, -1, *i
));
816 if (i
->action
== disk_io_job::check_files
)
818 if (i
->callback
) m_ios
.post(bind(i
->callback
819 , piece_manager::disk_check_aborted
, *i
));
827 case disk_io_job::read
:
834 #ifdef TORRENT_DISK_STATS
835 m_log
<< log_time() << " read " << j
.buffer_size
<< std::endl
;
838 TORRENT_ASSERT(j
.buffer
== 0);
839 j
.buffer
= allocate_buffer();
840 TORRENT_ASSERT(j
.buffer_size
<= m_block_size
);
844 j
.error
= error_code(ENOMEM
, get_posix_category());
845 j
.str
= j
.error
.message();
849 disk_buffer_holder
read_holder(*this, j
.buffer
);
850 ret
= try_read_from_cache(j
);
852 // -2 means there's no space in the read cache
853 // or that the read cache is disabled
862 ret
= j
.storage
->read_impl(j
.buffer
, j
.piece
, j
.offset
869 ++m_cache_stats
.blocks_read
;
871 read_holder
.release();
874 case disk_io_job::write
:
881 #ifdef TORRENT_DISK_STATS
882 m_log
<< log_time() << " write " << j
.buffer_size
<< std::endl
;
884 mutex_t::scoped_lock
l(m_piece_mutex
);
887 = find_cached_piece(m_pieces
, j
, l
);
888 int block
= j
.offset
/ m_block_size
;
889 TORRENT_ASSERT(j
.buffer
);
890 TORRENT_ASSERT(j
.buffer_size
<= m_block_size
);
891 if (p
!= m_pieces
.end())
893 TORRENT_ASSERT(p
->blocks
[block
] == 0);
894 if (p
->blocks
[block
])
896 free_buffer(p
->blocks
[block
]);
899 p
->blocks
[block
] = j
.buffer
;
900 ++m_cache_stats
.cache_size
;
902 p
->last_use
= time_now();
908 // we've now inserted the buffer
909 // in the cache, we should not
910 // free it at the end
912 if (m_cache_stats
.cache_size
>= m_cache_size
)
913 flush_oldest_piece(l
);
916 case disk_io_job::hash
:
918 #ifdef TORRENT_DISK_STATS
919 m_log
<< log_time() << " hash" << std::endl
;
921 mutex_t::scoped_lock
l(m_piece_mutex
);
925 = find_cached_piece(m_pieces
, j
, l
);
926 if (i
!= m_pieces
.end())
928 flush_and_remove(i
, l
);
932 j
.storage
->mark_failed(j
.piece
);
937 sha1_hash h
= j
.storage
->hash_for_piece_impl(j
.piece
);
941 j
.storage
->mark_failed(j
.piece
);
944 ret
= (j
.storage
->info()->hash_for_piece(j
.piece
) == h
)?0:-2;
945 if (ret
== -2) j
.storage
->mark_failed(j
.piece
);
948 case disk_io_job::move_storage
:
950 #ifdef TORRENT_DISK_STATS
951 m_log
<< log_time() << " move" << std::endl
;
953 TORRENT_ASSERT(j
.buffer
== 0);
954 ret
= j
.storage
->move_storage_impl(j
.str
) ? 1 : 0;
960 j
.str
= j
.storage
->save_path().string();
963 case disk_io_job::release_files
:
965 #ifdef TORRENT_DISK_STATS
966 m_log
<< log_time() << " release" << std::endl
;
968 TORRENT_ASSERT(j
.buffer
== 0);
970 mutex_t::scoped_lock
l(m_piece_mutex
);
973 for (cache_t::iterator i
= m_pieces
.begin(); i
!= m_pieces
.end();)
975 if (i
->storage
== j
.storage
)
978 i
= m_pieces
.erase(i
);
986 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
988 mutex_t::scoped_lock
l(m_pool_mutex
);
989 m_pool
.release_memory();
992 ret
= j
.storage
->release_files_impl();
993 if (ret
!= 0) test_error(j
);
996 case disk_io_job::clear_read_cache
:
998 #ifdef TORRENT_DISK_STATS
999 m_log
<< log_time() << " clear-cache" << std::endl
;
1001 TORRENT_ASSERT(j
.buffer
== 0);
1003 mutex_t::scoped_lock
l(m_piece_mutex
);
1006 for (cache_t::iterator i
= m_read_pieces
.begin();
1007 i
!= m_read_pieces
.end();)
1009 if (i
->storage
== j
.storage
)
1012 i
= m_read_pieces
.erase(i
);
1020 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
1022 mutex_t::scoped_lock
l(m_pool_mutex
);
1023 m_pool
.release_memory();
1029 case disk_io_job::delete_files
:
1031 #ifdef TORRENT_DISK_STATS
1032 m_log
<< log_time() << " delete" << std::endl
;
1034 TORRENT_ASSERT(j
.buffer
== 0);
1036 mutex_t::scoped_lock
l(m_piece_mutex
);
1039 cache_t::iterator i
= std::remove_if(
1040 m_pieces
.begin(), m_pieces
.end(), bind(&cached_piece_entry::storage
, _1
) == j
.storage
);
1042 for (cache_t::iterator k
= i
; k
!= m_pieces
.end(); ++k
)
1044 torrent_info
const& ti
= *k
->storage
->info();
1045 int blocks_in_piece
= (ti
.piece_size(k
->piece
) + m_block_size
- 1) / m_block_size
;
1046 for (int j
= 0; j
< blocks_in_piece
; ++j
)
1048 if (k
->blocks
[j
] == 0) continue;
1049 free_buffer(k
->blocks
[j
]);
1051 --m_cache_stats
.cache_size
;
1054 m_pieces
.erase(i
, m_pieces
.end());
1056 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
1058 mutex_t::scoped_lock
l(m_pool_mutex
);
1059 m_pool
.release_memory();
1062 ret
= j
.storage
->delete_files_impl();
1063 if (ret
!= 0) test_error(j
);
1066 case disk_io_job::check_fastresume
:
1068 #ifdef TORRENT_DISK_STATS
1069 m_log
<< log_time() << " check fastresume" << std::endl
;
1071 lazy_entry
const* rd
= (lazy_entry
const*)j
.buffer
;
1072 TORRENT_ASSERT(rd
!= 0);
1073 ret
= j
.storage
->check_fastresume(*rd
, j
.str
);
1076 case disk_io_job::check_files
:
1078 #ifdef TORRENT_DISK_STATS
1079 m_log
<< log_time() << " check files" << std::endl
;
1081 int piece_size
= j
.storage
->info()->piece_length();
1082 for (int processed
= 0; processed
< 4 * 1024 * 1024; processed
+= piece_size
)
1084 ret
= j
.storage
->check_files(j
.piece
, j
.offset
, j
.str
);
1086 #ifndef BOOST_NO_EXCEPTIONS
1089 TORRENT_ASSERT(handler
);
1090 if (handler
&& ret
== piece_manager::need_full_check
)
1091 m_ios
.post(bind(handler
, ret
, j
));
1092 #ifndef BOOST_NO_EXCEPTIONS
1093 } catch (std::exception
&) {}
1095 if (ret
!= piece_manager::need_full_check
) break;
1099 ret
= piece_manager::fatal_disk_error
;
1102 TORRENT_ASSERT(ret
!= -2 || !j
.str
.empty());
1104 // if the check is not done, add it at the end of the job queue
1105 if (ret
== piece_manager::need_full_check
)
1107 add_job(j
, handler
);
1112 case disk_io_job::save_resume_data
:
1114 #ifdef TORRENT_DISK_STATS
1115 m_log
<< log_time() << " save resume data" << std::endl
;
1117 j
.resume_data
.reset(new entry(entry::dictionary_t
));
1118 j
.storage
->write_resume_data(*j
.resume_data
);
1122 case disk_io_job::rename_file
:
1124 #ifdef TORRENT_DISK_STATS
1125 m_log
<< log_time() << " rename file" << std::endl
;
1127 ret
= j
.storage
->rename_file_impl(j
.piece
, j
.str
);
1130 #ifndef BOOST_NO_EXCEPTIONS
1131 } catch (std::exception
& e
)
1138 catch (std::exception
&) {}
1142 // if (!handler) std::cerr << "DISK THREAD: no callback specified" << std::endl;
1143 // else std::cerr << "DISK THREAD: invoking callback" << std::endl;
1144 #ifndef BOOST_NO_EXCEPTIONS
1147 TORRENT_ASSERT(ret
!= -2 || !j
.str
.empty()
1148 || j
.action
== disk_io_job::hash
);
1149 if (handler
) m_ios
.post(bind(handler
, ret
, j
));
1150 #ifndef BOOST_NO_EXCEPTIONS
1151 } catch (std::exception
&)
1153 TORRENT_ASSERT(false);
1157 TORRENT_ASSERT(false);