1 # -*- encoding: binary -*-
2 # frozen_string_literal: false
9 # Raindrops::Watcher is a stand-alone Rack application for watching
10 # any number of TCP and UNIX listeners (all of them by default).
12 # It depends on the {Aggregate RubyGem}[https://rubygems.org/gems/aggregate]
14 # In your Rack config.ru:
16 # run Raindrops::Watcher(options = {})
18 # It takes the following options hash:
20 # - :listeners - an array of listener names, (e.g. %w(0.0.0.0:80 /tmp/sock))
21 # - :delay - interval between stats updates in seconds (default: 1)
23 # Raindrops::Watcher is compatible with any thread-safe/thread-aware Rack
24 # middleware. It does not work well with multi-process web servers
25 # but can be used to monitor them. It consumes minimal resources
26 # with the default :delay.
32 # Returns an HTML summary listing of all listen interfaces watched on
34 # === GET /active/$LISTENER.txt
36 # Returns a plain text summary + histogram with X-* HTTP headers for
39 # e.g.: curl https://yhbt.net/raindrops-demo/active/0.0.0.0%3A80.txt
41 # === GET /active/$LISTENER.html
43 # Returns an HTML summary + histogram with X-* HTTP headers for
46 # e.g.: curl https://yhbt.net/raindrops-demo/active/0.0.0.0%3A80.html
48 # === GET /queued/$LISTENER.txt
50 # Returns a plain text summary + histogram with X-* HTTP headers for
53 # e.g.: curl https://yhbt.net/raindrops-demo/queued/0.0.0.0%3A80.txt
55 # === GET /queued/$LISTENER.html
57 # Returns an HTML summary + histogram with X-* HTTP headers for
60 # e.g.: curl https://yhbt.net/raindrops-demo/queued/0.0.0.0%3A80.html
62 # === POST /reset/$LISTENER
64 # Resets the active and queued statistics for the given listener.
66 # === GET /tail/$LISTENER.txt?active_min=1&queued_min=1
68 # Streams a chunked response to the client.
69 # Interval is the preconfigured +:delay+ of the application (default 1 second)
71 # The response is plain text in the following format:
73 # ISO8601_TIMESTAMP LISTENER_NAME ACTIVE_COUNT QUEUED_COUNT LINEFEED
77 # - active_min - do not stream a line until this active count is reached
78 # - queued_min - do not stream a line until this queued count is reached
80 # == Response headers (mostly the same names as Raindrops::LastDataRecv)
82 # - X-Count - number of samples polled
83 # - X-Last-Reset - date of the last reset
85 # The following headers are only present if X-Count is greater than one.
87 # - X-Min - lowest number of connections recorded
88 # - X-Max - highest number of connections recorded
89 # - X-Mean - mean number of connections recorded
90 # - X-Std-Dev - standard deviation of connection count
91 # - X-Outliers-Low - number of low outliers (hopefully many for queued)
92 # - X-Outliers-High - number of high outliers (hopefully zero for queued)
93 # - X-Current - current number of connections
94 # - X-First-Peak-At - date of when X-Max was first reached
95 # - X-Last-Peak-At - date of when X-Max was last reached
99 # There is a server running this app at https://yhbt.net/raindrops-demo/
100 # The Raindrops::Middleware demo is also accessible at
101 # https://yhbt.net/raindrops-demo/_raindrops
103 # The demo server is limited to only 30 users, so be sure not to abuse it
104 # by using the /tail/ endpoint too much.
105 class Raindrops::Watcher
# Latest sample as a two-element array: [ time of the sample, Hash of
# listener address => stats ]; replaced wholesale on every update.
107 attr_reader :snapshot
# NOTE(review): presumably the source of tcp_listener_stats and
# unix_listener_stats, which are called unqualified below -- confirm.
109 include Raindrops::Linux
# Linked from the HTML listing page.
110 DOC_URL = "https://yhbt.net/raindrops/Raindrops/Watcher.html"
# Times at which the highest recorded count was first and last seen.
111 Peak = Struct.new(:first, :last)
# Prepares per-listener bookkeeping from the +opts+ hash:
#
# - :listeners - a single address or an array of addresses to watch
# - :agg_class - class used for each histogram, instantiated with no
#   arguments (default: Aggregate)
# - :delay - seconds between samples (default: 1)
113 def initialize(opts = {})
114 @tcp_listeners = @unix_listeners = nil
115 if l = opts[:listeners]
# an address starting with "/" is a UNIX socket path, anything else is TCP
117 Array(l).each { |addr| (addr =~ %r{\A/} ? unix : tcp) << addr }
118 unless tcp.empty? && unix.empty?
120 @unix_listeners = unix
124 @agg_class = opts[:agg_class] || Aggregate
125 @start_time = Time.now.utc
# the tables below create their entry for an address on first lookup
126 @active = Hash.new { |h,k| h[k] = @agg_class.new }
127 @queued = Hash.new { |h,k| h[k] = @agg_class.new }
128 @resets = Hash.new { |h,k| h[k] = @start_time }
129 @peak_active = Hash.new { |h,k| h[k] = Peak.new(@start_time, @start_time) }
130 @peak_queued = Hash.new { |h,k| h[k] = Peak.new(@start_time, @start_time) }
# initial snapshot: stamped with the start time and containing no listeners
131 @snapshot = [ @start_time, {} ]
132 @delay = opts[:delay] || 1
135 @cond = ConditionVariable.new
145 @start.synchronize { @thr ||= aggregator_thread(env["rack.logger"]) }
146 case env["REQUEST_METHOD"]
156 Rack::Response.new(["Method Not Allowed"], 405).finish
# Processes one sampled connection count (+number+) for +addr+ observed at
# +now+, using the per-address tables +agg_hash+ and +peak_hash+.
#
# The peak entry is only touched when +number+ is positive and at least
# as large as the aggregate's current maximum; +first+ moves only on a
# strictly higher value, so a tie keeps the original first-peak time.
# NOTE(review): +agg+ is presumably agg_hash[addr] -- confirm.
160 def aggregate!(agg_hash, peak_hash, addr, number, now)
162 if (max = agg.max) && number > 0 && number >= max
163 peak = peak_hash[addr]
164 peak.first = now if number > max
# Sampling loop: gathers TCP and UNIX listener stats into one hash, feeds
# the active and queued counts of every address through aggregate!, and
# publishes the result as the new @snapshot.
#
# +logger+ receives an error line when a pass raises.
# NOTE(review): the visible call site passes env["rack.logger"]; confirm
# it can never be nil, since logger.error is called unconditionally.
170 def aggregator_thread(logger) # :nodoc:
171 @socket = sock = Raindrops::InetDiagSocket.new
174 combined = tcp_listener_stats(@tcp_listeners, sock)
175 combined.merge!(unix_listener_stats(@unix_listeners))
178 combined.each do |addr,stats|
179 aggregate!(@active, @peak_active, addr, stats.active, now)
180 aggregate!(@queued, @peak_queued, addr, stats.queued, now)
182 @snapshot = [ now, combined ]
186 logger.error "#{e.class} #{e.inspect}"
# repeat after sleeping @delay seconds, for as long as @socket stays set
187 end while sleep(@delay) && @socket
# Placeholder result for an address that is absent from the snapshot.
# Has the same five-element layout as active_stats/queued_stats:
# [ sample time, reset time, empty aggregate, current count of 0, peak ],
# with the reset time and both peak times set to the watcher start time.
194 def non_existent_stats(time)
195 [ time, @start_time, @agg_class.new, 0, Peak.new(@start_time, @start_time) ]
# Returns [ sample time, last reset time, copy of the active aggregate,
# current active count, peak record ] for +addr+, or the
# non_existent_stats placeholder when +addr+ is not in the snapshot.
198 def active_stats(addr) # :nodoc:
200 time, combined = @snapshot
201 stats = combined[addr] or return non_existent_stats(time)
202 tmp, peak = @active[addr], @peak_active[addr]
# hand out a dup so the caller does not hold the aggregate stored in @active
203 [ time, @resets[addr], tmp.dup, stats.active, peak ]
# Returns [ sample time, last reset time, copy of the queued aggregate,
# current queued count, peak record ] for +addr+, or the
# non_existent_stats placeholder when +addr+ is not in the snapshot.
207 def queued_stats(addr) # :nodoc:
209 time, combined = @snapshot
210 stats = combined[addr] or return non_existent_stats(time)
211 tmp, peak = @queued[addr], @peak_queued[addr]
# hand out a dup so the caller does not hold the aggregate stored in @queued
212 [ time, @resets[addr], tmp.dup, stats.queued, peak ]
# Builds the X-* response header values for one listener.
#
# - reset_at - Time of the last reset, emitted as X-Last-Reset
# - agg      - aggregate supplying count, min, max, mean and outliers
# - current  - most recent sampled count, emitted as X-Current
# - peak     - Peak whose first/last times become the X-*-Peak-At headers
#
# Counts are converted with to_s and times with httpdate, so every value
# shown here is a String.
229 def agg_to_hash(reset_at, agg, current, peak)
231 "X-Count" => agg.count.to_s,
232 "X-Min" => agg.min.to_s,
233 "X-Max" => agg.max.to_s,
234 "X-Mean" => agg.mean.to_s,
235 "X-Std-Dev" => std_dev(agg),
236 "X-Outliers-Low" => agg.outliers_low.to_s,
237 "X-Outliers-High" => agg.outliers_high.to_s,
238 "X-Last-Reset" => reset_at.httpdate,
239 "X-Current" => current.to_s,
240 "X-First-Peak-At" => peak.first.httpdate,
241 "X-Last-Peak-At" => peak.last.httpdate,
# Plain-text response for one listener.
#
# +agg+ is the five-element array produced by active_stats or
# queued_stats; it is unpacked on the first line, which also rebinds
# +agg+ to the aggregate itself.
#
# Returns a Rack response triplet whose body is the aggregate's to_s
# output and whose headers carry the X-* statistics. Expires is set one
# @delay after the sample time.
245 def histogram_txt(agg)
246 updated_at, reset_at, agg, current, peak = *agg
247 headers = agg_to_hash(reset_at, agg, current, peak)
248 body = agg.to_s # 7-bit ASCII-clean
249 headers["Content-Type"] = "text/plain"
250 headers["Expires"] = (updated_at + @delay).httpdate
251 headers["Content-Length"] = body.size.to_s
252 [ 200, headers, [ body ] ]
# HTML response for one listener: a table of the X-* statistics (shown
# without their "X-" prefix), the aggregate rendered inside <pre>, and a
# form that posts to the reset URL of +addr+.
#
# +agg+ is the five-element array produced by active_stats or
# queued_stats; +addr+ is the listener address used in the page title
# and in the reset link.
#
# NOTE(review): Content-Length is taken from String#size, which counts
# characters. That equals the byte length only while +body+ is binary or
# pure ASCII -- confirm this holds for every +addr+ that can reach here.
255 def histogram_html(agg, addr)
256 updated_at, reset_at, agg, current, peak = *agg
257 headers = agg_to_hash(reset_at, agg, current, peak)
259 "<head><title>#{hostname} - #{escape_html addr}</title></head>" \
262 "<tr><td>#{k.gsub(/^X-/, '')}</td><td>#{v}</td></tr>"
263 }.join << "</table><pre>#{escape_html agg}</pre>" \
264 "<form action='../reset/#{escape addr}' method='post'>" \
265 "<input type='submit' name='x' value='reset' /></form>" \
267 headers["Content-Type"] = "text/html"
268 headers["Expires"] = (updated_at + @delay).httpdate
269 headers["Content-Length"] = body.size.to_s
270 [ 200, headers, [ body ] ]
276 case env["PATH_INFO"]
279 when %r{\A/active/(.+)\.txt\z}
280 histogram_txt(active_stats(unescape($1)))
281 when %r{\A/active/(.+)\.html\z}
283 histogram_html(active_stats(addr), addr)
284 when %r{\A/queued/(.+)\.txt\z}
285 histogram_txt(queued_stats(unescape($1)))
286 when %r{\A/queued/(.+)\.html\z}
288 histogram_html(queued_stats(addr), addr)
289 when %r{\A/tail/(.+)\.txt\z}
290 tail(unescape($1), env)
303 Rack::Response.new(["Not Found"], 404).finish
307 case env["PATH_INFO"]
308 when %r{\A/reset/(.+)\z}
309 reset!(env, unescape($1))
# Handles a reset request for +addr+: answers with not_found when no
# active aggregate exists for it, otherwise records the current UTC time
# as the listener's reset time and builds a plain-text response that
# names the URL the client is sent back to.
#
# NOTE(review): when there is no Referer the fallback URL is
# "host:port/" with no scheme -- confirm clients treat that as intended.
315 def reset!(env, addr)
# include? does not run the Hash default block, so an unknown address is
# rejected here instead of being created
317 @active.include?(addr) or return not_found
320 @resets[addr] = Time.now.utc
323 req = Rack::Request.new(env)
324 res = Rack::Response.new
325 url = req.referer || "#{req.host_with_port}/"
327 res["Content-Type"] = "text/plain"
328 res.write "Redirecting to #{url}"
# Renders the "all interfaces" HTML page from the current snapshot: one
# table row per listener, sorted by address, with links to the tail,
# active and queued views plus a reset form.
#
# NOTE(review): the title attribute of the tail link contains bare double
# quotes around the word tail inside a double-quoted Ruby string, which
# ends the literal early. It looks like the HTML entity &quot; was decoded
# at some point; confirm and restore &quot;tail&quot; in that attribute.
333 updated_at, all = snapshot
335 "Content-Type" => "text/html",
336 "Last-Modified" => updated_at.httpdate,
337 "Expires" => (updated_at + @delay).httpdate,
339 body = "<html><head>" \
340 "<title>#{hostname} - all interfaces</title>" \
341 "</head><body><h3>Updated at #{updated_at.iso8601}</h3>" \
343 "<th>address</th><th>active</th><th>queued</th><th>reset</th>" \
346 a[0] <=> b[0] # sort by addr
347 end.map do |addr,stats|
350 "<td><a href='tail/#{e_addr}.txt' " \
351 "title='"tail" output in real time'" \
352 ">#{escape_html addr}</a></td>" \
353 "<td><a href='active/#{e_addr}.html' " \
354 "title='show active connection stats'>#{stats.active}</a></td>" \
355 "<td><a href='queued/#{e_addr}.html' " \
356 "title='show queued connection stats'>#{stats.queued}</a></td>" \
357 "<td><form action='reset/#{e_addr}' method='post'>" \
358 "<input title='reset statistics' " \
359 "type='submit' name='x' value='x' /></form></td>" \
361 end.join << "</table>" \
363 "This is running the #{self.class}</a> service, see " \
364 "<a href='#{DOC_URL}'>#{DOC_URL}</a> " \
365 "for more information and options." \
368 headers["Content-Length"] = body.size.to_s
369 [ 200, headers, [ body ] ]
373 Tailer.new(self, addr, env).finish
376 # This is the response body returned for "/tail/$ADDRESS.txt". This
377 # must use a multi-threaded Rack server with streaming response support.
378 # It is an internal class and not expected to be used directly
# Captures the request parameters for one /tail/ stream.
#
# - rdmon - the watcher instance (the caller passes itself)
# - addr  - listener address to follow
# - env   - Rack environment of the request
#
# active_min and queued_min come from the query string and are converted
# with to_i, so a missing parameter yields a minimum of 0.
#
# NOTE(review): Rack::Utils.parse_query returns an Array when a key is
# repeated (e.g. ?active_min=1&active_min=2) and Array has no to_i --
# confirm whether that case needs handling.
380 def initialize(rdmon, addr, env) # :nodoc:
383 q = Rack::Utils.parse_query env["QUERY_STRING"]
384 @active_min = q["active_min"].to_i
385 @queued_min = q["queued_min"].to_i
# columns: timestamp, listener address, active count, queued count
388 @fmt = "%20s % #{len}s % 10u % 10u\n"
389 case env["HTTP_VERSION"]
399 "Content-Type" => "text/plain",
400 "Cache-Control" => "no-transform",
401 "Expires" => Time.at(0).httpdate,
403 headers["Transfer-Encoding"] = "chunked" if @chunk
404 [ 200, headers, self ]
407 # called by the Rack server
410 time, all = @rdmon.wait_snapshot
411 stats = all[@addr] or next
412 stats.queued >= @queued_min or next
413 stats.active >= @active_min or next
414 body = sprintf(@fmt, time.iso8601, @addr, stats.active, stats.queued)
415 body = "#{body.size.to_s(16)}\r\n#{body}\r\n" if @chunk
418 yield "0\r\n\r\n" if @chunk
422 # shuts down the background thread, only for tests