watcher: set Expires headers for cache invalidation
[raindrops.git] / lib / raindrops / watcher.rb
blob3c724b96f1b2dc74bc44d39b0ae9138eec519d3b
1 # -*- encoding: binary -*-
2 require "thread"
3 require "time"
4 require "socket"
5 require "rack"
6 require "aggregate"
8 # Raindrops::Watcher is a stand-alone Rack application for watching
9 # any number of TCP and UNIX listeners (all of them by default).
11 # It depends on the {Aggregate RubyGem}[http://rubygems.org/gems/aggregate]
13 # In your Rack config.ru:
15 #    run Raindrops::Watcher(options = {})
17 # It takes the following options hash:
19 # - :listeners - an array of listener names, (e.g. %w(0.0.0.0:80 /tmp/sock))
20 # - :delay - interval between stats updates in seconds (default: 1)
22 # Raindrops::Watcher is compatible any thread-safe/thread-aware Rack
23 # middleware.  It does not work well with multi-process web servers
24 # but can be used to monitor them.  It consumes minimal resources
25 # with the default :delay.
27 # == HTTP endpoints
29 # === GET /
31 # Returns an HTML summary listing of all listen interfaces watched on
33 # === GET /active/$LISTENER.txt
35 # Returns a plain text summary + histogram with X-* HTTP headers for
36 # active connections.
38 # e.g.: curl http://example.com/active/0.0.0.0%3A80.txt
40 # === GET /active/$LISTENER.html
42 # Returns an HTML summary + histogram with X-* HTTP headers for
43 # active connections.
45 # e.g.: curl http://example.com/active/0.0.0.0%3A80.html
47 # === GET /queued/$LISTENER.txt
49 # Returns a plain text summary + histogram with X-* HTTP headers for
50 # queued connections.
52 # e.g.: curl http://example.com/queued/0.0.0.0%3A80.txt
54 # === GET /queued/$LISTENER.html
56 # Returns an HTML summary + histogram with X-* HTTP headers for
57 # queued connections.
59 # e.g.: curl http://example.com/queued/0.0.0.0%3A80.html
61 # === GET /tail/$LISTENER.txt?active_min=1&queued_min=1
63 # Streams chunked a response to the client.
64 # Interval is the preconfigured +:delay+ of the application (default 1 second)
66 # The response is plain text in the following format:
68 #   ISO8601_TIMESTAMP LISTENER_NAME ACTIVE_COUNT QUEUED_COUNT LINEFEED
70 # Query parameters:
72 # - active_min - do not stream a line until this active count is reached
73 # - queued_min - do not stream a line until this queued count is reached
75 # == Response headers (mostly the same as Raindrops::LastDataRecv)
77 # - X-Count   - number of requests received
78 # - X-Last-Reset - date since the last reset
80 # The following headers are only present if X-Count is greater than one.
82 # - X-Min     - lowest last_data_recv time recorded (in milliseconds)
83 # - X-Max     - highest last_data_recv time recorded (in milliseconds)
84 # - X-Mean    - mean last_data_recv time recorded (rounded, in milliseconds)
85 # - X-Std-Dev - standard deviation of last_data_recv times
86 # - X-Outliers-Low - number of low outliers (hopefully many!)
87 # - X-Outliers-High - number of high outliers (hopefully zero!)
89 # = Demo Server
91 # There is a server running this app at http://raindrops-demo.bogomips.org/
92 # The Raindrops::Middleware demo is also accessible at
93 #   http://raindrops-demo.bogomips.org/_raindrops
95 # The demo server is only limited to 30 users, so be sure not to abuse it
96 # by using the /tail/ endpoint too much.
97 class Raindrops::Watcher
98   # :stopdoc:
99   attr_reader :snapshot
100   include Rack::Utils
101   include Raindrops::Linux
102   DOC_URL = "http://raindrops.bogomips.org/Raindrops/Watcher.html"
104   def initialize(opts = {})
105     @tcp_listeners = @unix_listeners = nil
106     if l = opts[:listeners]
107       tcp, unix = [], []
108       Array(l).each { |addr| (addr =~ %r{\A/} ? unix : tcp) << addr }
109       unless tcp.empty? && unix.empty?
110         @tcp_listeners = tcp
111         @unix_listeners = unix
112       end
113     end
115     agg_class = opts[:agg_class] || Aggregate
116     start = Time.now.utc
117     @active = Hash.new { |h,k| h[k] = agg_class.new }
118     @queued = Hash.new { |h,k| h[k] = agg_class.new }
119     @resets = Hash.new { |h,k| h[k] = start }
120     @snapshot = [ start, {} ]
121     @delay = opts[:delay] || 1
122     @lock = Mutex.new
123     @start = Mutex.new
124     @cond = ConditionVariable.new
125     @thr = nil
126   end
128   def hostname
129     Socket.gethostname
130   end
132   # rack endpoint
133   def call(env)
134     @start.synchronize { @thr ||= aggregator_thread(env["rack.logger"]) }
135     case env["REQUEST_METHOD"]
136     when "HEAD", "GET"
137       get env
138     when "POST"
139       post env
140     else
141       Rack::Response.new(["Method Not Allowed"], 405).finish
142     end
143   end
145   def aggregator_thread(logger) # :nodoc:
146     @socket = sock = Raindrops::InetDiagSocket.new
147     thr = Thread.new do
148       begin
149         combined = tcp_listener_stats(@tcp_listeners, sock)
150         combined.merge!(unix_listener_stats(@unix_listeners))
151         @lock.synchronize do
152           combined.each do |addr,stats|
153             @active[addr] << stats.active
154             @queued[addr] << stats.queued
155           end
156           @snapshot = [ Time.now.utc, combined ]
157           @cond.broadcast
158         end
159       rescue => e
160         logger.error "#{e.class} #{e.inspect}"
161       end while sleep(@delay) && @socket
162       sock.close
163     end
164     wait_snapshot
165     thr
166   end
168   def active_stats(addr) # :nodoc:
169     @lock.synchronize do
170       tmp = @active[addr] or return
171       [ @snapshot[0], @resets[addr], tmp.dup ]
172     end
173   end
175   def queued_stats(addr) # :nodoc:
176     @lock.synchronize do
177       tmp = @queued[addr] or return
178       [ @snapshot[0], @resets[addr], tmp.dup ]
179     end
180   end
182   def wait_snapshot
183     @lock.synchronize do
184       @cond.wait @lock
185       @snapshot
186     end
187   end
189   def agg_to_hash(reset_at, agg)
190     {
191       "X-Count" => agg.count.to_s,
192       "X-Min" => agg.min.to_s,
193       "X-Max" => agg.max.to_s,
194       "X-Mean" => agg.mean.to_s,
195       "X-Std-Dev" => agg.std_dev.to_s,
196       "X-Outliers-Low" => agg.outliers_low.to_s,
197       "X-Outliers-High" => agg.outliers_high.to_s,
198       "X-Last-Reset" => reset_at.httpdate,
199     }
200   end
202   def histogram_txt(agg)
203     updated_at, reset_at, agg = *agg
204     headers = agg_to_hash(reset_at, agg)
205     body = agg.to_s
206     headers["Content-Type"] = "text/plain"
207     headers["Expires"] = (updated_at + @delay).httpdate
208     headers["Content-Length"] = bytesize(body).to_s
209     [ 200, headers, [ body ] ]
210   end
212   def histogram_html(agg, addr)
213     updated_at, reset_at, agg = *agg
214     headers = agg_to_hash(reset_at, agg)
215     body = "<html>" \
216       "<head><title>#{hostname} - #{escape_html addr}</title></head>" \
217       "<body><table>" <<
218       headers.map { |k,v|
219         "<tr><td>#{k.gsub(/^X-/, '')}</td><td>#{v}</td></tr>"
220       }.join << "</table><pre>#{escape_html agg}</pre>" \
221       "<form action='/reset/#{escape addr}' method='post'>" \
222       "<input type='submit' name='x' value='reset' /></form>" \
223       "</body>"
224     headers["Content-Type"] = "text/html"
225     headers["Expires"] = (updated_at + @delay).httpdate
226     headers["Content-Length"] = bytesize(body).to_s
227     [ 200, headers, [ body ] ]
228   end
230   def get(env)
231     case env["PATH_INFO"]
232     when "/"
233       index
234     when %r{\A/active/(.+)\.txt\z}
235       histogram_txt(active_stats(unescape($1)))
236     when %r{\A/active/(.+)\.html\z}
237       addr = unescape $1
238       histogram_html(active_stats(addr), addr)
239     when %r{\A/queued/(.+)\.txt\z}
240       histogram_txt(queued_stats(unescape($1)))
241     when %r{\A/queued/(.+)\.html\z}
242       addr = unescape $1
243       histogram_html(queued_stats(addr), addr)
244     when %r{\A/tail/(.+)\.txt\z}
245       tail(unescape($1), env)
246     else
247       not_found
248     end
249     rescue Errno::EDOM
250       raise if defined?(retried)
251       retried = true
252       wait_snapshot
253       retry
254   end
256   def not_found
257     Rack::Response.new(["Not Found"], 404).finish
258   end
260   def post(env)
261     case env["PATH_INFO"]
262     when %r{\A/reset/(.+)\z}
263       reset!(env, unescape($1))
264     else
265       Rack::Response.new(["Not Found"], 404).finish
266     end
267   end
269   def reset!(env, addr)
270     @lock.synchronize do
271       @active.include?(addr) or return not_found
272       @active.delete addr
273       @queued.delete addr
274       @resets[addr] = Time.now.utc
275       @cond.wait @lock
276     end
277     req = Rack::Request.new(env)
278     res = Rack::Response.new
279     url = req.referer || "#{req.host_with_port}/"
280     res.redirect(url)
281     res.content_type.replace "text/plain"
282     res.write "Redirecting to #{url}"
283     res.finish
284   end
286   def index
287     updated_at, all = snapshot
288     headers = {
289       "Content-Type" => "text/html",
290       "Last-Modified" => updated_at.httpdate,
291       "Expires" => (updated_at + @delay).httpdate,
292     }
293     body = "<html><head>" \
294       "<title>#{hostname} - all interfaces</title>" \
295       "</head><body><h3>Updated at #{updated_at.iso8601}</h3>" \
296       "<table><tr>" \
297         "<th>address</th><th>active</th><th>queued</th><th>reset</th>" \
298       "</tr>" <<
299       all.map do |addr,stats|
300         e_addr = escape addr
301         "<tr>" \
302           "<td><a href='/tail/#{e_addr}.txt'>#{escape_html addr}</a></td>" \
303           "<td><a href='/active/#{e_addr}.html'>#{stats.active}</a></td>" \
304           "<td><a href='/queued/#{e_addr}.html'>#{stats.queued}</a></td>" \
305           "<td><form action='/reset/#{e_addr}' method='post'>" \
306             "<input type='submit' name='x' value='x' /></form></td>" \
307         "</tr>" \
308       end.join << "</table>" \
309       "<p>" \
310         "This is running the #{self.class}</a> service, see " \
311         "<a href='#{DOC_URL}'>#{DOC_URL}</a> " \
312         "for more information and options." \
313       "</p>" \
314       "</body></html>"
315     headers["Content-Length"] = bytesize(body).to_s
316     [ 200, headers, [ body ] ]
317   end
319   def tail(addr, env)
320     Tailer.new(self, addr, env).finish
321   end
323   # This is the response body returned for "/tail/$ADDRESS.txt".  This
324   # must use a multi-threaded Rack server with streaming response support.
325   # It is an internal class and not expected to be used directly
326   class Tailer
327     def initialize(rdmon, addr, env) # :nodoc:
328       @rdmon = rdmon
329       @addr = addr
330       q = Rack::Utils.parse_query env["QUERY_STRING"]
331       @active_min = q["active_min"].to_i
332       @queued_min = q["queued_min"].to_i
333       len = Rack::Utils.bytesize(addr)
334       len = 35 if len > 35
335       @fmt = "%20s % #{len}s % 10u % 10u\n"
336       case env["HTTP_VERSION"]
337       when "HTTP/1.0", nil
338         @chunk = false
339       else
340         @chunk = true
341       end
342     end
344     def finish
345       headers = {
346         "Content-Type" => "text/plain",
347         "Cache-Control" => "no-transform",
348         "Expires" => Time.at(0).httpdate,
349       }
350       headers["Transfer-Encoding"] = "chunked" if @chunk
351       [ 200, headers, self ]
352     end
354     # called by the Rack server
355     def each # :nodoc:
356       begin
357         time, all = @rdmon.wait_snapshot
358         stats = all[@addr] or next
359         stats.queued >= @queued_min or next
360         stats.active >= @active_min or next
361         body = sprintf(@fmt, time.iso8601, @addr, stats.active, stats.queued)
362         body = "#{body.size.to_s(16)}\r\n#{body}\r\n" if @chunk
363         yield body
364       end while true
365       yield "0\r\n\r\n" if @chunk
366     end
367   end
369   # shuts down the background thread, only for tests
370   def shutdown
371     @socket = nil
372     @thr.join if @thr
373     @thr = nil
374   end
375   # :startdoc: