writer_thread_spawn: factor out Client.quit
[rainbows.git] / lib / rainbows / writer_thread_spawn / client.rb
blobf9c373e720b1a5ca68dc472ff5302c78e647a55b
1 # -*- encoding: binary -*-
2 # used to wrap a BasicSocket to use with +q+ for all writes
3 # this is compatible with IO.select
4 class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr)
5   include Rainbows::Response
6   include Rainbows::SocketProxy
8   CUR = {} # :nodoc:
10   def self.quit
11     g = Rainbows::G
12     CUR.delete_if do |t,q|
13       q << nil
14       g.tick
15       t.alive? ? t.join(0.01) : true
16     end until CUR.empty?
17   end
19   def queue_writer
20     # not using Thread.pass here because that spins the CPU during
21     # I/O wait and will eat cycles from other worker processes.
22     until CUR.size < MAX
23       CUR.delete_if { |t,_|
24         t.alive? ? t.join(0) : true
25       }.size >= MAX and sleep(0.01)
26     end
28     q = Queue.new
29     self.thr = Thread.new(to_io, q) do |io, q|
30       while response = q.shift
31         begin
32           arg1, arg2, arg3 = response
33           case arg1
34           when :body then write_body(io, arg2, arg3)
35           when :close
36             io.close unless io.closed?
37             break
38           else
39             io.write(arg1)
40           end
41         rescue => e
42           Rainbows::Error.write(io, e)
43         end
44       end
45       CUR.delete(Thread.current)
46     end
47     CUR[thr] = q
48   end
50   def write(buf)
51     (self.q ||= queue_writer) << buf
52   end
54   def queue_body(body, range)
55     (self.q ||= queue_writer) << [ :body, body, range ]
56   end
58   def close
59     if q
60       q << :close
61     else
62       to_io.close
63     end
64   end
66   def closed?
67     false
68   end
69 end