From: Eric Wong Date: Fri, 8 Feb 2013 22:45:20 +0000 (+0000) Subject: hijacking support for Rack 1.5.x users X-Git-Tag: v4.5.0~9 X-Git-Url: https://repo.or.cz/w/rainbows.git/commitdiff_plain/e166cfe5e8d648b544b1291ec157bd234a425e21 hijacking support for Rack 1.5.x users This requires Rack 1.5.x and unicorn 4.6.0 for hijacking support. Older versions of Rack continue to work fine, but we must use unicorn 4.6.0 features to support this. --- diff --git a/lib/rainbows/coolio/client.rb b/lib/rainbows/coolio/client.rb index 8d48bbf..843f574 100644 --- a/lib/rainbows/coolio/client.rb +++ b/lib/rainbows/coolio/client.rb @@ -86,6 +86,12 @@ class Rainbows::Coolio::Client < Coolio::IO @deferred = true end + def hijacked + CONN.delete(self) + detach + nil + end + def write_response_path(status, headers, body, alive) io = body_to_io(body) st = io.stat @@ -93,7 +99,8 @@ class Rainbows::Coolio::Client < Coolio::IO if st.file? defer_file(status, headers, body, alive, io, st) elsif st.socket? || st.pipe? - chunk = stream_response_headers(status, headers, alive) + chunk = stream_response_headers(status, headers, alive, body) + return hijacked if nil == chunk stream_response_body(body, io, chunk) else # char or block device... WTF? 
@@ -103,10 +110,11 @@ class Rainbows::Coolio::Client < Coolio::IO def ev_write_response(status, headers, body, alive) if body.respond_to?(:to_path) - write_response_path(status, headers, body, alive) + body = write_response_path(status, headers, body, alive) else - write_response(status, headers, body, alive) + body = write_response(status, headers, body, alive) end + return hijacked unless body return quit unless alive && :close != @state @state = :headers end @@ -117,9 +125,11 @@ class Rainbows::Coolio::Client < Coolio::IO @env[RACK_INPUT] = input @env[REMOTE_ADDR] = @_io.kgio_addr @env[ASYNC_CALLBACK] = method(:write_async_response) + @hp.hijack_setup(@env, @_io) status, headers, body = catch(:async) { APP.call(@env.merge!(RACK_DEFAULTS)) } + return hijacked if @hp.hijacked? (nil == status || -1 == status) ? @deferred = true : ev_write_response(status, headers, body, @hp.next?) @@ -186,12 +196,13 @@ class Rainbows::Coolio::Client < Coolio::IO def defer_file(status, headers, body, alive, io, st) if r = sendfile_range(status, headers) status, headers, range = r - write_headers(status, headers, alive) + body = write_headers(status, headers, alive, body) or return hijacked range and defer_file_stream(range[0], range[1], io, body) else - write_headers(status, headers, alive) + write_headers(status, headers, alive, body) or return hijacked defer_file_stream(0, st.size, io, body) end + body end def stream_file_chunk(sf) # +sf+ is a Rainbows::StreamFile object @@ -207,8 +218,9 @@ class Rainbows::Coolio::Client < Coolio::IO end else def defer_file(status, headers, body, alive, io, st) - write_headers(status, headers, alive) + write_headers(status, headers, alive, body) or return hijacked defer_file_stream(0, st.size, io, body) + body end def stream_file_chunk(body) diff --git a/lib/rainbows/coolio/thread_client.rb b/lib/rainbows/coolio/thread_client.rb index abc11d2..ee9fa04 100644 --- a/lib/rainbows/coolio/thread_client.rb +++ b/lib/rainbows/coolio/thread_client.rb @@ 
-14,6 +14,7 @@ class Rainbows::Coolio::ThreadClient < Rainbows::Coolio::Client # this is only called in the master thread def response_write(response) + return hijacked if @hp.hijacked? ev_write_response(*response, @hp.next?) rescue => e handle_error(e) @@ -25,6 +26,7 @@ class Rainbows::Coolio::ThreadClient < Rainbows::Coolio::Client def app_response begin @env[REMOTE_ADDR] = @_io.kgio_addr + @hp.hijack_setup(@env, @_io) APP.call(@env.merge!(RACK_DEFAULTS)) rescue => e Rainbows::Error.app(e) # we guarantee this does not raise diff --git a/lib/rainbows/epoll/client.rb b/lib/rainbows/epoll/client.rb index d72696b..f6af6fa 100644 --- a/lib/rainbows/epoll/client.rb +++ b/lib/rainbows/epoll/client.rb @@ -52,6 +52,7 @@ module Rainbows::Epoll::Client when String on_read(rv) return if @wr_queue[0] || closed? + return hijacked if @hp.hijacked? when :wait_readable KATO[self] = @@last_expire if :headers == @state return EP.set(self, IN) @@ -67,7 +68,9 @@ module Rainbows::Epoll::Client def app_call input # called by on_read() @env[RACK_INPUT] = input @env[REMOTE_ADDR] = kgio_addr + @hp.hijack_setup(@env, self) status, headers, body = APP.call(@env.merge!(RACK_DEFAULTS)) + return hijacked if @hp.hijacked? ev_write_response(status, headers, body, @hp.next?) end @@ -78,7 +81,8 @@ module Rainbows::Epoll::Client if st.file? defer_file(status, headers, body, alive, io, st) elsif st.socket? || st.pipe? - chunk = stream_response_headers(status, headers, alive) + chunk = stream_response_headers(status, headers, alive, body) + return hijacked if nil == chunk stream_response_body(body, io, chunk) else # char or block device... WTF? @@ -102,10 +106,18 @@ module Rainbows::Epoll::Client else write_response(status, headers, body, alive) end + return hijacked if @hp.hijacked? 
# try to read more if we didn't have to buffer writes next_request if alive && 0 == @wr_queue.size end + def hijacked + KATO.delete(self) + Server.decr # no other place to do this + EP.delete(self) + nil + end + def next_request if 0 == @buf.size want_more @@ -113,6 +125,7 @@ module Rainbows::Epoll::Client # pipelined request (already in buffer) on_read(Z) return if @wr_queue[0] || closed? + return hijacked if @hp.hijacked? close if :close == @state end end @@ -197,13 +210,14 @@ module Rainbows::Epoll::Client true end + # Rack apps should not hijack here, but they may... def defer_file(status, headers, body, alive, io, st) if r = sendfile_range(status, headers) status, headers, range = r - write_headers(status, headers, alive) + write_headers(status, headers, alive, body) or return hijacked range and defer_file_stream(range[0], range[1], io, body) else - write_headers(status, headers, alive) + write_headers(status, headers, alive, body) or return hijacked defer_file_stream(0, st.size, io, body) end end diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb index 46feaff..5c3c5b8 100644 --- a/lib/rainbows/ev_core.rb +++ b/lib/rainbows/ev_core.rb @@ -52,16 +52,17 @@ module Rainbows::EvCore end # returns whether to enable response chunking for autochunk models - def stream_response_headers(status, headers, alive) + # returns nil if request was hijacked in response stage + def stream_response_headers(status, headers, alive, body) headers = Rack::Utils::HeaderHash.new(headers) unless Hash === headers if headers.include?(Content_Length) - write_headers(status, headers, alive) + write_headers(status, headers, alive, body) or return return false end case @env[HTTP_VERSION] when "HTTP/1.0" # disable HTTP/1.0 keepalive to stream - write_headers(status, headers, false) + write_headers(status, headers, false, body) or return @hp.clear false when nil # "HTTP/0.9" @@ -69,7 +70,7 @@ module Rainbows::EvCore else rv = !!(headers[Transfer_Encoding] =~ %r{\Achunked\z}i) rv = 
false unless @env["rainbows.autochunk"] - write_headers(status, headers, alive) + write_headers(status, headers, alive, body) or return rv end end diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb index 26f0dbd..9871c09 100644 --- a/lib/rainbows/event_machine/client.rb +++ b/lib/rainbows/event_machine/client.rb @@ -10,6 +10,7 @@ class Rainbows::EventMachine::Client < EM::Connection end alias write send_data + alias hijacked detach def receive_data(data) # To avoid clobbering the current streaming response @@ -37,9 +38,11 @@ class Rainbows::EventMachine::Client < EM::Connection @env[REMOTE_ADDR] = @_io.kgio_addr @env[ASYNC_CALLBACK] = method(:write_async_response) @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new + @hp.hijack_setup(@env, @_io) status, headers, body = catch(:async) { APP.call(@env.merge!(RACK_DEFAULTS)) } + return hijacked if @hp.hijacked? if (nil == status || -1 == status) @deferred = true @@ -67,8 +70,8 @@ class Rainbows::EventMachine::Client < EM::Connection def ev_write_response(status, headers, body, alive) @state = :headers if alive if body.respond_to?(:errback) && body.respond_to?(:callback) + write_headers(status, headers, alive, body) or return hijacked @deferred = body - write_headers(status, headers, alive) write_body_each(body) deferred_errback(body) deferred_callback(body, alive) @@ -77,21 +80,22 @@ class Rainbows::EventMachine::Client < EM::Connection st = File.stat(path = body.to_path) if st.file? - write_headers(status, headers, alive) + write_headers(status, headers, alive, body) or return hijacked @deferred = stream_file_data(path) deferred_errback(body) deferred_callback(body, alive) return elsif st.socket? || st.pipe? + chunk = stream_response_headers(status, headers, alive, body) + return hijacked if nil == chunk io = body_to_io(@deferred = body) - chunk = stream_response_headers(status, headers, alive) m = chunk ? 
Rainbows::EventMachine::ResponseChunkPipe : Rainbows::EventMachine::ResponsePipe return EM.watch(io, m, self).notify_readable = true end # char or block device... WTF? fall through to body.each end - write_response(status, headers, body, alive) + write_response(status, headers, body, alive) or return hijacked if alive if @deferred.nil? if @buf.empty? @@ -112,6 +116,7 @@ class Rainbows::EventMachine::Client < EM::Connection end def unbind + return if @hp.hijacked? async_close = @env[ASYNC_CLOSE] and async_close.succeed @deferred.respond_to?(:fail) and @deferred.fail begin diff --git a/lib/rainbows/process_client.rb b/lib/rainbows/process_client.rb index b685001..f58770c 100644 --- a/lib/rainbows/process_client.rb +++ b/lib/rainbows/process_client.rb @@ -40,6 +40,7 @@ module Rainbows::ProcessClient set_input(env, hp) env[REMOTE_ADDR] = kgio_addr + hp.hijack_setup(env, to_io) status, headers, body = APP.call(env.merge!(RACK_DEFAULTS)) if 100 == status.to_i @@ -47,7 +48,8 @@ module Rainbows::ProcessClient env.delete(HTTP_EXPECT) status, headers, body = APP.call(env) end - write_response(status, headers, body, alive = @hp.next?) + return if hp.hijacked? + write_response(status, headers, body, alive = hp.next?) or return end while alive # if we get any error, try to write something back to the client # assuming we haven't closed the socket, but don't get hung up @@ -56,7 +58,7 @@ module Rainbows::ProcessClient rescue => e handle_error(e) ensure - close unless closed? + close unless closed? || hp.hijacked? end def handle_error(e) @@ -71,13 +73,15 @@ module Rainbows::ProcessClient begin set_input(env, hp) env[REMOTE_ADDR] = kgio_addr + hp.hijack_setup(env, to_io) status, headers, body = APP.call(env.merge!(RACK_DEFAULTS)) if 100 == status.to_i write(EXPECT_100_RESPONSE) env.delete(HTTP_EXPECT) status, headers, body = APP.call(env) end - write_response(status, headers, body, alive = hp.next?) + return if hp.hijacked? + write_response(status, headers, body, alive = hp.next?) 
or return end while alive && pipeline_ready(hp) alive or close rescue => e diff --git a/lib/rainbows/response.rb b/lib/rainbows/response.rb index f8b0831..8a0daf8 100644 --- a/lib/rainbows/response.rb +++ b/lib/rainbows/response.rb @@ -19,23 +19,56 @@ module Rainbows::Response Rainbows::HttpParser.keepalive_requests = 0 end - def write_headers(status, headers, alive) - @hp.headers? or return + # Rack 1.5.0 (protocol version 1.2) adds response hijacking support + if ((Rack::VERSION[0] << 8) | Rack::VERSION[1]) >= 0x0102 + RACK_HIJACK = "rack.hijack" + + def hijack_prepare(value) + value + end + + def hijack_socket + @hp.env[RACK_HIJACK].call + end + else + def hijack_prepare(_) + end + end + + # returns the original body on success + # returns nil if the headers hijacked the response body + def write_headers(status, headers, alive, body) + @hp.headers? or return body + hijack = nil status = CODES[status.to_i] || status buf = "HTTP/1.1 #{status}\r\n" \ "Date: #{httpdate}\r\n" \ - "Status: #{status}\r\n" \ - "Connection: #{alive ? KeepAlive : Close}\r\n" + "Status: #{status}\r\n" headers.each do |key, value| - next if %r{\A(?:Date\z|Connection\z)}i =~ key - if value =~ /\n/ - # avoiding blank, key-only cookies with /\n+/ - buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join + case key + when %r{\A(?:Date\z|Connection\z)}i + next + when "rack.hijack" + # this was an illegal key in Rack < 1.5, so it should be + # OK to silently discard it for those older versions + hijack = hijack_prepare(value) + alive = false # No persistent connections for hijacking else - buf << "#{key}: #{value}\r\n" + if /\n/ =~ value + # avoiding blank, key-only cookies with /\n+/ + buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join + else + buf << "#{key}: #{value}\r\n" + end end end - write(buf << CRLF) + write(buf << "Connection: #{alive ? 
KeepAlive : Close}\r\n\r\n") + + if hijack + body = nil # ensure caller does not close body + hijack.call(hijack_socket) + end + body end def close_if_private(io) @@ -70,8 +103,9 @@ module Rainbows::Response # generic response writer, used for most dynamically-generated responses # and also when copy_stream and/or IO#trysendfile is unavailable def write_response(status, headers, body, alive) - write_headers(status, headers, alive) - write_body_each(body) + body = write_headers(status, headers, alive, body) + write_body_each(body) if body + body ensure body.close if body.respond_to?(:close) end @@ -166,21 +200,23 @@ module Rainbows::Response if File.file?(body.to_path) if r = sendfile_range(status, headers) status, headers, range = r - write_headers(status, headers, alive) - write_body_file(body, range) if range + body = write_headers(status, headers, alive, body) + write_body_file(body, range) if body && range else - write_headers(status, headers, alive) - write_body_file(body, nil) + body = write_headers(status, headers, alive, body) + write_body_file(body, nil) if body end else - write_headers(status, headers, alive) - write_body_stream(body) + body = write_headers(status, headers, alive, body) + write_body_stream(body) if body end + body ensure body.close if body.respond_to?(:close) end module ToPath + # returns nil if hijacked def write_response(status, headers, body, alive) if body.respond_to?(:to_path) write_response_path(status, headers, body, alive) diff --git a/lib/rainbows/revactor/client/methods.rb b/lib/rainbows/revactor/client/methods.rb index b2e1847..592c996 100644 --- a/lib/rainbows/revactor/client/methods.rb +++ b/lib/rainbows/revactor/client/methods.rb @@ -36,7 +36,7 @@ module Rainbows::Revactor::Client::Methods end def write_response(status, headers, body, alive) - super(status, headers, body, alive) + super(status, headers, body, alive) or return alive && @ts and @hp.buf << @ts.leftover end diff --git a/lib/rainbows/stream_response_epoll.rb 
b/lib/rainbows/stream_response_epoll.rb index 3bb3540..33d7386 100644 --- a/lib/rainbows/stream_response_epoll.rb +++ b/lib/rainbows/stream_response_epoll.rb @@ -26,18 +26,24 @@ module Rainbows::StreamResponseEpoll def http_response_write(socket, status, headers, body) status = CODES[status.to_i] || status - ep_client = false + hijack = ep_client = false if headers # don't set extra headers here, this is only intended for # consuming by nginx. buf = "HTTP/1.0 #{status}\r\nStatus: #{status}\r\n" headers.each do |key, value| - if value =~ /\n/ - # avoiding blank, key-only cookies with /\n+/ - buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join + case key + when "rack.hijack" + hijack = hijack_prepare(value) + body = nil # ensure we do not close body else - buf << "#{key}: #{value}\r\n" + if /\n/ =~ value + # avoiding blank, key-only cookies with /\n+/ + buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join + else + buf << "#{key}: #{value}\r\n" + end end end buf << HEADER_END @@ -48,11 +54,22 @@ module Rainbows::StreamResponseEpoll buf = rv when :wait_writable ep_client = Client.new(socket, buf) - body.each { |chunk| ep_client.write(chunk) } - return ep_client.close + if hijack + ep_client.hijack(hijack) + else + body.each { |chunk| ep_client.write(chunk) } + ep_client.close + end + # body is nil on hijack, in which case ep_client is never closed by us + return end while true end + if hijack + hijack.call(socket) + return + end + body.each do |chunk| if ep_client ep_client.write(chunk) @@ -67,14 +84,15 @@ module Rainbows::StreamResponseEpoll end while true end end - ensure - body.respond_to?(:close) and body.close - if ep_client - ep_client.close - else - socket.shutdown - socket.close - end + ensure + return if hijack + body.respond_to?(:close) and body.close + if ep_client + ep_client.close + else + socket.shutdown + socket.close + end end # once a client is accepted, it is processed in its entirety here @@ -88,6 +106,7 @@ module 
Rainbows::StreamResponseEpoll status, headers, body = @app.call(env) end @request.headers? or headers = nil + return if @request.hijacked? http_response_write(client, status, headers, body) rescue => e handle_error(client, e) diff --git a/lib/rainbows/stream_response_epoll/client.rb b/lib/rainbows/stream_response_epoll/client.rb index db303b0..dc226d6 100644 --- a/lib/rainbows/stream_response_epoll/client.rb +++ b/lib/rainbows/stream_response_epoll/client.rb @@ -18,7 +18,7 @@ class Rainbows::StreamResponseEpoll::Client attr_reader :to_io def initialize(io, unwritten) - @closed = false + @finish = false @to_io = io @wr_queue = [ unwritten.dup ] EP.set(self, OUT) @@ -29,7 +29,11 @@ class Rainbows::StreamResponseEpoll::Client end def close - @closed = true + @finish = true + end + + def hijack(hijack) + @finish = hijack end def epoll_run @@ -49,10 +53,14 @@ class Rainbows::StreamResponseEpoll::Client end def on_write_complete - if @closed + if true == @finish @to_io.shutdown @to_io.close N.decr(0, 1) + elsif @finish.respond_to?(:call) # hijacked + EP.delete(self) + N.decr(0, 1) + @finish.call(@to_io) end end end diff --git a/lib/rainbows/writer_thread_pool/client.rb b/lib/rainbows/writer_thread_pool/client.rb index 4df7f49..e02d6a8 100644 --- a/lib/rainbows/writer_thread_pool/client.rb +++ b/lib/rainbows/writer_thread_pool/client.rb @@ -8,11 +8,13 @@ class Rainbows::WriterThreadPool::Client < Struct.new(:to_io, :q) module Methods def write_body_each(body) + return if @hp.hijacked? q << [ to_io, :write_body_each, body ] end def write_response_close(status, headers, body, alive) to_io.instance_variable_set(:@hp, @hp) # XXX ugh + return if @hp.hijacked? 
Rainbows::SyncClose.new(body) { |sync_body| q << [ to_io, :write_response, status, headers, sync_body, alive ] } diff --git a/rainbows.gemspec b/rainbows.gemspec index e7e832b..c29f308 100644 --- a/rainbows.gemspec +++ b/rainbows.gemspec @@ -28,7 +28,7 @@ Gem::Specification.new do |s| s.add_dependency(%q<kgio>, ['~> 2.5']) # we need Unicorn for the HTTP parser and process management - s.add_dependency(%q<unicorn>, ["~> 4.1"]) + s.add_dependency(%q<unicorn>, ["~> 4.6"]) # 4.6.0+ supports hijacking s.add_development_dependency(%q<isolate>, "~> 3.1") s.add_development_dependency(%q<wrongdoc>, "~> 1.6") diff --git a/t/hijack.ru b/t/hijack.ru new file mode 100644 index 0000000..64c23d7 --- /dev/null +++ b/t/hijack.ru @@ -0,0 +1,56 @@ +use Rack::Lint +use Rack::ContentLength +use Rack::ContentType, "text/plain" +class DieIfUsed + def each + abort "body.each called after response hijack\n" + end + + def close + abort "body.close called after response hijack\n" + end +end +def lazy_close(io) + thr = Thread.new do + # wait and see if Rainbows! 
accidentally closes us + sleep((ENV["DELAY"] || 10).to_i) + begin + io.close + rescue => e + warn "E: #{e.message} (#{e.class})" + exit!(3) + end + end + at_exit { thr.join } +end + +run lambda { |env| + case env["PATH_INFO"] + when "/hijack_req" + if env["rack.hijack?"] + io = env["rack.hijack"].call + if io.respond_to?(:read_nonblock) && + env["rack.hijack_io"].respond_to?(:read_nonblock) + + # exercise both, since we Rack::Lint may use different objects + env["rack.hijack_io"].write("HTTP/1.0 200 OK\r\n\r\n") + io.write("request.hijacked") + lazy_close(io) + return [ 500, {}, DieIfUsed.new ] + end + end + [ 500, {}, [ "hijack BAD\n" ] ] + when "/hijack_res" + r = "response.hijacked" + [ 200, + { + "Content-Length" => r.bytesize.to_s, + "rack.hijack" => proc do |io| + io.write(r) + lazy_close(io) + end + }, + DieIfUsed.new + ] + end +} diff --git a/t/t0800-rack-hijack.sh b/t/t0800-rack-hijack.sh new file mode 100755 index 0000000..c8f976d --- /dev/null +++ b/t/t0800-rack-hijack.sh @@ -0,0 +1,27 @@ +#!/bin/sh +. 
./test-lib.sh +t_plan 5 "rack.hijack tests (Rack 1.5+ (Rack::VERSION >= [1,2]))" + +t_begin "setup and start" && { + rainbows_setup + rainbows -D -c $unicorn_config hijack.ru + rainbows_wait_start +} + +t_begin "check request hijack" && { + test "xrequest.hijacked" = x"$(curl -sSfv http://$listen/hijack_req)" +} + +t_begin "check response hijack" && { + test "xresponse.hijacked" = x"$(curl -sSfv http://$listen/hijack_res)" +} + +t_begin "killing succeeds" && { + kill $rainbows_pid +} + +t_begin "check stderr" && { + check_stderr +} + +t_done diff --git a/t/test_isolate.rb b/t/test_isolate.rb index 6688b72..6b7fe5a 100644 --- a/t/test_isolate.rb +++ b/t/test_isolate.rb @@ -17,10 +17,10 @@ lock = File.open(__FILE__, "rb") lock.flock(File::LOCK_EX) Isolate.now!(opts) do gem 'kgio', '2.8.0' - gem 'rack', '1.5.1' + gem 'rack', '1.5.2' gem 'kcar', '0.4.0' gem 'raindrops', '0.10.0' - gem 'unicorn', '4.5.0' + gem 'unicorn', '4.6.0' if engine == "ruby" gem 'sendfile', '1.1.0'