reverse_proxy: properly read IPv6 addreses in upstreams
[rainbows.git] / lib / rainbows / reverse_proxy.rb
blob93ad9382f44e4d22004574b648bb98b4500bec16
1 # -*- encoding: binary -*-
2 require 'socket'
3 require 'thread'
4 require 'uri'
5 require 'kcar' # http://bogomips.org/kcar/ -- gem install kcar
7 # A reverse proxy implementation for \Rainbows!  It is a Rack application
8 # compatible and optimized for most \Rainbows! concurrency models.
10 # It makes HTTP/1.0 connections without keepalive to backends, so
11 # it is only recommended for proxying to upstreams on the same LAN
12 # or machine.  It can proxy to TCP hosts as well as UNIX domain sockets.
14 # Currently it only does simple round-robin balancing and does not
15 # know to retry connections from failed backends.
17 # Buffering-behavior is currently dependent on the concurrency model selected:
19 # Fully-buffered (uploads and response bodies):
20 #    Coolio, EventMachine, NeverBlock, CoolioThreadSpawn, CoolioThreadPool
21 # If you're proxying to Unicorn, fully-buffered is the way to go.
23 # Buffered input only (uploads, but not response bodies):
24 #    ThreadSpawn, ThreadPool, FiberSpawn, FiberPool, CoolioFiberSpawn
26 # It is not recommended to use Base, WriterThreadSpawn or WriterThreadPool
27 # to host this application.  However, you may proxy to a backend running
28 # one of these concurrency models with a fully-buffering concurrency model.
30 # See the {example config}[link:examples/reverse_proxy.ru] for a sample
31 # configuration
33 # TODO: Revactor support
34 # TODO: Support HTTP trailers
35 # TODO: optional streaming input for synchronous
36 # TODO: error handling
38 # WARNING! this is only lightly tested and has no automated tests, yet!
39 class Rainbows::ReverseProxy
40   autoload :MultiThread, 'rainbows/reverse_proxy/multi_thread'
41   autoload :Synchronous, 'rainbows/reverse_proxy/synchronous'
42   autoload :Coolio, 'rainbows/reverse_proxy/coolio'
43   autoload :EventMachine, 'rainbows/reverse_proxy/event_machine'
44   autoload :EvClient, 'rainbows/reverse_proxy/ev_client'
46   HTTP_X_FORWARDED_FOR = "HTTP_X_FORWARDED_FOR"
47   REMOTE_ADDR = "REMOTE_ADDR"
48   REQUEST_METHOD = "REQUEST_METHOD"
49   REQUEST_URI = "REQUEST_URI"
50   CRLF = "\r\n"
51   TR = %w(_ -)
52   E502 = [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ]
54   def initialize(opts)
55     @lock = Mutex.new
56     upstreams = opts[:upstreams]
57     @upstreams = []
58     upstreams.each do |url|
59       url, cfg = *url if Array === url
60       if url =~ %r{\Ahttp://}
61         uri = URI.parse(url)
62         host = uri.host =~ %r{\A\[([a-fA-F0-9:]+)\]\z} ? $1 : uri.host
63         sockaddr = Socket.sockaddr_in(uri.port, host)
64       else
65         path = url.gsub(%r{\Aunix:}, "") # nginx compat
66         %r{\A~} =~ path and path = File.expand_path(path)
67         sockaddr = Socket.sockaddr_un(path)
68       end
69       ((cfg && cfg[:weight]) || 1).times { @upstreams << sockaddr }
70     end
71     @nr = 0
72   end
74   # detects the concurrency model at first run and replaces itself
75   def call(env)
76     if @lock.try_lock
77       case model = env["rainbows.model"]
78       when :EventMachine, :NeverBlock
79         extend(EventMachine)
80       when :Coolio, :CoolioThreadPool, :CoolioThreadSpawn
81         extend(Coolio)
82       when :RevFiberSpawn, :Rev, :RevThreadPool, :RevThreadSpawn
83         warn "#{model} is not *well* supported with #{self.class}"
84         warn "Switch to #{model.to_s.gsub(/Rev/, 'Coolio')}!"
85         extend(Synchronous)
86       when :Revactor
87         warn "Revactor is not *well* supported with #{self.class} yet"
88         extend(Synchronous)
89       when :FiberSpawn, :FiberPool, :CoolioFiberSpawn
90         extend(Synchronous)
91         Synchronous::UpstreamSocket.
92           __send__(:include, Rainbows::Fiber::IO::Methods)
93       when :WriterThreadSpawn, :WriterThreadPool
94         warn "#{model} is not recommended for use with #{self.class}"
95         extend(Synchronous)
96       else
97         extend(Synchronous)
98       end
99       extend(MultiThread) if env["rack.multithread"]
100       @lock.unlock
101     else
102       @lock.synchronize {} # wait for the first locker to finish
103     end
104     call(env)
105   end
107   # returns request headers for sending to the upstream as a string
108   def build_headers(env, input)
109     remote_addr = env[REMOTE_ADDR]
110     xff = env[HTTP_X_FORWARDED_FOR]
111     xff = xff ? "#{xff},#{remote_addr}" : remote_addr
112     req = "#{env[REQUEST_METHOD]} #{env[REQUEST_URI]} HTTP/1.0\r\n" \
113           "Connection: close\r\n" \
114           "X-Forwarded-For: #{xff}\r\n"
115     uscore, dash = *TR
116     env.each do |key, value|
117       %r{\AHTTP_(\w+)\z} =~ key or next
118       key = $1
119       next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)\z}x =~ key
120       key.tr!(uscore, dash)
121       req << "#{key}: #{value}\r\n"
122     end
123     input and req << (input.respond_to?(:size) ?
124                      "Content-Length: #{input.size}\r\n" :
125                      "Transfer-Encoding: chunked\r\n")
126     req << CRLF
127   end
129   def pick_upstream(env) # +env+ is reserved for future expansion
130     @nr += 1
131     @upstreams[@nr %= @upstreams.size]
132   end
134   CONTENT_LENGTH = "CONTENT_LENGTH"
135   HTTP_TRANSFER_ENCODING = "HTTP_TRANSFER_ENCODING"
136   RackInput = "rack.input"
138   def prepare_input!(env)
139     if cl = env[CONTENT_LENGTH]
140       size = cl.to_i
141       size > 0 or return
142     elsif %r{\Achunked\z}i =~ env.delete(HTTP_TRANSFER_ENCODING)
143       # do people use multiple transfer-encodings?
144     else
145       return
146     end
148     input = env[RackInput]
149     if input.respond_to?(:rewind)
150       if input.respond_to?(:size)
151         input.size # TeeInput-specific behavior
152         return input
153       else
154         return SizedInput.new(input, size)
155       end
156     end
157     tmp = size && size < 0x4000 ? StringIO.new("") : Unicorn::TmpIO.new
158     each_block(input) { |x| tmp.syswrite(x) }
159     tmp.rewind
160     tmp
161   end
163   class SizedInput
164     attr_reader :size
166     def initialize(input, n)
167       buf = ""
168       if n == nil
169         n = 0
170         while input.read(16384, buf)
171           n += buf.size
172         end
173         input.rewind
174       end
175       @input, @size = input, n
176     end
178     def read(*args)
179       @input.read(*args)
180     end
181   end
183   class UpstreamSocket < Kgio::Socket
184     alias readpartial kgio_read!
185   end