doc: rdoc cleanups and fixes
[rainbows.git] / lib / rainbows / reverse_proxy.rb
blobb96fa477635e7416dd5968dced49dffb5f82a498
1 # -*- encoding: binary -*-
2 # :enddoc:
3 require 'socket'
4 require 'thread'
5 require 'uri'
6 require 'kcar' # http://bogomips.org/kcar/ -- gem install kcar
8 # A reverse proxy implementation for \Rainbows!  It is a Rack application
9 # compatible and optimized for most \Rainbows! concurrency models.
11 # It makes HTTP/1.0 connections without keepalive to backends, so
12 # it is only recommended for proxying to upstreams on the same LAN
13 # or machine.  It can proxy to TCP hosts as well as UNIX domain sockets.
15 # Currently it only does simple round-robin balancing and does not
16 # know to retry connections from failed backends.
18 # Buffering-behavior is currently dependent on the concurrency model selected:
20 # Fully-buffered (uploads and response bodies):
21 #    Coolio, EventMachine, NeverBlock, CoolioThreadSpawn, CoolioThreadPool
22 # If you're proxying to Unicorn, fully-buffered is the way to go.
24 # Buffered input only (uploads, but not response bodies):
25 #    ThreadSpawn, ThreadPool, FiberSpawn, FiberPool, CoolioFiberSpawn
27 # It is not recommended to use Base, WriterThreadSpawn or WriterThreadPool
28 # to host this application.  However, you may proxy to a backend running
29 # one of these concurrency models with a fully-buffering concurrency model.
31 # See the {example config}[link:examples/reverse_proxy.ru] for a sample
32 # configuration
34 # TODO: Revactor support
35 # TODO: Support HTTP trailers
36 # TODO: optional streaming input for synchronous
37 # TODO: error handling
39 # WARNING! this is only lightly tested and has no automated tests, yet!
40 class Rainbows::ReverseProxy
41   autoload :MultiThread, 'rainbows/reverse_proxy/multi_thread'
42   autoload :Synchronous, 'rainbows/reverse_proxy/synchronous'
43   autoload :Coolio, 'rainbows/reverse_proxy/coolio'
44   autoload :EventMachine, 'rainbows/reverse_proxy/event_machine'
45   autoload :EvClient, 'rainbows/reverse_proxy/ev_client'
47   HTTP_X_FORWARDED_FOR = "HTTP_X_FORWARDED_FOR"
48   REMOTE_ADDR = "REMOTE_ADDR"
49   REQUEST_METHOD = "REQUEST_METHOD"
50   REQUEST_URI = "REQUEST_URI"
51   CRLF = "\r\n"
52   TR = %w(_ -)
53   CONTENT_LENGTH = "CONTENT_LENGTH"
54   HTTP_TRANSFER_ENCODING = "HTTP_TRANSFER_ENCODING"
55   RackInput = "rack.input"
56   E502 = [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ]
58   def initialize(opts)
59     @lock = Mutex.new
60     upstreams = opts[:upstreams]
61     @upstreams = []
62     upstreams.each do |url|
63       url, cfg = *url if Array === url
64       if url =~ %r{\Ahttp://}
65         uri = URI.parse(url)
66         host = uri.host =~ %r{\A\[([a-fA-F0-9:]+)\]\z} ? $1 : uri.host
67         sockaddr = Socket.sockaddr_in(uri.port, host)
68       else
69         path = url.gsub(%r{\Aunix:}, "") # nginx compat
70         %r{\A~} =~ path and path = File.expand_path(path)
71         sockaddr = Socket.sockaddr_un(path)
72       end
73       ((cfg && cfg[:weight]) || 1).times { @upstreams << sockaddr }
74     end
75     @nr = 0
76   end
78   # detects the concurrency model at first run and replaces itself
79   def call(env)
80     if @lock.try_lock
81       case model = env["rainbows.model"]
82       when :EventMachine, :NeverBlock
83         extend(EventMachine)
84       when :Coolio, :CoolioThreadPool, :CoolioThreadSpawn
85         extend(Coolio)
86       when :RevFiberSpawn, :Rev, :RevThreadPool, :RevThreadSpawn
87         warn "#{model} is not *well* supported with #{self.class}"
88         warn "Switch to #{model.to_s.gsub(/Rev/, 'Coolio')}!"
89         extend(Synchronous)
90       when :Revactor
91         warn "Revactor is not *well* supported with #{self.class} yet"
92         extend(Synchronous)
93       when :FiberSpawn, :FiberPool, :CoolioFiberSpawn
94         extend(Synchronous)
95         Synchronous::UpstreamSocket.
96           __send__(:include, Rainbows::Fiber::IO::Methods)
97       when :WriterThreadSpawn, :WriterThreadPool
98         warn "#{model} is not recommended for use with #{self.class}"
99         extend(Synchronous)
100       else
101         extend(Synchronous)
102       end
103       extend(MultiThread) if env["rack.multithread"]
104       @lock.unlock
105     else
106       @lock.synchronize {} # wait for the first locker to finish
107     end
108     call(env)
109   end
111   # returns request headers for sending to the upstream as a string
112   def build_headers(env, input)
113     remote_addr = env[REMOTE_ADDR]
114     xff = env[HTTP_X_FORWARDED_FOR]
115     xff = xff ? "#{xff},#{remote_addr}" : remote_addr
116     req = "#{env[REQUEST_METHOD]} #{env[REQUEST_URI]} HTTP/1.0\r\n" \
117           "Connection: close\r\n" \
118           "X-Forwarded-For: #{xff}\r\n"
119     uscore, dash = *TR
120     env.each do |key, value|
121       %r{\AHTTP_(\w+)\z} =~ key or next
122       key = $1
123       next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)\z}x =~ key
124       key.tr!(uscore, dash)
125       req << "#{key}: #{value}\r\n"
126     end
127     input and req << (input.respond_to?(:size) ?
128                      "Content-Length: #{input.size}\r\n" :
129                      "Transfer-Encoding: chunked\r\n")
130     req << CRLF
131   end
133   def pick_upstream(env) # +env+ is reserved for future expansion
134     @nr += 1
135     @upstreams[@nr %= @upstreams.size]
136   end
138   def prepare_input!(env)
139     if cl = env[CONTENT_LENGTH]
140       size = cl.to_i
141       size > 0 or return
142     elsif %r{\Achunked\z}i =~ env.delete(HTTP_TRANSFER_ENCODING)
143       # do people use multiple transfer-encodings?
144     else
145       return
146     end
148     input = env[RackInput]
149     if input.respond_to?(:rewind)
150       if input.respond_to?(:size)
151         input.size # TeeInput-specific behavior
152         return input
153       else
154         return SizedInput.new(input, size)
155       end
156     end
157     tmp = size && size < 0x4000 ? StringIO.new("") : Unicorn::TmpIO.new
158     each_block(input) { |x| tmp.syswrite(x) }
159     tmp.rewind
160     tmp
161   end
163   class SizedInput
164     attr_reader :size
166     def initialize(input, n)
167       buf = ""
168       if n == nil
169         n = 0
170         while input.read(16384, buf)
171           n += buf.size
172         end
173         input.rewind
174       end
175       @input, @size = input, n
176     end
178     def read(*args)
179       @input.read(*args)
180     end
181   end
183   class UpstreamSocket < Kgio::Socket
184     alias readpartial kgio_read!
185   end