rev_thread_spawn: make 1.9 TeeInput performance tolerable
[rainbows.git] / lib / rainbows / ev_thread_core.rb
blobe132f188267712fa4db0defa917161e7fd8a0a4f
1 # -*- encoding: binary -*-
2 require 'thread' # for Queue
3 require 'rainbows/ev_core'
5 module Rainbows
7   # base module for mixed Thread + evented models like RevThreadSpawn
8   module EvThreadCore
9     include EvCore
11     def post_init
12       super
13       @lock = Mutex.new
14       @thread = nil
15     end
17     # we pass ourselves off as a Socket to Unicorn::TeeInput and this
18     # is the only method Unicorn::TeeInput requires from the socket
19     def readpartial(length, buf = "")
20       # we must modify the original buffer if there was one
21       length == 0 and return buf.replace("")
23       # wait on the main loop to feed us
24       while @tbuf.size == 0
25         @tbuf.write(@state.pop)
26         resume
27       end
28       buf.replace(@tbuf.read(length))
29     end
31     def app_spawn(input)
32       begin
33         @thread.nil? or @thread.join # only one thread per connection
34         env = @env.dup
35         alive, headers = @hp.keepalive?, @hp.headers?
36         @thread = Thread.new(self) do |client|
37           begin
38             env[REMOTE_ADDR] = @remote_addr
39             env[RACK_INPUT] = input || TeeInput.new(client, env, @hp, @buf)
40             response = APP.call(env.update(RACK_DEFAULTS))
41             if 100 == response.first.to_i
42               write(EXPECT_100_RESPONSE)
43               env.delete(HTTP_EXPECT)
44               response = APP.call(env)
45             end
47             alive &&= G.alive
48             out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if headers
49             response_write(response, out)
50           rescue => e
51             handle_error(e) rescue nil
52           end
53         end
54         if alive # in case we pipeline
55           @hp.reset
56           redo if @hp.headers(@env.clear, @buf)
57         end
58       end while false
59     end
61     def on_read(data)
62       case @state
63       when :headers
64         @hp.headers(@env, @buf << data) or return
65         if 0 == @hp.content_length
66           app_spawn(HttpRequest::NULL_IO) # common case
67         else # nil or len > 0
68           @state, @tbuf = Queue.new, ::IO::Buffer.new
69           app_spawn(nil)
70         end
71       when Queue
72         pause
73         @state << data
74       end
75       rescue => e
76         handle_error(e)
77     end
79   end
80 end