preliminary Rack app to track last_data_recv
[raindrops.git] / lib / raindrops / aggregate / pmq.rb
blob14f73be342464344ac64011ecb1f2ec66bebc0e1
1 # -*- encoding: binary -*-
2 require "tempfile"
3 require "aggregate"
4 require "posix_mq"
5 require "fcntl"
6 require "io/extra"
8 # Aggregate + POSIX message queues support
9 class Raindrops::Aggregate::PMQ
10   RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256")
11   WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256")
12   UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256")
14   attr_reader :nr_dropped
16   def initialize(params = {})
17     opts = {
18       :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
19       :worker_interval => 10,
20       :master_interval => 5,
21       :lossy => false,
22       :mq_attr => nil,
23       :mq_umask => 0666,
24       :aggregate => Aggregate.new,
25     }.merge! params
26     @master_interval = opts[:master_interval]
27     @worker_interval = opts[:worker_interval]
28     @aggregate = opts[:aggregate]
29     @worker_queue = @worker_interval ? [] : nil
31     @mq_name = opts[:queue]
32     mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr]
33     Tempfile.open("raindrops_pmq") do |t|
34       @wr = File.open(t.path, "wb")
35       @rd = File.open(t.path, "rb")
36     end
37     @cached_aggregate = @aggregate
38     flush_master
39     @mq_send = if opts[:lossy]
40       @nr_dropped = 0
41       mq.nonblock = true
42       mq.method :trysend
43     else
44       mq.method :send
45     end
46   end
48   def << val
49     if q = @worker_queue
50       q << val
51       if q.size >= @worker_interval
52         mq_send(q) or @nr_dropped += 1
53         q.clear
54       end
55     else
56       mq_send(val) or @nr_dropped += 1
57     end
58   end
60   def mq_send(val)
61     @cached_aggregate = nil
62     @mq_send.call Marshal.dump(val)
63   end
65   def master_loop
66     buf = ""
67     a = @aggregate
68     nr = 0
69     mq = POSIX_MQ.new @mq_name, :r # this one is always blocking
70     begin
71       if (nr -= 1) < 0
72         nr = @master_interval
73         flush_master
74       end
75       mq.shift(buf)
76       data = begin
77         Marshal.load(buf) or return
78       rescue ArgumentError, TypeError
79         next
80       end
81       Array === data ? data.each { |x| a << x } : a << data
82     rescue Errno::EINTR
83     rescue => e
84       warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}"
85       break
86     end while true
87     ensure
88       flush_master
89   end
91   def aggregate
92     @cached_aggregate ||= begin
93       flush
94       Marshal.load(synchronize(@rd, RDLOCK) do |rd|
95         IO.pread rd.fileno, rd.stat.size, 0
96       end)
97     end
98   end
100   def flush_master
101     dump = Marshal.dump @aggregate
102     synchronize(@wr, WRLOCK) do |wr|
103       wr.truncate 0
104       IO.pwrite wr.fileno, dump, 0
105     end
106   end
108   def stop_master_loop
109     sleep 0.1 until mq_send(false)
110     rescue Errno::EINTR
111       retry
112   end
114   def lock! io, type
115     io.fcntl Fcntl::F_SETLKW, type
116     rescue Errno::EINTR
117       retry
118   end
120   def synchronize io, type
121     lock! io, type
122     yield io
123     ensure
124       lock! io, UNLOCK
125   end
127   def flush
128     if q = @local_queue && ! q.empty?
129       mq_send q
130       q.clear
131     end
132     nil
133   end
135   def count; aggregate.count; end
136   def max; aggregate.max; end
137   def min; aggregate.min; end
138   def sum; aggregate.sum; end
139   def mean; aggregate.mean; end
140   def std_dev; aggregate.std_dev; end
141   def outliers_low; aggregate.outliers_low; end
142   def outliers_high; aggregate.outliers_high; end
143   def to_s(*args); aggregate.to_s *args; end
144   def each; aggregate.each { |*args| yield *args }; end
145   def each_nonzero; aggregate.each_nonzero { |*args| yield *args }; end