event_machine: avoid close on deferred response
[rainbows.git] / lib / rainbows / thread_timeout.rb
blob83482729ca8088170ffa5bf78c95bc7ff2a13684
1 # -*- encoding: binary -*-
2 require 'thread'
4 # Soft timeout middleware for thread-based concurrency models in \Rainbows!
5 # This timeout only includes application dispatch, and will not take into
6 # account the (rare) response bodies that are dynamically generated while
7 # they are being written out to the client.
9 # In your rackup config file (config.ru), the following line will
10 # cause execution to timeout in 1.5 seconds.
12 #    use Rainbows::ThreadTimeout, :timeout => 1.5
13 #    run MyApplication.new
15 # You may also specify a threshold, so the timeout does not take
16 # effect until there are enough active clients.  It does not make
17 # sense to set a +:threshold+ higher or equal to the
18 # +worker_connections+ \Rainbows! configuration parameter.
19 # You may specify a negative threshold to be an absolute
20 # value relative to the +worker_connections+ parameter, thus
21 # if you specify a threshold of -1, and have 100 worker_connections,
22 # ThreadTimeout will only activate when there are 99 active requests.
24 #    use Rainbows::ThreadTimeout, :timeout => 1.5, :threshold => -1
25 #    run MyApplication.new
27 # This middleware only affects elements below it in the stack, so
28 # it can be configured to ignore certain endpoints or middlewares.
30 # Timed-out requests will cause this middleware to return with a
31 # "408 Request Timeout" response.
33 # == Caveats
35 # Badly-written C extensions may not be timed out.  Audit and fix
36 # (or remove) those extensions before relying on this module.
38 # Do NOT, under any circumstances nest and load this in
39 # the same middleware stack.  You may load this in parallel in the
40 # same process completely independent middleware stacks, but DO NOT
41 # load this twice so it nests.  Things will break!
43 # This will behave badly if system time is changed since Ruby
44 # does not expose a monotonic clock for users, so don't change
45 # the system time while this is running.  All servers should be
46 # running ntpd anyways.
48 # "ensure" clauses may not fire properly or be interrupted during
49 # execution, so do not mix this module with code which relies on "ensure".
50 # (This is also true for the "Timeout" module in the Ruby standard library)
52 # "recursive locking" ThreadError exceptions may occur if
53 # ThreadTimeout fires while a Mutex is locked (because "ensure"
54 # clauses may not fire properly).
56 class Rainbows::ThreadTimeout
58   # :stopdoc:
59   #
60   # we subclass Exception to get rid of normal StandardError rescues
61   # in app-level code.  timeout.rb does something similar
62   ExecutionExpired = Class.new(Exception)
64   # The MRI 1.8 won't be usable in January 2038, we'll raise this
65   # when we eventually drop support for 1.8 (before 2038, hopefully)
66   NEVER = Time.at(0x7fffffff)
68   def initialize(app, opts)
69     # @timeout must be Numeric since we add this to Time
70     @timeout = opts[:timeout]
71     Numeric === @timeout or
72       raise TypeError, "timeout=#{@timeout.inspect} is not numeric"
74     if @threshold = opts[:threshold]
75       Integer === @threshold or
76         raise TypeError, "threshold=#{@threshold.inspect} is not an integer"
77       @threshold == 0 and raise ArgumentError, "threshold=0 does not make sense"
78       @threshold < 0 and @threshold += Rainbows.server.worker_connections
79     end
80     @app = app
82     # This is the main datastructure for communicating Threads eligible
83     # for expiration to the watchdog thread.  If the eligible thread
84     # completes its job before its expiration time, it will delete itself
85     # @active.  If the watchdog thread notices the thread is timed out,
86     # the watchdog thread will delete the thread from this hash as it
87     # raises the exception.
88     #
89     # key: Thread to be timed out
90     # value: Time of expiration
91     @active = {}
93     # Protects all access to @active.  It is important since it also limits
94     # safe points for asynchronously raising exceptions.
95     @lock = Mutex.new
97     # There is one long-running watchdog thread that watches @active and
98     # kills threads that have been running too long
99     # see start_watchdog
100     @watchdog = nil
101   end
103   # entry point for Rack middleware
104   def call(env)
105     # Once we have this lock, we ensure two things:
106     # 1) there is only one watchdog thread started
107     # 2) we can't be killed once we have this lock, it's unlikely
108     #    to happen unless @timeout is really low and the machine
109     #    is very slow.
110     @lock.lock
112     # we're dead if anything in the next two lines raises, but it's
113     # highly unlikely that they will, and anything such as NoMemoryError
114     # is hopeless and we might as well just die anyways.
115     # initialize guarantees @timeout will be Numeric
116     start_watchdog(env) unless @watchdog
117     @active[Thread.current] = Time.now + @timeout
119     begin
120       # It is important to unlock inside this begin block
121       # Mutex#unlock really can't fail here since we did a successful
122       # Mutex#lock before
123       @lock.unlock
125       # Once the Mutex was unlocked, we're open to Thread#raise from
126       # the watchdog process.  This is the main place we expect to receive
127       # Thread#raise.  @app is of course the next layer of the Rack
128       # application stack
129       @app.call(env)
130     ensure
131       # I's still possible to receive a Thread#raise here from
132       # the watchdog, but that's alright, the "rescue ExecutionExpired"
133       # line will catch that.
134       @lock.synchronize { @active.delete(Thread.current) }
135       # Thread#raise no longer possible here
136     end
137     rescue ExecutionExpired
138       # If we got here, it's because the watchdog thread raised an exception
139       # here to kill us.  The watchdog uses @active.delete_if with a lock,
140       # so we guaranteed it's
141       [ 408, { 'Content-Type' => 'text/plain', 'Content-Length' => '0' }, [] ]
142   end
144   # The watchdog thread is the one that does the job of killing threads
145   # that have expired.
146   def start_watchdog(env)
147     @watchdog = Thread.new(env["rack.logger"]) do |logger|
148       begin
149         if @threshold
150           # Hash#size is atomic in MRI 1.8 and 1.9 and we
151           # expect that from other implementations.
152           #
153           # Even without a memory barrier, sleep(@timeout) vs
154           # sleep(@timeout - time-for-SMP-to-synchronize-a-word)
155           # is too trivial to worry about here.
156           sleep(@timeout) while @active.size < @threshold
157         end
159         next_expiry = NEVER
161         # We always lock access to @active, so we can't kill threads
162         # that are about to release themselves from the eye of the
163         # watchdog thread.
164         @lock.synchronize do
165           now = Time.now
166           @active.delete_if do |thread, expire_at|
167             # We also use this loop to get the maximum possible time to
168             # sleep for if we're not killing the thread.
169             if expire_at > now
170               next_expiry = expire_at if next_expiry > expire_at
171               false
172             else
173               # Terminate execution and delete this from the @active
174               thread.raise(ExecutionExpired)
175               true
176             end
177           end
178         end
180         # We always try to sleep as long as possible to avoid consuming
181         # resources from the app.  So that's the user-configured @timeout
182         # value.
183         if next_expiry == NEVER
184           sleep(@timeout)
185         else
186           # sleep until the next known thread is about to expire.
187           sec = next_expiry - Time.now
188           sec > 0.0 ? sleep(sec) : Thread.pass # give other threads a chance
189         end
190       rescue => e
191         # just in case
192         logger.error e
193       end while true # we run this forever
194     end
195   end
196   # :startdoc: