1 # -*- encoding: binary -*-
8 # Raindrops::Watcher is a stand-alone Rack application for watching
9 # any number of TCP and UNIX listeners (all of them by default).
11 # It depends on the {Aggregate RubyGem}[http://rubygems.org/gems/aggregate]
13 # In your Rack config.ru:
15 # run Raindrops::Watcher(options = {})
17 # It takes the following options hash:
19 # - :listeners - an array of listener names, (e.g. %w(0.0.0.0:80 /tmp/sock))
20 # - :delay - interval between stats updates in seconds (default: 1)
22 # Raindrops::Watcher is compatible any thread-safe/thread-aware Rack
23 # middleware. It does not work well with multi-process web servers
24 # but can be used to monitor them. It consumes minimal resources
25 # with the default :delay.
31 # Returns an HTML summary listing of all listen interfaces watched on
33 # === GET /active/$LISTENER.txt
35 # Returns a plain text summary + histogram with X-* HTTP headers for
38 # e.g.: curl http://example.com/active/0.0.0.0%3A80.txt
40 # === GET /active/$LISTENER.html
42 # Returns an HTML summary + histogram with X-* HTTP headers for
45 # e.g.: curl http://example.com/active/0.0.0.0%3A80.html
47 # === GET /queued/$LISTENER.txt
49 # Returns a plain text summary + histogram with X-* HTTP headers for
52 # e.g.: curl http://example.com/queued/0.0.0.0%3A80.txt
54 # === GET /queued/$LISTENER.html
56 # Returns an HTML summary + histogram with X-* HTTP headers for
59 # e.g.: curl http://example.com/queued/0.0.0.0%3A80.html
61 # === GET /tail/$LISTENER.txt?active_min=1&queued_min=1
63 # Streams chunked a response to the client.
64 # Interval is the preconfigured +:delay+ of the application (default 1 second)
66 # The response is plain text in the following format:
68 # ISO8601_TIMESTAMP LISTENER_NAME ACTIVE_COUNT QUEUED_COUNT LINEFEED
72 # - active_min - do not stream a line until this active count is reached
73 # - queued_min - do not stream a line until this queued count is reached
75 # == Response headers (mostly the same as Raindrops::LastDataRecv)
77 # - X-Count - number of requests received
78 # - X-Last-Reset - date since the last reset
80 # The following headers are only present if X-Count is greater than one.
82 # - X-Min - lowest last_data_recv time recorded (in milliseconds)
83 # - X-Max - highest last_data_recv time recorded (in milliseconds)
84 # - X-Mean - mean last_data_recv time recorded (rounded, in milliseconds)
85 # - X-Std-Dev - standard deviation of last_data_recv times
86 # - X-Outliers-Low - number of low outliers (hopefully many!)
87 # - X-Outliers-High - number of high outliers (hopefully zero!)
91 # There is a server running this app at http://raindrops-demo.bogomips.org/
92 # The Raindrops::Middleware demo is also accessible at
93 # http://raindrops-demo.bogomips.org/_raindrops
95 # The demo server is only limited to 30 users, so be sure not to abuse it
96 # by using the /tail/ endpoint too much.
97 class Raindrops::Watcher
101 include Raindrops::Linux
102 DOC_URL = "http://raindrops.bogomips.org/Raindrops/Watcher.html"
104 def initialize(opts = {})
105 @tcp_listeners = @unix_listeners = nil
106 if l = opts[:listeners]
108 Array(l).each { |addr| (addr =~ %r{\A/} ? unix : tcp) << addr }
109 unless tcp.empty? && unix.empty?
111 @unix_listeners = unix
115 agg_class = opts[:agg_class] || Aggregate
117 @active = Hash.new { |h,k| h[k] = agg_class.new }
118 @queued = Hash.new { |h,k| h[k] = agg_class.new }
119 @resets = Hash.new { |h,k| h[k] = start }
120 @snapshot = [ start, {} ]
121 @delay = opts[:delay] || 1
124 @cond = ConditionVariable.new
134 @start.synchronize { @thr ||= aggregator_thread(env["rack.logger"]) }
135 case env["REQUEST_METHOD"]
141 Rack::Response.new(["Method Not Allowed"], 405).finish
145 def aggregator_thread(logger) # :nodoc:
146 @socket = sock = Raindrops::InetDiagSocket.new
149 combined = tcp_listener_stats(@tcp_listeners, sock)
150 combined.merge!(unix_listener_stats(@unix_listeners))
152 combined.each do |addr,stats|
153 @active[addr] << stats.active
154 @queued[addr] << stats.queued
156 @snapshot = [ Time.now.utc, combined ]
160 logger.error "#{e.class} #{e.inspect}"
161 end while sleep(@delay) && @socket
168 def active_stats(addr) # :nodoc:
170 tmp = @active[addr] or return
171 [ @resets[addr], tmp.dup ]
175 def queued_stats(addr) # :nodoc:
177 tmp = @queued[addr] or return
178 [ @resets[addr], tmp.dup ]
189 def agg_to_hash(reset_at, agg)
191 "X-Count" => agg.count.to_s,
192 "X-Min" => agg.min.to_s,
193 "X-Max" => agg.max.to_s,
194 "X-Mean" => agg.mean.to_s,
195 "X-Std-Dev" => agg.std_dev.to_s,
196 "X-Outliers-Low" => agg.outliers_low.to_s,
197 "X-Outliers-High" => agg.outliers_high.to_s,
198 "X-Last-Reset" => reset_at.httpdate,
202 def histogram_txt(agg)
204 headers = agg_to_hash(reset_at, agg)
206 headers["Content-Type"] = "text/plain"
207 headers["Content-Length"] = bytesize(body).to_s
208 [ 200, headers, [ body ] ]
211 def histogram_html(agg, addr)
213 headers = agg_to_hash(reset_at, agg)
215 "<head><title>#{hostname} - #{escape_html addr}</title></head>" \
218 "<tr><td>#{k.gsub(/^X-/, '')}</td><td>#{v}</td></tr>"
219 }.join << "</table><pre>#{escape_html agg}</pre>" \
220 "<form action='/reset/#{escape addr}' method='post'>" \
221 "<input type='submit' name='x' value='reset' /></form>" \
223 headers["Content-Type"] = "text/html"
224 headers["Content-Length"] = bytesize(body).to_s
225 [ 200, headers, [ body ] ]
229 case env["PATH_INFO"]
232 when %r{\A/active/(.+)\.txt\z}
233 histogram_txt(active_stats(unescape($1)))
234 when %r{\A/active/(.+)\.html\z}
236 histogram_html(active_stats(addr), addr)
237 when %r{\A/queued/(.+)\.txt\z}
238 histogram_txt(queued_stats(unescape($1)))
239 when %r{\A/queued/(.+)\.html\z}
241 histogram_html(queued_stats(addr), addr)
242 when %r{\A/tail/(.+)\.txt\z}
243 tail(unescape($1), env)
248 raise if defined?(retried)
255 Rack::Response.new(["Not Found"], 404).finish
259 case env["PATH_INFO"]
260 when %r{\A/reset/(.+)\z}
261 reset!(env, unescape($1))
263 Rack::Response.new(["Not Found"], 404).finish
267 def reset!(env, addr)
269 @active.include?(addr) or return not_found
272 @resets[addr] = Time.now.utc
275 req = Rack::Request.new(env)
276 res = Rack::Response.new
277 url = req.referer || "#{req.host_with_port}/"
279 res.content_type.replace "text/plain"
280 res.write "Redirecting to #{url}"
285 updated_at, all = snapshot
287 "Content-Type" => "text/html",
288 "Last-Modified" => updated_at.httpdate,
290 body = "<html><head>" \
291 "<title>#{hostname} - all interfaces</title>" \
292 "</head><body><h3>Updated at #{updated_at.iso8601}</h3>" \
294 "<th>address</th><th>active</th><th>queued</th><th>reset</th>" \
296 all.map do |addr,stats|
299 "<td><a href='/tail/#{e_addr}.txt'>#{escape_html addr}</a></td>" \
300 "<td><a href='/active/#{e_addr}.html'>#{stats.active}</a></td>" \
301 "<td><a href='/queued/#{e_addr}.html'>#{stats.queued}</a></td>" \
302 "<td><form action='/reset/#{e_addr}' method='post'>" \
303 "<input type='submit' name='x' value='x' /></form></td>" \
305 end.join << "</table>" \
307 "This is running the #{self.class}</a> service, see " \
308 "<a href='#{DOC_URL}'>#{DOC_URL}</a> " \
309 "for more information and options." \
312 headers["Content-Length"] = bytesize(body).to_s
313 [ 200, headers, [ body ] ]
317 Tailer.new(self, addr, env).finish
320 # This is the response body returned for "/tail/$ADDRESS.txt". This
321 # must use a multi-threaded Rack server with streaming response support.
322 # It is an internal class and not expected to be used directly
324 def initialize(rdmon, addr, env) # :nodoc:
327 q = Rack::Utils.parse_query env["QUERY_STRING"]
328 @active_min = q["active_min"].to_i
329 @queued_min = q["queued_min"].to_i
330 len = Rack::Utils.bytesize(addr)
332 @fmt = "%20s % #{len}s % 10u % 10u\n"
333 case env["HTTP_VERSION"]
342 headers = { "Content-Type" => "text/plain" }
343 headers["Transfer-Encoding"] = "chunked" if @chunk
344 [ 200, headers, self ]
347 # called by the Rack server
350 time, all = @rdmon.wait_snapshot
351 stats = all[@addr] or next
352 stats.queued >= @queued_min or next
353 stats.active >= @active_min or next
354 body = sprintf(@fmt, time.iso8601, @addr, stats.active, stats.queued)
355 body = "#{body.size.to_s(16)}\r\n#{body}\r\n" if @chunk
358 yield "0\r\n\r\n" if @chunk
362 # shuts down the background thread, only for tests