doc: misc cleanups and additions for RDoc
[rainbows.git] / lib / rainbows / writer_thread_spawn / client.rb
blob8f65c19b556f10ec2e312a317e681c1447048aa6
1 # -*- encoding: binary -*-
2 # :enddoc:
3 # used to wrap a BasicSocket to use with +q+ for all writes
4 # this is compatible with IO.select
5 class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr)
6   include Rainbows::Response
7   include Rainbows::SocketProxy
8   include Rainbows::WorkerYield
10   CUR = {} # :nodoc:
12   def self.quit
13     g = Rainbows::G
14     CUR.delete_if do |t,q|
15       q << nil
16       g.tick
17       t.alive? ? t.join(0.01) : true
18     end until CUR.empty?
19   end
21   def queue_writer
22     until CUR.size < MAX
23       CUR.delete_if { |t,_|
24         t.alive? ? t.join(0) : true
25       }.size >= MAX and worker_yield
26     end
28     q = Queue.new
29     self.thr = Thread.new(to_io, q) do |io, q|
30       while response = q.shift
31         begin
32           arg1, arg2, arg3 = response
33           case arg1
34           when :body then write_body(io, arg2, arg3)
35           when :close
36             io.close unless io.closed?
37             break
38           else
39             io.write(arg1)
40           end
41         rescue => e
42           Rainbows::Error.write(io, e)
43         end
44       end
45       CUR.delete(Thread.current)
46     end
47     CUR[thr] = q
48   end
50   def write(buf)
51     (self.q ||= queue_writer) << buf
52   end
54   def queue_body(body, range)
55     (self.q ||= queue_writer) << [ :body, body, range ]
56   end
58   def close
59     if q
60       q << :close
61     else
62       to_io.close
63     end
64   end
66   def closed?
67     false
68   end
69 end