update test infrastructure to support Rubinius
[rainbows.git] / lib / rainbows / base.rb
blob86ec733b21d8e20c8e5c8dba0f677134f5026ac2
1 # -*- encoding: binary -*-
3 module Rainbows
5   # base class for Rainbows concurrency models, this is currently
6   # used by ThreadSpawn and ThreadPool models
7   module Base
9     include Unicorn
10     include Rainbows::Const
11     G = Rainbows::G
13     def init_worker_process(worker)
14       super(worker)
15       MaxBody.setup
16       G.tmp = worker.tmp
18       # avoid spurious wakeups and blocking-accept() with 1.8 green threads
19       if ! defined?(RUBY_ENGINE) && RUBY_VERSION.to_f < 1.9
20         require "io/nonblock"
21         HttpServer::LISTENERS.each { |l| l.nonblock = true }
22       end
24       # we're don't use the self-pipe mechanism in the Rainbows! worker
25       # since we don't defer reopening logs
26       HttpServer::SELF_PIPE.each { |x| x.close }.clear
27       trap(:USR1) { reopen_worker_logs(worker.nr) }
28       trap(:QUIT) { G.quit! }
29       [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
30       logger.info "Rainbows! #@use worker_connections=#@worker_connections"
31     end
33     if IO.respond_to?(:copy_stream)
34       def write_body(client, body)
35         if body.respond_to?(:to_path)
36           IO.copy_stream(Rainbows.body_to_io(body), client)
37         else
38           body.each { |chunk| client.write(chunk) }
39         end
40         ensure
41           body.respond_to?(:close) and body.close
42       end
43     else
44       def write_body(client, body)
45         body.each { |chunk| client.write(chunk) }
46         ensure
47           body.respond_to?(:close) and body.close
48       end
49     end
51     module_function :write_body
53     # once a client is accepted, it is processed in its entirety here
54     # in 3 easy steps: read request, call app, write app response
55     # this is used by synchronous concurrency models
56     #   Base, ThreadSpawn, ThreadPool
57     def process_client(client)
58       buf = client.readpartial(CHUNK_SIZE) # accept filters protect us here
59       hp = HttpParser.new
60       env = {}
61       alive = true
62       remote_addr = Rainbows.addr(client)
64       begin # loop
65         while ! hp.headers(env, buf)
66           IO.select([client], nil, nil, G.kato) or return
67           buf << client.readpartial(CHUNK_SIZE)
68         end
70         env[CLIENT_IO] = client
71         env[RACK_INPUT] = 0 == hp.content_length ?
72                  HttpRequest::NULL_IO : TeeInput.new(client, env, hp, buf)
73         env[REMOTE_ADDR] = remote_addr
74         status, headers, body = app.call(env.update(RACK_DEFAULTS))
76         if 100 == status.to_i
77           client.write(EXPECT_100_RESPONSE)
78           env.delete(HTTP_EXPECT)
79           status, headers, body = app.call(env)
80         end
82         alive = hp.keepalive? && G.alive
83         if hp.headers?
84           out = [ alive ? CONN_ALIVE : CONN_CLOSE ]
85           client.write(HttpResponse.header_string(status, headers, out))
86         end
87         write_body(client, body)
88       end while alive and hp.reset.nil? and env.clear
89     # if we get any error, try to write something back to the client
90     # assuming we haven't closed the socket, but don't get hung up
91     # if the socket is already closed or broken.  We'll always ensure
92     # the socket is closed at the end of this function
93     rescue => e
94       Error.write(client, e)
95     ensure
96       client.close unless client.closed?
97     end
99     def self.included(klass)
100       klass.const_set :LISTENERS, HttpServer::LISTENERS
101       klass.const_set :G, Rainbows::G
102     end
104   end