ev_core: reuse buffer to avoid GC thrashing
[rainbows.git] / lib / rainbows / ev_core.rb
blobb9b3381be0941368481413ebe0d93a6a9e3b4207
1 # -*- encoding: binary -*-
2 # :enddoc:
3 # base module for evented models like Rev and EventMachine
4 module Rainbows::EvCore
5   include Rainbows::Const
6   include Rainbows::Response
7   NULL_IO = Unicorn::HttpRequest::NULL_IO
8   HttpParser = Rainbows::HttpParser
9   autoload :CapInput, 'rainbows/ev_core/cap_input'
10   RBUF = ""
12   # Apps may return this Rack response: AsyncResponse = [ -1, {}, [] ]
13   ASYNC_CALLBACK = "async.callback".freeze
14   ASYNC_CLOSE = "async.close".freeze
16   def write_async_response(response)
17     status, headers, body = response
18     if alive = @hp.next?
19       # we can't do HTTP keepalive without Content-Length or
20       # "Transfer-Encoding: chunked", and the async.callback stuff
21       # isn't Rack::Lint-compatible, so we have to enforce it here.
22       headers = Rack::Utils::HeaderHash.new(headers) unless Hash === headers
23       alive = headers.include?(Content_Length) ||
24               !!(%r{\Achunked\z}i =~ headers[Transfer_Encoding])
25     end
26     @deferred = nil
27     ev_write_response(status, headers, body, alive)
28   end
30   def post_init
31     @hp = HttpParser.new
32     @env = @hp.env
33     @buf = @hp.buf
34     @state = :headers # [ :body [ :trailers ] ] :app_call :close
35   end
37   # graceful exit, like SIGQUIT
38   def quit
39     @state = :close
40   end
42   def want_more
43   end
45   def handle_error(e)
46     msg = Rainbows::Error.response(e) and write(msg)
47     ensure
48       quit
49   end
51   # returns whether to enable response chunking for autochunk models
52   def stream_response_headers(status, headers, alive)
53     headers = Rack::Utils::HeaderHash.new(headers) unless Hash === headers
54     if headers.include?(Content_Length)
55       rv = false
56     else
57       rv = !!(headers[Transfer_Encoding] =~ %r{\Achunked\z}i)
58       rv = false if headers.delete('X-Rainbows-Autochunk') == 'no'
59     end
60     write_headers(status, headers, alive)
61     rv
62   end
64   def prepare_request_body
65     # since we don't do streaming input, we have no choice but
66     # to take over 100-continue handling from the Rack application
67     if @env[HTTP_EXPECT] =~ /\A100-continue\z/i
68       write(EXPECT_100_RESPONSE)
69       @env.delete(HTTP_EXPECT)
70     end
71     @input = mkinput
72     @hp.filter_body(@buf2 = "", @buf)
73     @input << @buf2
74     on_read("")
75   end
77   # TeeInput doesn't map too well to this right now...
78   def on_read(data)
79     case @state
80     when :headers
81       @buf << data
82       @hp.parse or return want_more
83       @state = :body
84       if 0 == @hp.content_length
85         @input = NULL_IO
86         app_call # common case
87       else # nil or len > 0
88         prepare_request_body
89       end
90     when :body
91       if @hp.body_eof?
92         if @hp.content_length
93           @input.rewind
94           app_call
95         else
96           @state = :trailers
97           on_read(data)
98         end
99       elsif data.size > 0
100         @hp.filter_body(@buf2, @buf << data)
101         @input << @buf2
102         on_read("")
103       else
104         want_more
105       end
106     when :trailers
107       if @hp.trailers(@env, @buf << data)
108         @input.rewind
109         app_call
110       else
111         want_more
112       end
113     end
114     rescue => e
115       handle_error(e)
116   end
118   def err_413(msg)
119     write(Rainbows::Const::ERROR_413_RESPONSE)
120     quit
121     # zip back up the stack
122     raise IOError, msg, []
123   end
125   TmpIO = Unicorn::TmpIO
127   def io_for(bytes)
128     bytes <= CBB ? StringIO.new("") : TmpIO.new
129   end
131   def mkinput
132     max = Rainbows.max_bytes
133     len = @hp.content_length
134     if len
135       if max && (len > max)
136         err_413("Content-Length too big: #{len} > #{max}")
137       end
138       io_for(len)
139     else
140       max ? CapInput.new(io_for(max), self, max) : TmpIO.new
141     end
142   end
144   def self.setup
145     const_set :CBB, Unicorn::TeeInput.client_body_buffer_size
146   end