xepoll_thread_*/client: EPOLLONESHOT implies EPOLLET
[rainbows.git] / lib / rainbows / reverse_proxy.rb
blobc1f1dc22fd5b1fdabba4a72beb2ea6a260749b6a
1 # -*- encoding: binary -*-
2 # :enddoc:
3 require 'socket'
4 require 'thread'
5 require 'uri'
6 require 'kcar' # http://bogomips.org/kcar/ -- gem install kcar
8 # This is lightly tested and has an unstable configuration interface.
9 # ***** Do not rely on anything under the ReverseProxy namespace! *****
11 # A reverse proxy implementation for \Rainbows!  It is a Rack application
12 # compatible and optimized for most \Rainbows! concurrency models.
14 # It makes HTTP/1.0 connections without keepalive to backends, so
15 # it is only recommended for proxying to upstreams on the same LAN
16 # or machine.  It can proxy to TCP hosts as well as UNIX domain sockets.
18 # Currently it only does simple round-robin balancing and does not
19 # know to retry connections from failed backends.
21 # Buffering-behavior is currently dependent on the concurrency model selected:
23 # Fully-buffered (uploads and response bodies):
24 #    Coolio, EventMachine, NeverBlock, CoolioThreadSpawn, CoolioThreadPool
25 # If you're proxying to Unicorn, fully-buffered is the way to go.
27 # Buffered input only (uploads, but not response bodies):
28 #    ThreadSpawn, ThreadPool, FiberSpawn, FiberPool, CoolioFiberSpawn
30 # It is not recommended to use Base, WriterThreadSpawn or WriterThreadPool
31 # to host this application.  However, you may proxy to a backend running
32 # one of these concurrency models with a fully-buffering concurrency model.
34 # See the {example config}[link:examples/reverse_proxy.ru] for a sample
35 # configuration
37 # TODO: Revactor support
38 # TODO: Support HTTP trailers
39 # TODO: optional streaming input for synchronous
40 # TODO: error handling
42 # WARNING! this is only lightly tested and has no automated tests, yet!
43 class Rainbows::ReverseProxy
44   autoload :MultiThread, 'rainbows/reverse_proxy/multi_thread'
45   autoload :Synchronous, 'rainbows/reverse_proxy/synchronous'
46   autoload :Coolio, 'rainbows/reverse_proxy/coolio'
47   autoload :EventMachine, 'rainbows/reverse_proxy/event_machine'
48   autoload :EvClient, 'rainbows/reverse_proxy/ev_client'
50   HTTP_X_FORWARDED_FOR = "HTTP_X_FORWARDED_FOR"
51   REMOTE_ADDR = "REMOTE_ADDR"
52   REQUEST_METHOD = "REQUEST_METHOD"
53   REQUEST_URI = "REQUEST_URI"
54   CRLF = "\r\n"
55   TR = %w(_ -)
56   CONTENT_LENGTH = "CONTENT_LENGTH"
57   HTTP_TRANSFER_ENCODING = "HTTP_TRANSFER_ENCODING"
58   RackInput = "rack.input"
59   E502 = [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ]
61   def initialize(opts)
62     @lock = Mutex.new
63     upstreams = opts[:upstreams]
64     @upstreams = []
65     upstreams.each do |url|
66       url, cfg = *url if Array === url
67       if url =~ %r{\Ahttp://}
68         uri = URI.parse(url)
69         host = uri.host =~ %r{\A\[([a-fA-F0-9:]+)\]\z} ? $1 : uri.host
70         sockaddr = Socket.sockaddr_in(uri.port, host)
71       else
72         path = url.gsub(%r{\Aunix:}, "") # nginx compat
73         %r{\A~} =~ path and path = File.expand_path(path)
74         sockaddr = Socket.sockaddr_un(path)
75       end
76       ((cfg && cfg[:weight]) || 1).times { @upstreams << sockaddr }
77     end
78     @nr = 0
79   end
81   # detects the concurrency model at first run and replaces itself
82   def call(env)
83     if @lock.try_lock
84       case model = env["rainbows.model"]
85       when :EventMachine, :NeverBlock
86         extend(EventMachine)
87       when :Coolio, :CoolioThreadPool, :CoolioThreadSpawn
88         extend(Coolio)
89       when :RevFiberSpawn, :Rev, :RevThreadPool, :RevThreadSpawn
90         warn "#{model} is not *well* supported with #{self.class}"
91         warn "Switch to #{model.to_s.gsub(/Rev/, 'Coolio')}!"
92         extend(Synchronous)
93       when :Revactor
94         warn "Revactor is not *well* supported with #{self.class} yet"
95         extend(Synchronous)
96       when :FiberSpawn, :FiberPool, :CoolioFiberSpawn
97         extend(Synchronous)
98         Synchronous::UpstreamSocket.
99           __send__(:include, Rainbows::Fiber::IO::Methods)
100       when :WriterThreadSpawn, :WriterThreadPool
101         warn "#{model} is not recommended for use with #{self.class}"
102         extend(Synchronous)
103       else
104         extend(Synchronous)
105       end
106       extend(MultiThread) if env["rack.multithread"]
107       @lock.unlock
108     else
109       @lock.synchronize {} # wait for the first locker to finish
110     end
111     call(env)
112   end
114   # returns request headers for sending to the upstream as a string
115   def build_headers(env, input)
116     remote_addr = env[REMOTE_ADDR]
117     xff = env[HTTP_X_FORWARDED_FOR]
118     xff = xff ? "#{xff},#{remote_addr}" : remote_addr
119     req = "#{env[REQUEST_METHOD]} #{env[REQUEST_URI]} HTTP/1.0\r\n" \
120           "Connection: close\r\n" \
121           "X-Forwarded-For: #{xff}\r\n"
122     uscore, dash = *TR
123     env.each do |key, value|
124       %r{\AHTTP_(\w+)\z} =~ key or next
125       key = $1
126       next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)\z}x =~ key
127       key.tr!(uscore, dash)
128       req << "#{key}: #{value}\r\n"
129     end
130     input and req << (input.respond_to?(:size) ?
131                      "Content-Length: #{input.size}\r\n" :
132                      "Transfer-Encoding: chunked\r\n")
133     req << CRLF
134   end
136   def pick_upstream(env) # +env+ is reserved for future expansion
137     @nr += 1
138     @upstreams[@nr %= @upstreams.size]
139   end
141   def prepare_input!(env)
142     if cl = env[CONTENT_LENGTH]
143       size = cl.to_i
144       size > 0 or return
145     elsif %r{\Achunked\z}i =~ env.delete(HTTP_TRANSFER_ENCODING)
146       # do people use multiple transfer-encodings?
147     else
148       return
149     end
151     input = env[RackInput]
152     if input.respond_to?(:rewind)
153       if input.respond_to?(:size)
154         input.size # TeeInput-specific behavior
155         return input
156       else
157         return SizedInput.new(input, size)
158       end
159     end
160     tmp = size && size < 0x4000 ? StringIO.new("") : Unicorn::TmpIO.new
161     each_block(input) { |x| tmp.syswrite(x) }
162     tmp.rewind
163     tmp
164   end
166   class SizedInput
167     attr_reader :size
169     def initialize(input, n)
170       buf = ""
171       if n == nil
172         n = 0
173         while input.read(16384, buf)
174           n += buf.size
175         end
176         input.rewind
177       end
178       @input, @size = input, n
179     end
181     def read(*args)
182       @input.read(*args)
183     end
184   end
186   class UpstreamSocket < Kgio::Socket
187     alias readpartial kgio_read!
188   end