From 69d8ec2f167eb99a1efec4a770ba5951e7c366f0 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 10 Jan 2011 18:07:21 -0800 Subject: [PATCH] coolio: enable async.callback for one-shot body responses The lack of an equivlent to EM::Deferrable prevents us from doing streaming/trickling responses, but a one-shot body should work fine for Coolio and generating dynamic responses. --- lib/rainbows/coolio/client.rb | 69 +++++++++++++--------- lib/rainbows/coolio/thread_client.rb | 4 +- t/async_chunk_app.ru | 42 +++++++++---- ...async-keepalive.sh => t0402-async-keepalive.sh} | 44 +++++++++++++- 4 files changed, 115 insertions(+), 44 deletions(-) rename t/{t0402-em-async-keepalive.sh => t0402-async-keepalive.sh} (58%) diff --git a/lib/rainbows/coolio/client.rb b/lib/rainbows/coolio/client.rb index d3d696d..6264df7 100644 --- a/lib/rainbows/coolio/client.rb +++ b/lib/rainbows/coolio/client.rb @@ -64,7 +64,7 @@ class Rainbows::Coolio::Client < Coolio::IO def next! attached? or return @deferred = nil - enable_write_watcher + enable_write_watcher # trigger on_write_complete end def timeout? @@ -80,45 +80,59 @@ class Rainbows::Coolio::Client < Coolio::IO @deferred = true end - def coolio_write_response(response, alive) - status, headers, body = response + def write_response_path(status, headers, body, alive) + io = body_to_io(body) + st = io.stat + + if st.file? + defer_file(status, headers, body, alive, io, st) + elsif st.socket? || st.pipe? + chunk = stream_response_headers(status, headers, alive) + stream_response_body(body, io, chunk) + else + # char or block device... WTF? + write_response(status, headers, body, alive) + end + end + def ev_write_response(status, headers, body, alive) if body.respond_to?(:to_path) - io = body_to_io(body) - st = io.stat - - if st.file? - return defer_file(status, headers, body, alive, io, st) - elsif st.socket? || st.pipe? - chunk = stream_response_headers(status, headers, alive) - return stream_response_body(body, io, chunk) - end - # char or block device... WTF? fall through to body.each + write_response_path(status, headers, body, alive) + else + write_response(status, headers, body, alive) end - write_response(status, headers, body, alive) + return quit unless alive && :close != @state + @state = :headers + end + + def coolio_write_async_response(response) + write_async_response(response) + @deferred = nil end def app_call KATO.delete(self) + disable if enabled? @env[RACK_INPUT] = @input @env[REMOTE_ADDR] = @_io.kgio_addr - response = APP.call(@env.merge!(RACK_DEFAULTS)) + @env[ASYNC_CALLBACK] = method(:coolio_write_async_response) + status, headers, body = catch(:async) { + APP.call(@env.merge!(RACK_DEFAULTS)) + } - coolio_write_response(response, alive = @hp.next?) - return quit unless alive && :close != @state - @state = :headers - disable if enabled? + (nil == status || -1 == status) ? @deferred = true : + ev_write_response(status, headers, body, @hp.next?) end def on_write_complete case @deferred - when true then return + when true then return # #next! will clear this bit when nil # fall through else begin return stream_file_chunk(@deferred) rescue EOFError # expected at file EOF - close_deferred + close_deferred # fall through end end @@ -150,13 +164,14 @@ class Rainbows::Coolio::Client < Coolio::IO end def close_deferred - @deferred.respond_to?(:close) or return - begin - @deferred.close - rescue => e - Rainbows.server.logger.error("closing #@deferred: #{e}") + if @deferred + begin + @deferred.close if @deferred.respond_to?(:close) + rescue => e + Rainbows.server.logger.error("closing #@deferred: #{e}") + end + @deferred = nil end - @deferred = nil end def on_close diff --git a/lib/rainbows/coolio/thread_client.rb b/lib/rainbows/coolio/thread_client.rb index 6cd77b9..b837115 100644 --- a/lib/rainbows/coolio/thread_client.rb +++ b/lib/rainbows/coolio/thread_client.rb @@ -14,9 +14,7 @@ class Rainbows::Coolio::ThreadClient < Rainbows::Coolio::Client # this is only called in the master thread def response_write(response) - coolio_write_response(response, alive = @hp.next?) - return quit unless alive && :close != @state - @state = :headers + ev_write_response(*response, @hp.next?) rescue => e handle_error(e) end diff --git a/t/async_chunk_app.ru b/t/async_chunk_app.ru index 26b9915..007d7b2 100644 --- a/t/async_chunk_app.ru +++ b/t/async_chunk_app.ru @@ -17,27 +17,45 @@ class DeferrableChunkBody def finish @body_callback.call("0\r\n\r\n") end -end +end if defined?(EventMachine) class AsyncChunkApp def call(env) - body = DeferrableChunkBody.new - body.callback { body.finish } headers = { 'Content-Type' => 'text/plain', 'Transfer-Encoding' => 'chunked', } - EM.next_tick { - env['async.callback'].call([ 200, headers, body ]) - } - EM.add_timer(1) { - body.call "Hello " + delay = env["HTTP_X_DELAY"].to_i + + case env["rainbows.model"] + when :EventMachine, :NeverBlock + body = DeferrableChunkBody.new + body.callback { body.finish } + task = lambda { + env['async.callback'].call([ 200, headers, body ]) + EM.add_timer(1) { + body.call "Hello " - EM.add_timer(1) { - body.call "World #{env['PATH_INFO']}\n" - body.succeed + EM.add_timer(1) { + body.call "World #{env['PATH_INFO']}\n" + body.succeed + } + } } - } + delay == 0 ? EM.next_tick(&task) : EM.add_timer(delay, &task) + when :Coolio + # Cool.io only does one-shot responses due to the lack of the + # equivalent of EM::Deferrables + body = [ "Hello ", "World #{env['PATH_INFO']}\n", '' ].map do |chunk| + "#{chunk.size.to_s(16)}\r\n#{chunk}\r\n" + end + + next_tick = Coolio::TimerWatcher.new(delay, false) + next_tick.on_timer { env['async.callback'].call([ 200, headers, body ]) } + next_tick.attach(Coolio::Loop.default) + else + raise "Not supported: #{env['rainbows.model']}" + end nil end end diff --git a/t/t0402-em-async-keepalive.sh b/t/t0402-async-keepalive.sh similarity index 58% rename from t/t0402-em-async-keepalive.sh rename to t/t0402-async-keepalive.sh index 24eb678..fdf4cbc 100644 --- a/t/t0402-em-async-keepalive.sh +++ b/t/t0402-async-keepalive.sh @@ -1,14 +1,15 @@ #!/bin/sh +DELAY=${DELAY-1} . ./test-lib.sh case $model in -NeverBlock|EventMachine) ;; +Coolio|NeverBlock|EventMachine) ;; *) t_info "skipping $T since it's not compatible with $model" exit 0 ;; esac -t_plan 9 "async_chunk_app test for test for EM" +t_plan 11 "async_chunk_app test for test for $model" CONFIG_RU=async_chunk_app.ru @@ -39,6 +40,31 @@ t_begin "async.callback supports pipelining" && { elapsed=$(( $t1 - $t0 )) t_info "elapsed=$elapsed $model.$0 ($t_current)" test 3 -eq "$(fgrep 'HTTP/1.1 200 OK' $tmp | wc -l)" + test 3 -eq "$(grep '^Hello ' $tmp | wc -l)" + test 3 -eq "$(grep 'World ' $tmp | wc -l)" +} + +t_begin "async.callback supports pipelining with delay $DELAY" && { + rm -f $tmp + t0=$(date +%s) + ( + cat $fifo > $tmp & + printf 'GET /0 HTTP/1.1\r\nX-Delay: %d\r\n' $DELAY + printf 'Host: example.com\r\n\r\n' + printf 'GET /1 HTTP/1.1\r\nX-Delay: %d\r\n' $DELAY + printf 'Host: example.com\r\n\r\n' + printf 'GET /2 HTTP/1.0\r\nX-Delay: %d\r\n' $DELAY + printf 'Host: example.com\r\n\r\n' + wait + ) | socat - TCP:$listen > $fifo + t1=$(date +%s) + elapsed=$(( $t1 - $t0 )) + min=$(( $DELAY * 3 )) + t_info "elapsed=$elapsed $model.$0 ($t_current) min=$min" + test $elapsed -ge $min + test 3 -eq "$(fgrep 'HTTP/1.1 200 OK' $tmp | wc -l)" + test 3 -eq "$(grep '^Hello ' $tmp | wc -l)" + test 3 -eq "$(grep 'World ' $tmp | wc -l)" } t_begin "async.callback supports keepalive" && { @@ -52,6 +78,20 @@ t_begin "async.callback supports keepalive" && { rm -f $curl_err } +t_begin "async.callback supports keepalive with delay $DELAY" && { + t0=$(date +%s) + curl -v --no-buffer -sSf -H "X-Delay: $DELAY" \ + http://$listen/[0-2] > $tmp 2>> $curl_err + t1=$(date +%s) + elapsed=$(( $t1 - $t0 )) + min=$(( $DELAY * 3 )) + t_info "elapsed=$elapsed $model.$0 ($t_current) min=$min" + test $elapsed -ge $min + cmp $expect $tmp + test 2 -eq "$(fgrep 'Re-using existing connection!' $curl_err |wc -l)" + rm -f $curl_err +} + t_begin "send async requests off in parallel" && { t0=$(date +%s) curl --no-buffer -sSf http://$listen/[0-2] > $a 2>> $curl_err & -- 2.11.4.GIT