Watcher: Use relative paths in HTML links
[raindrops.git] / lib / raindrops / watcher.rb
blobb7199a1a17ac759dcbf39ce1decc139125d4c528
1 # -*- encoding: binary -*-
2 require "thread"
3 require "time"
4 require "socket"
5 require "rack"
6 require "aggregate"
8 # Raindrops::Watcher is a stand-alone Rack application for watching
9 # any number of TCP and UNIX listeners (all of them by default).
11 # It depends on the {Aggregate RubyGem}[http://rubygems.org/gems/aggregate]
13 # In your Rack config.ru:
15 #    run Raindrops::Watcher(options = {})
17 # It takes the following options hash:
19 # - :listeners - an array of listener names, (e.g. %w(0.0.0.0:80 /tmp/sock))
20 # - :delay - interval between stats updates in seconds (default: 1)
22 # Raindrops::Watcher is compatible any thread-safe/thread-aware Rack
23 # middleware.  It does not work well with multi-process web servers
24 # but can be used to monitor them.  It consumes minimal resources
25 # with the default :delay.
27 # == HTTP endpoints
29 # === GET /
31 # Returns an HTML summary listing of all listen interfaces watched on
33 # === GET /active/$LISTENER.txt
35 # Returns a plain text summary + histogram with X-* HTTP headers for
36 # active connections.
38 # e.g.: curl http://raindrops-demo.bogomips.org/active/0.0.0.0%3A80.txt
40 # === GET /active/$LISTENER.html
42 # Returns an HTML summary + histogram with X-* HTTP headers for
43 # active connections.
45 # e.g.: curl http://raindrops-demo.bogomips.org/active/0.0.0.0%3A80.html
47 # === GET /queued/$LISTENER.txt
49 # Returns a plain text summary + histogram with X-* HTTP headers for
50 # queued connections.
52 # e.g.: curl http://raindrops-demo.bogomips.org/queued/0.0.0.0%3A80.txt
54 # === GET /queued/$LISTENER.html
56 # Returns an HTML summary + histogram with X-* HTTP headers for
57 # queued connections.
59 # e.g.: curl http://raindrops-demo.bogomips.org/queued/0.0.0.0%3A80.html
61 # === POST /reset/$LISTENER
63 # Resets the active and queued statistics for the given listener.
65 # === GET /tail/$LISTENER.txt?active_min=1&queued_min=1
67 # Streams chunked a response to the client.
68 # Interval is the preconfigured +:delay+ of the application (default 1 second)
70 # The response is plain text in the following format:
72 #   ISO8601_TIMESTAMP LISTENER_NAME ACTIVE_COUNT QUEUED_COUNT LINEFEED
74 # Query parameters:
76 # - active_min - do not stream a line until this active count is reached
77 # - queued_min - do not stream a line until this queued count is reached
79 # == Response headers (mostly the same names as Raindrops::LastDataRecv)
81 # - X-Count   - number of samples polled
82 # - X-Last-Reset - date since the last reset
84 # The following headers are only present if X-Count is greater than one.
86 # - X-Min     - lowest number of connections recorded
87 # - X-Max     - highest number of connections recorded
88 # - X-Mean    - mean number of connections recorded
89 # - X-Std-Dev - standard deviation of connection count
90 # - X-Outliers-Low - number of low outliers (hopefully many for queued)
91 # - X-Outliers-High - number of high outliers (hopefully zero for queued)
92 # - X-Current - current number of connections
93 # - X-First-Peak-At - date of when X-Max was first reached
94 # - X-Last-Peak-At - date of when X-Max was last reached
96 # = Demo Server
98 # There is a server running this app at http://raindrops-demo.bogomips.org/
99 # The Raindrops::Middleware demo is also accessible at
100 # http://raindrops-demo.bogomips.org/_raindrops
102 # The demo server is only limited to 30 users, so be sure not to abuse it
103 # by using the /tail/ endpoint too much.
104 class Raindrops::Watcher
105   # :stopdoc:
106   attr_reader :snapshot
107   include Rack::Utils
108   include Raindrops::Linux
109   DOC_URL = "http://raindrops.bogomips.org/Raindrops/Watcher.html"
110   Peak = Struct.new(:first, :last)
112   def initialize(opts = {})
113     @tcp_listeners = @unix_listeners = nil
114     if l = opts[:listeners]
115       tcp, unix = [], []
116       Array(l).each { |addr| (addr =~ %r{\A/} ? unix : tcp) << addr }
117       unless tcp.empty? && unix.empty?
118         @tcp_listeners = tcp
119         @unix_listeners = unix
120       end
121     end
123     @agg_class = opts[:agg_class] || Aggregate
124     @start_time = Time.now.utc
125     @active = Hash.new { |h,k| h[k] = @agg_class.new }
126     @queued = Hash.new { |h,k| h[k] = @agg_class.new }
127     @resets = Hash.new { |h,k| h[k] = @start_time }
128     @peak_active = Hash.new { |h,k| h[k] = Peak.new(@start_time, @start_time) }
129     @peak_queued = Hash.new { |h,k| h[k] = Peak.new(@start_time, @start_time) }
130     @snapshot = [ @start_time, {} ]
131     @delay = opts[:delay] || 1
132     @lock = Mutex.new
133     @start = Mutex.new
134     @cond = ConditionVariable.new
135     @thr = nil
136   end
138   def hostname
139     Socket.gethostname
140   end
142   # rack endpoint
143   def call(env)
144     @start.synchronize { @thr ||= aggregator_thread(env["rack.logger"]) }
145     case env["REQUEST_METHOD"]
146     when "GET"
147       get env
148     when "HEAD"
149       r = get(env)
150       r[2] = []
151       r
152     when "POST"
153       post env
154     else
155       Rack::Response.new(["Method Not Allowed"], 405).finish
156     end
157   end
159   def aggregate!(agg_hash, peak_hash, addr, number, now)
160     agg = agg_hash[addr]
161     if (max = agg.max) && number > 0 && number >= max
162       peak = peak_hash[addr]
163       peak.first = now if number > max
164       peak.last = now
165     end
166     agg << number
167   end
169   def aggregator_thread(logger) # :nodoc:
170     @socket = sock = Raindrops::InetDiagSocket.new
171     thr = Thread.new do
172       begin
173         combined = tcp_listener_stats(@tcp_listeners, sock)
174         combined.merge!(unix_listener_stats(@unix_listeners))
175         @lock.synchronize do
176           now = Time.now.utc
177           combined.each do |addr,stats|
178             aggregate!(@active, @peak_active, addr, stats.active, now)
179             aggregate!(@queued, @peak_queued, addr, stats.queued, now)
180           end
181           @snapshot = [ now, combined ]
182           @cond.broadcast
183         end
184       rescue => e
185         logger.error "#{e.class} #{e.inspect}"
186       end while sleep(@delay) && @socket
187       sock.close
188     end
189     wait_snapshot
190     thr
191   end
193   def non_existent_stats(time)
194     [ time, @start_time, @agg_class.new, 0, Peak.new(@start_time, @start_time) ]
195   end
197   def active_stats(addr) # :nodoc:
198     @lock.synchronize do
199       time, combined = @snapshot
200       stats = combined[addr] or return non_existent_stats(time)
201       tmp, peak = @active[addr], @peak_active[addr]
202       [ time, @resets[addr], tmp.dup, stats.active, peak ]
203     end
204   end
206   def queued_stats(addr) # :nodoc:
207     @lock.synchronize do
208       time, combined = @snapshot
209       stats = combined[addr] or return non_existent_stats(time)
210       tmp, peak = @queued[addr], @peak_queued[addr]
211       [ time, @resets[addr], tmp.dup, stats.queued, peak ]
212     end
213   end
215   def wait_snapshot
216     @lock.synchronize do
217       @cond.wait @lock
218       @snapshot
219     end
220   end
222   def std_dev(agg)
223     agg.std_dev.to_s
224   rescue Errno::EDOM
225     "NaN"
226   end
228   def agg_to_hash(reset_at, agg, current, peak)
229     {
230       "X-Count" => agg.count.to_s,
231       "X-Min" => agg.min.to_s,
232       "X-Max" => agg.max.to_s,
233       "X-Mean" => agg.mean.to_s,
234       "X-Std-Dev" => std_dev(agg),
235       "X-Outliers-Low" => agg.outliers_low.to_s,
236       "X-Outliers-High" => agg.outliers_high.to_s,
237       "X-Last-Reset" => reset_at.httpdate,
238       "X-Current" => current.to_s,
239       "X-First-Peak-At" => peak.first.httpdate,
240       "X-Last-Peak-At" => peak.last.httpdate,
241     }
242   end
244   def histogram_txt(agg)
245     updated_at, reset_at, agg, current, peak = *agg
246     headers = agg_to_hash(reset_at, agg, current, peak)
247     body = agg.to_s
248     headers["Content-Type"] = "text/plain"
249     headers["Expires"] = (updated_at + @delay).httpdate
250     headers["Content-Length"] = bytesize(body).to_s
251     [ 200, headers, [ body ] ]
252   end
254   def histogram_html(agg, addr)
255     updated_at, reset_at, agg, current, peak = *agg
256     headers = agg_to_hash(reset_at, agg, current, peak)
257     body = "<html>" \
258       "<head><title>#{hostname} - #{escape_html addr}</title></head>" \
259       "<body><table>" <<
260       headers.map { |k,v|
261         "<tr><td>#{k.gsub(/^X-/, '')}</td><td>#{v}</td></tr>"
262       }.join << "</table><pre>#{escape_html agg}</pre>" \
263       "<form action='../reset/#{escape addr}' method='post'>" \
264       "<input type='submit' name='x' value='reset' /></form>" \
265       "</body>"
266     headers["Content-Type"] = "text/html"
267     headers["Expires"] = (updated_at + @delay).httpdate
268     headers["Content-Length"] = bytesize(body).to_s
269     [ 200, headers, [ body ] ]
270   end
272   def get(env)
273     retried = false
274     begin
275       case env["PATH_INFO"]
276       when "/"
277         index
278       when %r{\A/active/(.+)\.txt\z}
279         histogram_txt(active_stats(unescape($1)))
280       when %r{\A/active/(.+)\.html\z}
281         addr = unescape $1
282         histogram_html(active_stats(addr), addr)
283       when %r{\A/queued/(.+)\.txt\z}
284         histogram_txt(queued_stats(unescape($1)))
285       when %r{\A/queued/(.+)\.html\z}
286         addr = unescape $1
287         histogram_html(queued_stats(addr), addr)
288       when %r{\A/tail/(.+)\.txt\z}
289         tail(unescape($1), env)
290       else
291         not_found
292       end
293     rescue Errno::EDOM
294       raise if retried
295       retried = true
296       wait_snapshot
297       retry
298     end
299   end
301   def not_found
302     Rack::Response.new(["Not Found"], 404).finish
303   end
305   def post(env)
306     case env["PATH_INFO"]
307     when %r{\A/reset/(.+)\z}
308       reset!(env, unescape($1))
309     else
310       not_found
311     end
312   end
314   def reset!(env, addr)
315     @lock.synchronize do
316       @active.include?(addr) or return not_found
317       @active.delete addr
318       @queued.delete addr
319       @resets[addr] = Time.now.utc
320       @cond.wait @lock
321     end
322     req = Rack::Request.new(env)
323     res = Rack::Response.new
324     url = req.referer || "#{req.host_with_port}/"
325     res.redirect(url)
326     res.content_type.replace "text/plain"
327     res.write "Redirecting to #{url}"
328     res.finish
329   end
331   def index
332     updated_at, all = snapshot
333     headers = {
334       "Content-Type" => "text/html",
335       "Last-Modified" => updated_at.httpdate,
336       "Expires" => (updated_at + @delay).httpdate,
337     }
338     body = "<html><head>" \
339       "<title>#{hostname} - all interfaces</title>" \
340       "</head><body><h3>Updated at #{updated_at.iso8601}</h3>" \
341       "<table><tr>" \
342         "<th>address</th><th>active</th><th>queued</th><th>reset</th>" \
343       "</tr>" <<
344       all.sort do |a,b|
345         a[0] <=> b[0] # sort by addr
346       end.map do |addr,stats|
347         e_addr = escape addr
348         "<tr>" \
349           "<td><a href='tail/#{e_addr}.txt' " \
350             "title='&quot;tail&quot; output in real time'" \
351             ">#{escape_html addr}</a></td>" \
352           "<td><a href='active/#{e_addr}.html' " \
353             "title='show active connection stats'>#{stats.active}</a></td>" \
354           "<td><a href='queued/#{e_addr}.html' " \
355             "title='show queued connection stats'>#{stats.queued}</a></td>" \
356           "<td><form action='reset/#{e_addr}' method='post'>" \
357             "<input title='reset statistics' " \
358               "type='submit' name='x' value='x' /></form></td>" \
359         "</tr>" \
360       end.join << "</table>" \
361       "<p>" \
362         "This is running the #{self.class}</a> service, see " \
363         "<a href='#{DOC_URL}'>#{DOC_URL}</a> " \
364         "for more information and options." \
365       "</p>" \
366       "</body></html>"
367     headers["Content-Length"] = bytesize(body).to_s
368     [ 200, headers, [ body ] ]
369   end
371   def tail(addr, env)
372     Tailer.new(self, addr, env).finish
373   end
375   # This is the response body returned for "/tail/$ADDRESS.txt".  This
376   # must use a multi-threaded Rack server with streaming response support.
377   # It is an internal class and not expected to be used directly
378   class Tailer
379     def initialize(rdmon, addr, env) # :nodoc:
380       @rdmon = rdmon
381       @addr = addr
382       q = Rack::Utils.parse_query env["QUERY_STRING"]
383       @active_min = q["active_min"].to_i
384       @queued_min = q["queued_min"].to_i
385       len = Rack::Utils.bytesize(addr)
386       len = 35 if len > 35
387       @fmt = "%20s % #{len}s % 10u % 10u\n"
388       case env["HTTP_VERSION"]
389       when "HTTP/1.0", nil
390         @chunk = false
391       else
392         @chunk = true
393       end
394     end
396     def finish
397       headers = {
398         "Content-Type" => "text/plain",
399         "Cache-Control" => "no-transform",
400         "Expires" => Time.at(0).httpdate,
401       }
402       headers["Transfer-Encoding"] = "chunked" if @chunk
403       [ 200, headers, self ]
404     end
406     # called by the Rack server
407     def each # :nodoc:
408       begin
409         time, all = @rdmon.wait_snapshot
410         stats = all[@addr] or next
411         stats.queued >= @queued_min or next
412         stats.active >= @active_min or next
413         body = sprintf(@fmt, time.iso8601, @addr, stats.active, stats.queued)
414         body = "#{body.size.to_s(16)}\r\n#{body}\r\n" if @chunk
415         yield body
416       end while true
417       yield "0\r\n\r\n" if @chunk
418     end
419   end
421   # shuts down the background thread, only for tests
422   def shutdown
423     @socket = nil
424     @thr.join if @thr
425     @thr = nil
426   end
427   # :startdoc: