*epoll: refactor common loop code
[rainbows.git] / lib / rainbows / thread_pool.rb
blob8f2b629132bbf55da5d51b3ee33e1eeaca8b9fe4
1 # -*- encoding: binary -*-
3 # Implements a worker thread pool model.  This is suited for platforms
4 # like Ruby 1.9, where the cost of dynamically spawning a new thread
5 # for every new client connection is higher than with the ThreadSpawn
6 # model.
8 # This model should provide a high level of compatibility with all
9 # Ruby implementations, and most libraries and applications.
10 # Applications running under this model should be thread-safe
11 # but not necessarily reentrant.
13 # Applications using this model are required to be thread-safe.
14 # Threads are never spawned dynamically under this model.  If you're
15 # connecting to external services and need to perform DNS lookups,
16 # consider using the "resolv-replace" library which replaces parts of
17 # the core Socket package with concurrent DNS lookup capabilities.
19 # This model probably less suited for many slow clients than the
20 # others and thus a lower +worker_connections+ setting is recommended.
22 module Rainbows::ThreadPool
23   include Rainbows::Base
25   def worker_loop(worker) # :nodoc:
26     init_worker_process(worker)
27     pool = (1..worker_connections).map do
28       Thread.new { LISTENERS.size == 1 ? sync_worker : async_worker }
29     end
31     while Rainbows.alive
32       # if any worker dies, something is serious wrong, bail
33       pool.each do |thr|
34         Rainbows.tick or break
35         thr.join(1) and Rainbows.quit!
36       end
37     end
38     Rainbows::JoinThreads.acceptors(pool)
39   end
41   def sync_worker # :nodoc:
42     s = LISTENERS[0]
43     begin
44       c = s.kgio_accept and c.process_loop
45     rescue => e
46       Rainbows::Error.listen_loop(e)
47     end while Rainbows.alive
48   end
50   def async_worker # :nodoc:
51     begin
52       # TODO: check if select() or accept() is a problem on large
53       # SMP systems under Ruby 1.9.  Hundreds of native threads
54       # all working off the same socket could be a thundering herd
55       # problem.  On the other hand, a thundering herd may not
56       # even incur as much overhead as an extra Mutex#synchronize
57       ret = select(LISTENERS) and ret[0].each do |s|
58         s = s.kgio_tryaccept and s.process_loop
59       end
60     rescue Errno::EINTR
61     rescue => e
62       Rainbows::Error.listen_loop(e)
63     end while Rainbows.alive
64   end
66   def join_threads(threads) # :nodoc:
67     Rainbows.quit!
68     threads.delete_if do |thr|
69       Rainbows.tick
70       begin
71         thr.run
72         thr.join(0.01)
73       rescue
74         true
75       end
76     end until threads.empty?
77   end
78 end