start using kgio library
[rainbows.git] / lib / rainbows / thread_spawn.rb
bloba0ccde61ee0c3d3380208b00c6d1cfbccbfae882
1 # -*- encoding: binary -*-
2 require 'thread'
3 module Rainbows
5   # Spawns a new thread for every client connection we accept().  This
6   # model is recommended for platforms like Ruby 1.8 where spawning new
7   # threads is inexpensive.
8   #
9   # This model should provide a high level of compatibility with all
10   # Ruby implementations, and most libraries and applications.
11   # Applications running under this model should be thread-safe
12   # but not necessarily reentrant.
13   #
14   # If you're connecting to external services and need to perform DNS
15   # lookups, consider using the "resolv-replace" library which replaces
16   # parts of the core Socket package with concurrent DNS lookup
17   # capabilities
19   module ThreadSpawn
20     include Base
22     def accept_loop(klass) #:nodoc:
23       lock = Mutex.new
24       limit = worker_connections
25       LISTENERS.each do |l|
26         klass.new(l) do |l|
27           begin
28             if lock.synchronize { G.cur >= limit }
29               # Sleep if we're busy, another less busy worker process may
30               # take it for us if we sleep. This is gross but other options
31               # still suck because they require expensive/complicated
32               # synchronization primitives for _every_ case, not just this
33               # unlikely one.  Since this case is (or should be) uncommon,
34               # just busy wait when we have to.
35               # We don't use Thread.pass because it needlessly spins the
36               # CPU during I/O wait, CPU cycles that can be better used
37               # by other worker _processes_.
38               sleep(0.01)
39             elsif c = l.kgio_accept
40               klass.new(c) do |c|
41                 begin
42                   lock.synchronize { G.cur += 1 }
43                   process_client(c)
44                 ensure
45                   lock.synchronize { G.cur -= 1 }
46                 end
47               end
48             end
49           rescue => e
50             Error.listen_loop(e)
51           end while G.alive
52         end
53       end
54       sleep 1 while G.tick || lock.synchronize { G.cur > 0 }
55     end
57     def worker_loop(worker) #:nodoc:
58       init_worker_process(worker)
59       accept_loop(Thread)
60     end
61   end
62 end