aggregate/pmq: we need a Mutex to protect fcntl() locks
[raindrops.git] / lib / raindrops / aggregate / pmq.rb
blob0e7246d9d1dd5809dca05cdb21b5700711656b8e
1 # -*- encoding: binary -*-
2 require "tempfile"
3 require "aggregate"
4 require "posix_mq"
5 require "fcntl"
6 require "io/extra"
7 require "thread"
9 # Aggregate + POSIX message queues support
10 class Raindrops::Aggregate::PMQ
12   # These constants are for Linux.  Tthis is designed for aggregating
13   # TCP_INFO.
14   RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256")
15   WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256")
16   UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256")
18   attr_reader :nr_dropped
20   def initialize(params = {})
21     opts = {
22       :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
23       :worker_interval => 10,
24       :master_interval => 5,
25       :lossy => false,
26       :mq_attr => nil,
27       :mq_umask => 0666,
28       :aggregate => Aggregate.new,
29     }.merge! params
30     @master_interval = opts[:master_interval]
31     @worker_interval = opts[:worker_interval]
32     @aggregate = opts[:aggregate]
33     @worker_queue = @worker_interval ? [] : nil
34     @mutex = Mutex.new
36     @mq_name = opts[:queue]
37     mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr]
38     Tempfile.open("raindrops_pmq") do |t|
39       @wr = File.open(t.path, "wb")
40       @rd = File.open(t.path, "rb")
41     end
42     @cached_aggregate = @aggregate
43     flush_master
44     @mq_send = if opts[:lossy]
45       @nr_dropped = 0
46       mq.nonblock = true
47       mq.method :trysend
48     else
49       mq.method :send
50     end
51   end
53   def << val
54     if q = @worker_queue
55       q << val
56       if q.size >= @worker_interval
57         mq_send(q) or @nr_dropped += 1
58         q.clear
59       end
60     else
61       mq_send(val) or @nr_dropped += 1
62     end
63   end
65   def mq_send(val)
66     @cached_aggregate = nil
67     @mq_send.call Marshal.dump(val)
68   end
70   def master_loop
71     buf = ""
72     a = @aggregate
73     nr = 0
74     mq = POSIX_MQ.new @mq_name, :r # this one is always blocking
75     begin
76       if (nr -= 1) < 0
77         nr = @master_interval
78         flush_master
79       end
80       mq.shift(buf)
81       data = begin
82         Marshal.load(buf) or return
83       rescue ArgumentError, TypeError
84         next
85       end
86       Array === data ? data.each { |x| a << x } : a << data
87     rescue Errno::EINTR
88     rescue => e
89       warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}"
90       break
91     end while true
92     ensure
93       flush_master
94   end
96   def aggregate
97     @cached_aggregate ||= begin
98       flush
99       Marshal.load(synchronize(@rd, RDLOCK) do |rd|
100         IO.pread rd.fileno, rd.stat.size, 0
101       end)
102     end
103   end
105   def flush_master
106     dump = Marshal.dump @aggregate
107     synchronize(@wr, WRLOCK) do |wr|
108       wr.truncate 0
109       IO.pwrite wr.fileno, dump, 0
110     end
111   end
113   def stop_master_loop
114     sleep 0.1 until mq_send(false)
115     rescue Errno::EINTR
116       retry
117   end
119   def lock! io, type
120     io.fcntl Fcntl::F_SETLKW, type
121     rescue Errno::EINTR
122       retry
123   end
125   def synchronize io, type
126     @mutex.synchronize do
127       begin
128         lock! io, type
129         yield io
130       ensure
131         lock! io, UNLOCK
132       end
133     end
134   end
136   def flush
137     if q = @local_queue && ! q.empty?
138       mq_send q
139       q.clear
140     end
141     nil
142   end
144   def count; aggregate.count; end
145   def max; aggregate.max; end
146   def min; aggregate.min; end
147   def sum; aggregate.sum; end
148   def mean; aggregate.mean; end
149   def std_dev; aggregate.std_dev; end
150   def outliers_low; aggregate.outliers_low; end
151   def outliers_high; aggregate.outliers_high; end
152   def to_s(*args); aggregate.to_s *args; end
153   def each; aggregate.each { |*args| yield *args }; end
154   def each_nonzero; aggregate.each_nonzero { |*args| yield *args }; end