1 # -*- encoding: binary -*-
9 # \Aggregate + POSIX message queues support for Ruby 1.9 and \Linux
11 # This class is duck-type compatible with \Aggregate and allows us to
12 # aggregate and share statistics from multiple processes/threads aided by
13 # POSIX message queues. This is designed to be used with the
14 # Raindrops::LastDataRecv Rack application, but can be used independently
15 # on compatible runtimes.
17 # Unlike the core of raindrops, this is only supported on Ruby 1.9 and
18 # Linux 2.6. Using this class requires the following additional RubyGems
21 # * aggregate (tested with 0.2.2)
22 # * io-extra (tested with 1.2.3)
23 # * posix_mq (tested with 1.0.0)
27 # There is one master thread which aggregates statistics. Individual
28 # worker processes or threads will write to a shared POSIX message
29 # queue (default: "/raindrops") that the master reads from. At a
30 # predefined interval, the master thread will write out to a shared,
31 # anonymous temporary file that workers may read from.
33 # Setting +:worker_interval+ and +:master_interval+ to +1+ will result
34 # in perfect accuracy but at the cost of a high synchronization
35 # overhead. Larger intervals mean less frequent messaging for higher
36 # performance but lower accuracy.
37 class Raindrops::Aggregate::PMQ
40 # These constants are for Linux. This is designed for aggregating
42 RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256") # read-lock request: F_RDLCK packed as a native short, NUL-padded to 256 bytes
43 WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256") # write-lock request, same 256-byte layout
44 UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256") # unlock request, same 256-byte layout
47 # Returns the number of messages that were dropped instead of being sent
48 # to the POSIX message queue when non-blocking operation was desired with :lossy
49 attr_reader :nr_dropped
52 # Creates a new Raindrops::Aggregate::PMQ object
54 # Raindrops::Aggregate::PMQ.new(options = {}) -> aggregate
56 # +options+ is a hash that accepts the following keys:
58 # * :queue - name of the POSIX message queue (default: "/raindrops")
59 # * :worker_interval - interval to send to the master (default: 10)
60 # * :master_interval - interval for the master to write out (default: 5)
61 # * :lossy - workers drop packets if master cannot keep up (default: false)
62 # * :aggregate - \Aggregate object (default: \Aggregate.new)
63 # * :mq_umask - umask for creating the POSIX message queue (default: 0666)
65 def initialize(params = {}) # sets up the intervals, the message queue and the shared temp file from the options below
67 :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops", # RAINDROPS_MQUEUE in the environment overrides the built-in default name
68 :worker_interval => 10,
69 :master_interval => 5,
73 :aggregate => Aggregate.new,
75 @master_interval = opts[:master_interval]
76 @worker_interval = opts[:worker_interval]
77 @aggregate = opts[:aggregate]
78 @worker_queue = @worker_interval ? [] : nil # local batch buffer; nil when :worker_interval is nil or false
81 @mq_name = opts[:queue]
82 mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr] # queue handle opened with :w (master_loop opens its own with :r)
83 Tempfile.open("raindrops_pmq") do |t|
84 @wr = File.open(t.path, "wb") # separate write and read handles on the same temp file path
85 @rd = File.open(t.path, "rb")
87 @cached_aggregate = @aggregate # seed the cache with the local Aggregate
89 @mq_send = if opts[:lossy] # the send strategy depends on the :lossy option
98 # adds a sample to the underlying \Aggregate object
102 if q.size >= @worker_interval # the batch is sent once it holds at least :worker_interval samples
103 mq_send(q) or @nr_dropped += 1 # a falsy result from mq_send is counted as one drop
107 mq_send(val) or @nr_dropped += 1 # same drop accounting when a single sample is sent
111 def mq_send(val) # :nodoc:
112 @cached_aggregate = nil # discard the cached copy; it is rebuilt where @cached_aggregate is read with ||=
113 @mq_send.call Marshal.dump(val) # payload is Marshal-serialized; callers test the result for truthiness
117 # Starts running a master loop, usually in a dedicated thread or process:
119 # Thread.new { agg.master_loop }
121 # Any worker can call +agg.stop_master_loop+ to stop the master loop
122 # (possibly causing the thread or process to exit)
127 mq = POSIX_MQ.new @mq_name, :r # this one is always blocking
130 nr = @master_interval # NOTE(review): looks like the countdown to the next write-out is reloaded here -- confirm
135 Marshal.load(buf) or return # a false (or nil) payload ends the loop; a stop request is sent as mq_send(false)
136 rescue ArgumentError, TypeError
139 Array === data ? data.each { |x| a << x } : a << data # an Array payload is a batch of samples, anything else is one sample
142 warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}"
149 # Loads the last shared \Aggregate from the master thread/process
151 @cached_aggregate ||= begin # memoized; mq_send resets it to nil
153 Marshal.load(synchronize(@rd, RDLOCK) do |rd| # read handle, under the RDLOCK request
154 IO.pread rd.fileno, rd.stat.size, 0 # reads rd.stat.size bytes starting at offset 0
159 # Flushes the currently aggregated statistics to a temporary file.
160 # There is no need to call this explicitly as +:master_interval+ defines
161 # how frequently your data will be flushed for workers to read.
163 dump = Marshal.dump @aggregate
164 synchronize(@wr, WRLOCK) do |wr| # write handle, under the WRLOCK request
166 IO.pwrite wr.fileno, dump, 0 # the serialized Aggregate is written at offset 0
170 # stops the currently running master loop, may be called from any
171 # worker thread or process
173 sleep 0.1 until mq_send(false) # false is the stop message; retried every 0.1s until mq_send returns a truthy value
178 def lock! io, type # :nodoc:
179 io.fcntl Fcntl::F_SETLKW, type # F_SETLKW is the waiting form of the lock call; +type+ is presumably one of RDLOCK/WRLOCK/UNLOCK -- confirm at the call site
184 # we use both a mutex for thread-safety and an fcntl lock for process-safety
185 def synchronize io, type # :nodoc:
186 @mutex.synchronize do # thread-level exclusion via the instance mutex
196 # flushes the local queue of the worker process, sending all pending
197 # data to the master. There is no need to call this explicitly as
198 # +:worker_interval+ defines how frequently your queue will be flushed.
200 if q = @local_queue && ! q.empty? # NOTE(review): parses as q = (@local_queue && !q.empty?), so q is still nil when q.empty? runs; also only @worker_queue is assigned in the visible code, never @local_queue -- likely meant (q = @worker_queue) && !q.empty?; confirm
207 # proxy for \Aggregate#count; calls +count+ on the result of +aggregate+
208 def count; aggregate.count; end
210 # proxy for \Aggregate#max; calls +max+ on the result of +aggregate+
211 def max; aggregate.max; end
213 # proxy for \Aggregate#min; calls +min+ on the result of +aggregate+
214 def min; aggregate.min; end
216 # proxy for \Aggregate#sum; calls +sum+ on the result of +aggregate+
217 def sum; aggregate.sum; end
219 # proxy for \Aggregate#mean; calls +mean+ on the result of +aggregate+
220 def mean; aggregate.mean; end
222 # proxy for \Aggregate#std_dev; calls +std_dev+ on the result of +aggregate+
223 def std_dev; aggregate.std_dev; end
225 # proxy for \Aggregate#outliers_low; calls +outliers_low+ on the result of +aggregate+
226 def outliers_low; aggregate.outliers_low; end
228 # proxy for \Aggregate#outliers_high; calls +outliers_high+ on the result of +aggregate+
229 def outliers_high; aggregate.outliers_high; end
231 # proxy for \Aggregate#to_s; every argument is forwarded unchanged to +aggregate.to_s+
232 def to_s(*args); aggregate.to_s(*args); end
234 # proxy for \Aggregate#each; whatever +aggregate.each+ yields is re-yielded to the caller's block
235 def each; aggregate.each { |*args| yield(*args) }; end
237 # proxy for \Aggregate#each_nonzero; whatever +aggregate.each_nonzero+ yields is re-yielded to the caller's block
238 def each_nonzero; aggregate.each_nonzero { |*args| yield(*args) }; end