README: update with extra disclaimer
[raindrops.git] / lib / raindrops / watcher.rb
blob8fc0772cdd15fd76d1cfdbb18ea206ba46ea5db6
1 # -*- encoding: binary -*-
2 # frozen_string_literal: false
3 require "thread"
4 require "time"
5 require "socket"
6 require "rack"
7 require "aggregate"
9 # Raindrops::Watcher is a stand-alone Rack application for watching
10 # any number of TCP and UNIX listeners (all of them by default).
12 # It depends on the {Aggregate RubyGem}[https://rubygems.org/gems/aggregate]
14 # In your Rack config.ru:
16 #    run Raindrops::Watcher(options = {})
18 # It takes the following options hash:
20 # - :listeners - an array of listener names, (e.g. %w(0.0.0.0:80 /tmp/sock))
21 # - :delay - interval between stats updates in seconds (default: 1)
23 # Raindrops::Watcher is compatible any thread-safe/thread-aware Rack
24 # middleware.  It does not work well with multi-process web servers
25 # but can be used to monitor them.  It consumes minimal resources
26 # with the default :delay.
28 # == HTTP endpoints
30 # === GET /
32 # Returns an HTML summary listing of all listen interfaces watched on
34 # === GET /active/$LISTENER.txt
36 # Returns a plain text summary + histogram with X-* HTTP headers for
37 # active connections.
39 # e.g.: curl https://yhbt.net/raindrops-demo/active/0.0.0.0%3A80.txt
41 # === GET /active/$LISTENER.html
43 # Returns an HTML summary + histogram with X-* HTTP headers for
44 # active connections.
46 # e.g.: curl https://yhbt.net/raindrops-demo/active/0.0.0.0%3A80.html
48 # === GET /queued/$LISTENER.txt
50 # Returns a plain text summary + histogram with X-* HTTP headers for
51 # queued connections.
53 # e.g.: curl https://yhbt.net/raindrops-demo/queued/0.0.0.0%3A80.txt
55 # === GET /queued/$LISTENER.html
57 # Returns an HTML summary + histogram with X-* HTTP headers for
58 # queued connections.
60 # e.g.: curl https://yhbt.net/raindrops-demo/queued/0.0.0.0%3A80.html
62 # === POST /reset/$LISTENER
64 # Resets the active and queued statistics for the given listener.
66 # === GET /tail/$LISTENER.txt?active_min=1&queued_min=1
68 # Streams chunked a response to the client.
69 # Interval is the preconfigured +:delay+ of the application (default 1 second)
71 # The response is plain text in the following format:
73 #   ISO8601_TIMESTAMP LISTENER_NAME ACTIVE_COUNT QUEUED_COUNT LINEFEED
75 # Query parameters:
77 # - active_min - do not stream a line until this active count is reached
78 # - queued_min - do not stream a line until this queued count is reached
80 # == Response headers (mostly the same names as Raindrops::LastDataRecv)
82 # - X-Count   - number of samples polled
83 # - X-Last-Reset - date since the last reset
85 # The following headers are only present if X-Count is greater than one.
87 # - X-Min     - lowest number of connections recorded
88 # - X-Max     - highest number of connections recorded
89 # - X-Mean    - mean number of connections recorded
90 # - X-Std-Dev - standard deviation of connection count
91 # - X-Outliers-Low - number of low outliers (hopefully many for queued)
92 # - X-Outliers-High - number of high outliers (hopefully zero for queued)
93 # - X-Current - current number of connections
94 # - X-First-Peak-At - date of when X-Max was first reached
95 # - X-Last-Peak-At - date of when X-Max was last reached
97 # = Demo Server
99 # There is a server running this app at https://yhbt.net/raindrops-demo/
100 # The Raindrops::Middleware demo is also accessible at
101 # https://yhbt.net/raindrops-demo/_raindrops
103 # The demo server is only limited to 30 users, so be sure not to abuse it
104 # by using the /tail/ endpoint too much.
105 class Raindrops::Watcher
106   # :stopdoc:
107   attr_reader :snapshot
108   include Rack::Utils
109   include Raindrops::Linux
110   DOC_URL = "https://yhbt.net/raindrops/Raindrops/Watcher.html"
111   Peak = Struct.new(:first, :last)
113   def initialize(opts = {})
114     @tcp_listeners = @unix_listeners = nil
115     if l = opts[:listeners]
116       tcp, unix = [], []
117       Array(l).each { |addr| (addr =~ %r{\A/} ? unix : tcp) << addr }
118       unless tcp.empty? && unix.empty?
119         @tcp_listeners = tcp
120         @unix_listeners = unix
121       end
122     end
124     @agg_class = opts[:agg_class] || Aggregate
125     @start_time = Time.now.utc
126     @active = Hash.new { |h,k| h[k] = @agg_class.new }
127     @queued = Hash.new { |h,k| h[k] = @agg_class.new }
128     @resets = Hash.new { |h,k| h[k] = @start_time }
129     @peak_active = Hash.new { |h,k| h[k] = Peak.new(@start_time, @start_time) }
130     @peak_queued = Hash.new { |h,k| h[k] = Peak.new(@start_time, @start_time) }
131     @snapshot = [ @start_time, {} ]
132     @delay = opts[:delay] || 1
133     @lock = Mutex.new
134     @start = Mutex.new
135     @cond = ConditionVariable.new
136     @thr = nil
137   end
139   def hostname
140     Socket.gethostname
141   end
143   # rack endpoint
144   def call(env)
145     @start.synchronize { @thr ||= aggregator_thread(env["rack.logger"]) }
146     case env["REQUEST_METHOD"]
147     when "GET"
148       get env
149     when "HEAD"
150       r = get(env)
151       r[2] = []
152       r
153     when "POST"
154       post env
155     else
156       Rack::Response.new(["Method Not Allowed"], 405).finish
157     end
158   end
160   def aggregate!(agg_hash, peak_hash, addr, number, now)
161     agg = agg_hash[addr]
162     if (max = agg.max) && number > 0 && number >= max
163       peak = peak_hash[addr]
164       peak.first = now if number > max
165       peak.last = now
166     end
167     agg << number
168   end
170   def aggregator_thread(logger) # :nodoc:
171     @socket = sock = Raindrops::InetDiagSocket.new
172     thr = Thread.new do
173       begin
174         combined = tcp_listener_stats(@tcp_listeners, sock)
175         combined.merge!(unix_listener_stats(@unix_listeners))
176         @lock.synchronize do
177           now = Time.now.utc
178           combined.each do |addr,stats|
179             aggregate!(@active, @peak_active, addr, stats.active, now)
180             aggregate!(@queued, @peak_queued, addr, stats.queued, now)
181           end
182           @snapshot = [ now, combined ]
183           @cond.broadcast
184         end
185       rescue => e
186         logger.error "#{e.class} #{e.inspect}"
187       end while sleep(@delay) && @socket
188       sock.close
189     end
190     wait_snapshot
191     thr
192   end
194   def non_existent_stats(time)
195     [ time, @start_time, @agg_class.new, 0, Peak.new(@start_time, @start_time) ]
196   end
198   def active_stats(addr) # :nodoc:
199     @lock.synchronize do
200       time, combined = @snapshot
201       stats = combined[addr] or return non_existent_stats(time)
202       tmp, peak = @active[addr], @peak_active[addr]
203       [ time, @resets[addr], tmp.dup, stats.active, peak ]
204     end
205   end
207   def queued_stats(addr) # :nodoc:
208     @lock.synchronize do
209       time, combined = @snapshot
210       stats = combined[addr] or return non_existent_stats(time)
211       tmp, peak = @queued[addr], @peak_queued[addr]
212       [ time, @resets[addr], tmp.dup, stats.queued, peak ]
213     end
214   end
216   def wait_snapshot
217     @lock.synchronize do
218       @cond.wait @lock
219       @snapshot
220     end
221   end
223   def std_dev(agg)
224     agg.std_dev.to_s
225   rescue Errno::EDOM
226     "NaN"
227   end
229   def agg_to_hash(reset_at, agg, current, peak)
230     {
231       "X-Count" => agg.count.to_s,
232       "X-Min" => agg.min.to_s,
233       "X-Max" => agg.max.to_s,
234       "X-Mean" => agg.mean.to_s,
235       "X-Std-Dev" => std_dev(agg),
236       "X-Outliers-Low" => agg.outliers_low.to_s,
237       "X-Outliers-High" => agg.outliers_high.to_s,
238       "X-Last-Reset" => reset_at.httpdate,
239       "X-Current" => current.to_s,
240       "X-First-Peak-At" => peak.first.httpdate,
241       "X-Last-Peak-At" => peak.last.httpdate,
242     }
243   end
245   def histogram_txt(agg)
246     updated_at, reset_at, agg, current, peak = *agg
247     headers = agg_to_hash(reset_at, agg, current, peak)
248     body = agg.to_s # 7-bit ASCII-clean
249     headers["Content-Type"] = "text/plain"
250     headers["Expires"] = (updated_at + @delay).httpdate
251     headers["Content-Length"] = body.size.to_s
252     [ 200, headers, [ body ] ]
253   end
255   def histogram_html(agg, addr)
256     updated_at, reset_at, agg, current, peak = *agg
257     headers = agg_to_hash(reset_at, agg, current, peak)
258     body = "<html>" \
259       "<head><title>#{hostname} - #{escape_html addr}</title></head>" \
260       "<body><table>" <<
261       headers.map { |k,v|
262         "<tr><td>#{k.gsub(/^X-/, '')}</td><td>#{v}</td></tr>"
263       }.join << "</table><pre>#{escape_html agg}</pre>" \
264       "<form action='../reset/#{escape addr}' method='post'>" \
265       "<input type='submit' name='x' value='reset' /></form>" \
266       "</body>"
267     headers["Content-Type"] = "text/html"
268     headers["Expires"] = (updated_at + @delay).httpdate
269     headers["Content-Length"] = body.size.to_s
270     [ 200, headers, [ body ] ]
271   end
273   def get(env)
274     retried = false
275     begin
276       case env["PATH_INFO"]
277       when "/"
278         index
279       when %r{\A/active/(.+)\.txt\z}
280         histogram_txt(active_stats(unescape($1)))
281       when %r{\A/active/(.+)\.html\z}
282         addr = unescape $1
283         histogram_html(active_stats(addr), addr)
284       when %r{\A/queued/(.+)\.txt\z}
285         histogram_txt(queued_stats(unescape($1)))
286       when %r{\A/queued/(.+)\.html\z}
287         addr = unescape $1
288         histogram_html(queued_stats(addr), addr)
289       when %r{\A/tail/(.+)\.txt\z}
290         tail(unescape($1), env)
291       else
292         not_found
293       end
294     rescue Errno::EDOM
295       raise if retried
296       retried = true
297       wait_snapshot
298       retry
299     end
300   end
302   def not_found
303     Rack::Response.new(["Not Found"], 404).finish
304   end
306   def post(env)
307     case env["PATH_INFO"]
308     when %r{\A/reset/(.+)\z}
309       reset!(env, unescape($1))
310     else
311       not_found
312     end
313   end
315   def reset!(env, addr)
316     @lock.synchronize do
317       @active.include?(addr) or return not_found
318       @active.delete addr
319       @queued.delete addr
320       @resets[addr] = Time.now.utc
321       @cond.wait @lock
322     end
323     req = Rack::Request.new(env)
324     res = Rack::Response.new
325     url = req.referer || "#{req.host_with_port}/"
326     res.redirect(url)
327     res["Content-Type"] = "text/plain"
328     res.write "Redirecting to #{url}"
329     res.finish
330   end
332   def index
333     updated_at, all = snapshot
334     headers = {
335       "Content-Type" => "text/html",
336       "Last-Modified" => updated_at.httpdate,
337       "Expires" => (updated_at + @delay).httpdate,
338     }
339     body = "<html><head>" \
340       "<title>#{hostname} - all interfaces</title>" \
341       "</head><body><h3>Updated at #{updated_at.iso8601}</h3>" \
342       "<table><tr>" \
343         "<th>address</th><th>active</th><th>queued</th><th>reset</th>" \
344       "</tr>" <<
345       all.sort do |a,b|
346         a[0] <=> b[0] # sort by addr
347       end.map do |addr,stats|
348         e_addr = escape addr
349         "<tr>" \
350           "<td><a href='tail/#{e_addr}.txt' " \
351             "title='&quot;tail&quot; output in real time'" \
352             ">#{escape_html addr}</a></td>" \
353           "<td><a href='active/#{e_addr}.html' " \
354             "title='show active connection stats'>#{stats.active}</a></td>" \
355           "<td><a href='queued/#{e_addr}.html' " \
356             "title='show queued connection stats'>#{stats.queued}</a></td>" \
357           "<td><form action='reset/#{e_addr}' method='post'>" \
358             "<input title='reset statistics' " \
359               "type='submit' name='x' value='x' /></form></td>" \
360         "</tr>" \
361       end.join << "</table>" \
362       "<p>" \
363         "This is running the #{self.class}</a> service, see " \
364         "<a href='#{DOC_URL}'>#{DOC_URL}</a> " \
365         "for more information and options." \
366       "</p>" \
367       "</body></html>"
368     headers["Content-Length"] = body.size.to_s
369     [ 200, headers, [ body ] ]
370   end
372   def tail(addr, env)
373     Tailer.new(self, addr, env).finish
374   end
376   # This is the response body returned for "/tail/$ADDRESS.txt".  This
377   # must use a multi-threaded Rack server with streaming response support.
378   # It is an internal class and not expected to be used directly
379   class Tailer
380     def initialize(rdmon, addr, env) # :nodoc:
381       @rdmon = rdmon
382       @addr = addr
383       q = Rack::Utils.parse_query env["QUERY_STRING"]
384       @active_min = q["active_min"].to_i
385       @queued_min = q["queued_min"].to_i
386       len = addr.size
387       len = 35 if len > 35
388       @fmt = "%20s % #{len}s % 10u % 10u\n"
389       case env["HTTP_VERSION"]
390       when "HTTP/1.0", nil
391         @chunk = false
392       else
393         @chunk = true
394       end
395     end
397     def finish
398       headers = {
399         "Content-Type" => "text/plain",
400         "Cache-Control" => "no-transform",
401         "Expires" => Time.at(0).httpdate,
402       }
403       headers["Transfer-Encoding"] = "chunked" if @chunk
404       [ 200, headers, self ]
405     end
407     # called by the Rack server
408     def each # :nodoc:
409       begin
410         time, all = @rdmon.wait_snapshot
411         stats = all[@addr] or next
412         stats.queued >= @queued_min or next
413         stats.active >= @active_min or next
414         body = sprintf(@fmt, time.iso8601, @addr, stats.active, stats.queued)
415         body = "#{body.size.to_s(16)}\r\n#{body}\r\n" if @chunk
416         yield body
417       end while true
418       yield "0\r\n\r\n" if @chunk
419     end
420   end
422   # shuts down the background thread, only for tests
423   def shutdown
424     @socket = nil
425     @thr.join if @thr
426     @thr = nil
427   end
428   # :startdoc: