Rainbows! 5.2.1
[rainbows.git] / lib / rainbows / reverse_proxy.rb
blobf844c1d2152d9f2305b679819ca8c9cfddeaa709
1 # -*- encoding: binary -*-
2 # :enddoc:
3 require 'socket'
4 require 'thread'
5 require 'uri'
6 require 'kcar' # https://yhbt.net/kcar/ -- gem install kcar
8 # This is lightly tested and has an unstable configuration interface.
9 # ***** Do not rely on anything under the ReverseProxy namespace! *****
11 # A reverse proxy implementation for \Rainbows!  It is a Rack application
12 # compatible and optimized for most \Rainbows! concurrency models.
14 # It makes HTTP/1.0 connections without keepalive to backends, so
15 # it is only recommended for proxying to upstreams on the same LAN
16 # or machine.  It can proxy to TCP hosts as well as UNIX domain sockets.
18 # Currently it only does simple round-robin balancing and does not
19 # know to retry connections from failed backends.
21 # Buffering-behavior is currently dependent on the concurrency model selected:
23 # Fully-buffered (uploads and response bodies):
24 #    Coolio, EventMachine, NeverBlock, CoolioThreadSpawn, CoolioThreadPool
25 # If you're proxying to Unicorn, fully-buffered is the way to go.
27 # Buffered input only (uploads, but not response bodies):
28 #    ThreadSpawn, ThreadPool, FiberSpawn, FiberPool, CoolioFiberSpawn
30 # It is not recommended to use Base, WriterThreadSpawn or WriterThreadPool
31 # to host this application.  However, you may proxy to a backend running
32 # one of these concurrency models with a fully-buffering concurrency model.
34 # See the {example config}[link:examples/reverse_proxy.ru] for a sample
35 # configuration
37 # TODO: Revactor support
38 # TODO: Support HTTP trailers
39 # TODO: optional streaming input for synchronous
40 # TODO: error handling
42 # WARNING! this is only lightly tested and has no automated tests, yet!
43 class Rainbows::ReverseProxy
44   autoload :MultiThread, 'rainbows/reverse_proxy/multi_thread'
45   autoload :Synchronous, 'rainbows/reverse_proxy/synchronous'
46   autoload :Coolio, 'rainbows/reverse_proxy/coolio'
47   autoload :EventMachine, 'rainbows/reverse_proxy/event_machine'
48   autoload :EvClient, 'rainbows/reverse_proxy/ev_client'
50   E502 = [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ]
52   def initialize(opts)
53     @lock = Mutex.new
54     upstreams = opts[:upstreams]
55     @upstreams = []
56     upstreams.each do |url|
57       url, cfg = *url if Array === url
58       if url =~ %r{\Ahttp://}
59         uri = URI.parse(url)
60         host = uri.host =~ %r{\A\[([a-fA-F0-9:]+)\]\z} ? $1 : uri.host
61         sockaddr = Socket.sockaddr_in(uri.port, host)
62       else
63         path = url.gsub(%r{\Aunix:}, "") # nginx compat
64         %r{\A~} =~ path and path = File.expand_path(path)
65         sockaddr = Socket.sockaddr_un(path)
66       end
67       ((cfg && cfg[:weight]) || 1).times { @upstreams << sockaddr }
68     end
69     @nr = 0
70   end
72   # detects the concurrency model at first run and replaces itself
73   def call(env)
74     if @lock.try_lock
75       case model = env["rainbows.model"]
76       when :EventMachine, :NeverBlock
77         extend(EventMachine)
78       when :Coolio, :CoolioThreadPool, :CoolioThreadSpawn
79         extend(Coolio)
80       when :RevFiberSpawn, :Rev, :RevThreadPool, :RevThreadSpawn
81         warn "#{model} is not *well* supported with #{self.class}"
82         warn "Switch to #{model.to_s.gsub(/Rev/, 'Coolio')}!"
83         extend(Synchronous)
84       when :Revactor
85         warn "Revactor is not *well* supported with #{self.class} yet"
86         extend(Synchronous)
87       when :FiberSpawn, :FiberPool, :CoolioFiberSpawn
88         extend(Synchronous)
89         Synchronous::UpstreamSocket.
90           __send__(:include, Rainbows::Fiber::IO::Methods)
91       when :WriterThreadSpawn, :WriterThreadPool
92         warn "#{model} is not recommended for use with #{self.class}"
93         extend(Synchronous)
94       else
95         extend(Synchronous)
96       end
97       extend(MultiThread) if env["rack.multithread"]
98       @lock.unlock
99     else
100       @lock.synchronize {} # wait for the first locker to finish
101     end
102     call(env)
103   end
105   # returns request headers for sending to the upstream as a string
106   def build_headers(env, input)
107     remote_addr = env['REMOTE_ADDR']
108     xff = env['HTTP_X_FORWARDED_FOR']
109     xff = xff ? "#{xff},#{remote_addr}" : remote_addr
110     req = "#{env['REQUEST_METHOD']} #{env['REQUEST_URI']} HTTP/1.0\r\n" \
111           "Connection: close\r\n" \
112           "X-Forwarded-For: #{xff}\r\n"
113     env.each do |key, value|
114       %r{\AHTTP_(\w+)\z} =~ key or next
115       key = $1
116       next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)\z}x =~ key
117       key.tr!('_'.freeze, '-'.freeze)
118       req << "#{key}: #{value}\r\n"
119     end
120     input and req << (input.respond_to?(:size) ?
121                      "Content-Length: #{input.size}\r\n" :
122                      "Transfer-Encoding: chunked\r\n".freeze)
123     req << "\r\n".freeze
124   end
126   def pick_upstream(env) # +env+ is reserved for future expansion
127     @nr += 1
128     @upstreams[@nr %= @upstreams.size]
129   end
131   def prepare_input!(env)
132     if cl = env['CONTENT_LENGTH']
133       size = cl.to_i
134       size > 0 or return
135     elsif %r{\Achunked\z}i =~ env.delete('HTTP_TRANSFER_ENCODING')
136       # do people use multiple transfer-encodings?
137     else
138       return
139     end
141     input = env['rack.input']
142     if input.respond_to?(:rewind)
143       if input.respond_to?(:size)
144         input.size # TeeInput-specific behavior
145         return input
146       else
147         return SizedInput.new(input, size)
148       end
149     end
150     tmp = size && size < 0x4000 ? StringIO.new("") : Unicorn::TmpIO.new
151     each_block(input) { |x| tmp.syswrite(x) }
152     tmp.rewind
153     tmp
154   end
156   class SizedInput
157     attr_reader :size
159     def initialize(input, n)
160       buf = ""
161       if n == nil
162         n = 0
163         while input.read(16384, buf)
164           n += buf.size
165         end
166         input.rewind
167       end
168       @input, @size = input, n
169     end
171     def read(*args)
172       @input.read(*args)
173     end
174   end
176   class UpstreamSocket < Kgio::Socket
177     alias readpartial kgio_read!
178   end