fix Ruby warnings
[raindrops.git] / lib / raindrops / aggregate / pmq.rb
bloba2dd45e6b25a9ac08674fbfb055327591eb2e428
1 # -*- encoding: binary -*-
2 require "tempfile"
3 require "aggregate"
4 require "posix_mq"
5 require "fcntl"
6 require "io/extra"
7 require "thread"
9 # \Aggregate + POSIX message queues support for Ruby 1.9 and \Linux
11 # This class is duck-type compatible with \Aggregate and allows us to
12 # aggregate and share statistics from multiple processes/threads aided
13 # POSIX message queues.  This is designed to be used with the
14 # Raindrops::LastDataRecv Rack application, but can be used independently
15 # on compatible Runtimes.
17 # Unlike the core of raindrops, this is only supported on Ruby 1.9 and
18 # Linux 2.6.  Using this class requires the following additional RubyGems
19 # or libraries:
21 # * aggregate (tested with 0.2.2)
22 # * io-extra  (tested with 1.2.3)
23 # * posix_mq  (tested with 1.0.0)
25 # == Design
27 # There is one master thread which aggregates statistics.  Individual
28 # worker processes or threads will write to a shared POSIX message
29 # queue (default: "/raindrops") that the master reads from.  At a
30 # predefined interval, the master thread will write out to a shared,
31 # anonymous temporary file that workers may read from
33 # Setting +:worker_interval+ and +:master_interval+ to +1+ will result
34 # in perfect accuracy but at the cost of a high synchronization
35 # overhead.  Larger intervals mean less frequent messaging for higher
36 # performance but lower accuracy.
37 class Raindrops::Aggregate::PMQ
39   # :stopdoc:
40   # These constants are for Linux.  This is designed for aggregating
41   # TCP_INFO.
42   RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256")
43   WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256")
44   UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256")
45   # :startdoc:
47   # returns the number of dropped messages sent to a POSIX message
48   # queue if non-blocking operation was desired with :lossy
49   attr_reader :nr_dropped
51   #
52   # Creates a new Raindrops::Aggregate::PMQ object
53   #
54   #   Raindrops::Aggregate::PMQ.new(options = {})  -> aggregate
55   #
56   # +options+ is a hash that accepts the following keys:
57   #
58   # * :queue - name of the POSIX message queue (default: "/raindrops")
59   # * :worker_interval - interval to send to the master (default: 10)
60   # * :master_interval - interval to for the master to write out (default: 5)
61   # * :lossy - workers drop packets if master cannot keep up (default: false)
62   # * :aggregate - \Aggregate object (default: \Aggregate.new)
63   # * :mq_umask - umask for creatingthe POSIX message queue (default: 0666)
64   #
65   def initialize(params = {})
66     opts = {
67       :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
68       :worker_interval => 10,
69       :master_interval => 5,
70       :lossy => false,
71       :mq_attr => nil,
72       :mq_umask => 0666,
73       :aggregate => Aggregate.new,
74     }.merge! params
75     @master_interval = opts[:master_interval]
76     @worker_interval = opts[:worker_interval]
77     @aggregate = opts[:aggregate]
78     @worker_queue = @worker_interval ? [] : nil
79     @mutex = Mutex.new
81     @mq_name = opts[:queue]
82     mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr]
83     Tempfile.open("raindrops_pmq") do |t|
84       @wr = File.open(t.path, "wb")
85       @rd = File.open(t.path, "rb")
86     end
87     @cached_aggregate = @aggregate
88     flush_master
89     @mq_send = if opts[:lossy]
90       @nr_dropped = 0
91       mq.nonblock = true
92       mq.method :trysend
93     else
94       mq.method :send
95     end
96   end
98   # adds a sample to the underlying \Aggregate object
99   def << val
100     if q = @worker_queue
101       q << val
102       if q.size >= @worker_interval
103         mq_send(q) or @nr_dropped += 1
104         q.clear
105       end
106     else
107       mq_send(val) or @nr_dropped += 1
108     end
109   end
111   def mq_send(val) # :nodoc:
112     @cached_aggregate = nil
113     @mq_send.call Marshal.dump(val)
114   end
116   #
117   # Starts running a master loop, usually in a dedicated thread or process:
118   #
119   #   Thread.new { agg.master_loop }
120   #
121   # Any worker can call +agg.stop_master_loop+ to stop the master loop
122   # (possibly causing the thread or process to exit)
123   def master_loop
124     buf = ""
125     a = @aggregate
126     nr = 0
127     mq = POSIX_MQ.new @mq_name, :r # this one is always blocking
128     begin
129       if (nr -= 1) < 0
130         nr = @master_interval
131         flush_master
132       end
133       mq.shift(buf)
134       data = begin
135         Marshal.load(buf) or return
136       rescue ArgumentError, TypeError
137         next
138       end
139       Array === data ? data.each { |x| a << x } : a << data
140     rescue Errno::EINTR
141     rescue => e
142       warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}"
143       break
144     end while true
145     ensure
146       flush_master
147   end
149   # Loads the last shared \Aggregate from the master thread/process
150   def aggregate
151     @cached_aggregate ||= begin
152       flush
153       Marshal.load(synchronize(@rd, RDLOCK) do |rd|
154         IO.pread rd.fileno, rd.stat.size, 0
155       end)
156     end
157   end
159   # Flushes the currently aggregate statistics to a temporary file.
160   # There is no need to call this explicitly as +:worker_interval+ defines
161   # how frequently your data will be flushed for workers to read.
162   def flush_master
163     dump = Marshal.dump @aggregate
164     synchronize(@wr, WRLOCK) do |wr|
165       wr.truncate 0
166       IO.pwrite wr.fileno, dump, 0
167     end
168   end
170   # stops the currently running master loop, may be called from any
171   # worker thread or process
172   def stop_master_loop
173     sleep 0.1 until mq_send(false)
174     rescue Errno::EINTR
175       retry
176   end
178   def lock! io, type # :nodoc:
179     io.fcntl Fcntl::F_SETLKW, type
180     rescue Errno::EINTR
181       retry
182   end
184   # we use both a mutex for thread-safety and fcntl lock for process-safety
185   def synchronize io, type # :nodoc:
186     @mutex.synchronize do
187       begin
188         lock! io, type
189         yield io
190       ensure
191         lock! io, UNLOCK
192       end
193     end
194   end
196   # flushes the local queue of the worker process, sending all pending
197   # data to the master.  There is no need to call this explicitly as
198   # +:worker_interval+ defines how frequently your queue will be flushed
199   def flush
200     if q = @local_queue && ! q.empty?
201       mq_send q
202       q.clear
203     end
204     nil
205   end
207   # proxy for \Aggregate#count
208   def count; aggregate.count; end
210   # proxy for \Aggregate#max
211   def max; aggregate.max; end
213   # proxy for \Aggregate#min
214   def min; aggregate.min; end
216   # proxy for \Aggregate#sum
217   def sum; aggregate.sum; end
219   # proxy for \Aggregate#mean
220   def mean; aggregate.mean; end
222   # proxy for \Aggregate#std_dev
223   def std_dev; aggregate.std_dev; end
225   # proxy for \Aggregate#outliers_low
226   def outliers_low; aggregate.outliers_low; end
228   # proxy for \Aggregate#outliers_high
229   def outliers_high; aggregate.outliers_high; end
231   # proxy for \Aggregate#to_s
232   def to_s(*args); aggregate.to_s(*args); end
234   # proxy for \Aggregate#each
235   def each; aggregate.each { |*args| yield(*args) }; end
237   # proxy for \Aggregate#each_nonzero
238   def each_nonzero; aggregate.each_nonzero { |*args| yield(*args) }; end