switch from IO#sendfile_nonblock to IO#trysendfile
[rainbows.git] / lib / rainbows / writer_thread_spawn / client.rb
blob3106253e1e0e1327d421702b5168ca95f37da022
1 # -*- encoding: binary -*-
2 # :enddoc:
3 # used to wrap a BasicSocket to use with +q+ for all writes
4 # this is compatible with IO.select
5 class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr)
6   include Rainbows::SocketProxy
7   include Rainbows::ProcessClient
8   include Rainbows::WorkerYield
10   CUR = {} # :nodoc:
12   module Methods
13     def write_body_each(body)
14       q << [ :write_body_each, body ]
15     end
17     def write_response_close(status, headers, body, alive)
18       to_io.instance_variable_set(:@hp, @hp) # XXX ugh
19       Rainbows::SyncClose.new(body) { |sync_body|
20         q << [ :write_response, status, headers, sync_body, alive ]
21       }
22     end
24     if IO.respond_to?(:copy_stream) || IO.method_defined?(:trysendfile)
25       def write_response(status, headers, body, alive)
26         self.q ||= queue_writer
27         if body.respond_to?(:close)
28           write_response_close(status, headers, body, alive)
29         elsif body.respond_to?(:to_path)
30           write_response_path(status, headers, body, alive)
31         else
32           super
33         end
34       end
36       def write_body_file(body, range)
37         q << [ :write_body_file, body, range ]
38       end
40       def write_body_stream(body)
41         q << [ :write_body_stream, body ]
42       end
43     else # each-only body response
44       def write_response(status, headers, body, alive)
45         self.q ||= queue_writer
46         if body.respond_to?(:close)
47           write_response_close(status, headers, body, alive)
48         else
49           super
50         end
51       end
52     end # each-only body response
53   end # module Methods
54   include Methods
56   def self.quit
57     CUR.delete_if do |t,q|
58       q << nil
59       Rainbows.tick
60       t.alive? ? t.join(0.01) : true
61     end until CUR.empty?
62   end
64   def queue_writer
65     until CUR.size < MAX
66       CUR.delete_if { |t,_|
67         t.alive? ? t.join(0) : true
68       }.size >= MAX and worker_yield
69     end
71     q = Queue.new
72     self.thr = Thread.new(to_io, q) do |io, q|
73       while op = q.shift
74         begin
75           op, *rest = op
76           case op
77           when String
78             io.kgio_write(op)
79           when :close
80             io.close unless io.closed?
81             break
82           else
83             io.__send__ op, *rest
84           end
85         rescue => e
86           Rainbows::Error.write(io, e)
87         end
88       end
89       CUR.delete(Thread.current)
90     end
91     CUR[thr] = q
92   end
94   def write(buf)
95     (self.q ||= queue_writer) << buf
96   end
98   def close
99     if q
100       q << :close
101     else
102       to_io.close
103     end
104   end
106   def closed?
107     to_io.closed?
108   end