1 # -*- encoding: binary -*-
3 # Wraps a BasicSocket so that all writes go through +q+.
4 # This wrapper is compatible with IO.select.
5 class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr)
6 include Rainbows::SocketProxy
7 include Rainbows::ProcessClient
8 include Rainbows::WorkerYield
# Queues a request to write +body+ instead of writing it directly:
# pushes the message [ :write_body_each, body ] onto +q+.
# NOTE(review): the consumer of +q+ is not visible here; presumably a
# writer thread pops these messages and performs the write -- confirm.
13 def write_body_each(body)
14 q << [ :write_body_each, body ]
# Queues a full response whose +body+ responds to +close+.
#
# +status+, +headers+ and +alive+ are forwarded unchanged in the queued
# :write_response message; +body+ is replaced by the object that
# Rainbows::SyncClose yields to its block.
17 def write_response_close(status, headers, body, alive)
# copy this client's @hp onto the wrapped IO object so it is reachable
# from +to_io+ as well
18 to_io.instance_variable_set(:@hp, @hp) # XXX ugh
# NOTE(review): SyncClose is defined elsewhere; presumably it makes the
# caller wait until the queued writer has closed the body -- confirm.
19 Rainbows::SyncClose.new(body) { |sync_body|
20 q << [ :write_response, status, headers, sync_body, alive ]
# This variant of the response methods is only defined when IO offers
# either the class method +copy_stream+ or the instance method
# +trysendfile+.
24 if IO.respond_to?(:copy_stream) || IO.method_defined?(:trysendfile)
# Writes a response through the queue, choosing the path by what +body+
# responds to.
25 def write_response(status, headers, body, alive)
# lazily create the queue on first use; queue_writer supplies it
26 self.q ||= queue_writer
# +close+ is checked before +to_path+, so a body that responds to both
# takes the write_response_close path
27 if body.respond_to?(:close)
28 write_response_close(status, headers, body, alive)
29 elsif body.respond_to?(:to_path)
# write_response_path is not defined in the lines shown here
# NOTE(review): handling of bodies matching neither check is not shown
# here -- confirm.
30 write_response_path(status, headers, body, alive)
# Queues a request to write a file-backed +body+: pushes the message
# [ :write_body_file, body, range ] onto +q+.
# NOTE(review): presumably +range+ selects the part of the file to send;
# its exact form is not visible here -- confirm.
36 def write_body_file(body, range)
37 q << [ :write_body_file, body, range ]
# Queues a request to write a stream +body+: pushes the message
# [ :write_body_stream, body ] onto +q+.
40 def write_body_stream(body)
41 q << [ :write_body_stream, body ]
# Fallback used when IO has neither +copy_stream+ nor +trysendfile+:
# there is no +to_path+ branch in this variant.
43 else # each-only body response
# Writes a response through the queue; only bodies that respond to
# +close+ get special handling here.
44 def write_response(status, headers, body, alive)
# lazily create the queue on first use; queue_writer supplies it
45 self.q ||= queue_writer
46 if body.respond_to?(:close)
# NOTE(review): handling of bodies without +close+ is not shown here --
# confirm.
47 write_response_close(status, headers, body, alive)
52 end # each-only body response
57 CUR.delete_if do |t,q|
60 t.alive? ? t.join(0.01) : true
67 t.alive? ? t.join(0) : true
68 }.size >= MAX and worker_yield
72 self.thr = Thread.new(to_io) do |io|
80 io.close unless io.closed?
86 Rainbows::Error.write(io, e)
89 CUR.delete(Thread.current)
95 (self.q ||= queue_writer) << buf