thread_timeout: rewrite for safety
[rainbows.git] / lib / rainbows / thread_timeout.rb
blob44baf9a80609d8eafd4df17b2c7c1f6709034f0d
1 # -*- encoding: binary -*-
2 require 'thread'
4 # Soft timeout middleware for thread-based concurrency models in \Rainbows!
5 # This timeout only includes application dispatch, and will not take into
6 # account the (rare) response bodies that are dynamically generated while
7 # they are being written out to the client.
9 # In your rackup config file (config.ru), the following line will
10 # cause execution to timeout in 1.5 seconds.
12 #    use Rainbows::ThreadTimeout, :timeout => 1.5
13 #    run MyApplication.new
15 # You may also specify a threshold, so the timeout does not take
16 # effect until there are enough active clients.  It does not make
17 # sense to set a +:threshold+ higher or equal to the
18 # +worker_connections+ \Rainbows! configuration parameter.
19 # You may specify a negative threshold to be an absolute
20 # value relative to the +worker_connections+ parameter, thus
21 # if you specify a threshold of -1, and have 100 worker_connections,
22 # ThreadTimeout will only activate when there are 99 active requests.
24 #    use Rainbows::ThreadTimeout, :timeout => 1.5, :threshold => -1
25 #    run MyApplication.new
27 # This middleware only affects elements below it in the stack, so
28 # it can be configured to ignore certain endpoints or middlewares.
30 # Timed-out requests will cause this middleware to return with a
31 # "408 Request Timeout" response.
33 class Rainbows::ThreadTimeout
35   # :stopdoc:
36   ExecutionExpired = Class.new(Exception)
37   NEVER = Time.at(0x7fffffff) # MRI 1.8 won't be usable in January, 2038
39   def initialize(app, opts)
40     @timeout = opts[:timeout]
41     Numeric === @timeout or
42       raise TypeError, "timeout=#{@timeout.inspect} is not numeric"
44     if @threshold = opts[:threshold]
45       Integer === @threshold or
46         raise TypeError, "threshold=#{@threshold.inspect} is not an integer"
47       @threshold == 0 and raise ArgumentError, "threshold=0 does not make sense"
48       @threshold < 0 and @threshold += Rainbows.server.worker_connections
49     end
50     @app = app
51     @active = {}
52     @lock = Mutex.new
53     @watchdog = nil
54   end
56   def call(env)
57     @lock.lock
58     start_watchdog(env) unless @watchdog
59     @active[Thread.current] = Time.now + @timeout
60     begin
61       @lock.unlock
62       @app.call(env)
63     ensure
64       @lock.synchronize { @active.delete(Thread.current) }
65     end
66     rescue ExecutionExpired
67       [ 408, { 'Content-Type' => 'text/plain', 'Content-Length' => '0' }, [] ]
68   end
70   def start_watchdog(env)
71     @watchdog = Thread.new(env["rack.logger"]) do |logger|
72       begin
73         if @threshold
74           # "active.size" is atomic in MRI 1.8 and 1.9
75           sleep(@timeout) while @active.size < @threshold
76         end
78         next_expiry = NEVER
79         @lock.synchronize do
80           now = Time.now
81           @active.delete_if do |thread, expire_at|
82             if expire_at > now
83               next_expiry = expire_at if next_expiry > expire_at
84               false
85             else
86               thread.raise(ExecutionExpired)
87               true
88             end
89           end
90         end
92         if next_expiry == NEVER
93           sleep(@timeout)
94         else
95           sec = next_expiry - Time.now
96           sec > 0.0 ? sleep(sec) : Thread.pass
97         end
98       rescue => e
99         logger.error e
100       end while true
101     end
102   end
103   # :startdoc: