1 # -*- encoding: binary -*-
9 # Aggregate + POSIX message queues support
10 class Raindrops::Aggregate::PMQ
12 # These constants are for Linux. This is designed for aggregating
# Lock-request strings, one per Fcntl lock type. Each packs the lock type as a
# native-endian signed 16-bit integer ("s") and then NUL-pads the string out
# to byte offset 256 ("@256"), so every constant is exactly 256 bytes long.
# NOTE(review): presumably these are the `type` values handed to
# `io.fcntl Fcntl::F_SETLKW, type`, and 256 bytes is simply "large enough"
# for the kernel's lock structure on Linux -- confirm against the elided code.
14 RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256")
15 WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256")
16 UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256")
# Counter bumped by the `mq_send(...) or @nr_dropped += 1` call sites, i.e.
# each time mq_send returns a falsy value.
18 attr_reader :nr_dropped
20 def initialize(params = {})
22 :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
23 :worker_interval => 10,
24 :master_interval => 5,
28 :aggregate => Aggregate.new,
30 @master_interval = opts[:master_interval]
31 @worker_interval = opts[:worker_interval]
32 @aggregate = opts[:aggregate]
33 @worker_queue = @worker_interval ? [] : nil
36 @mq_name = opts[:queue]
37 mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr]
38 Tempfile.open("raindrops_pmq") do |t|
39 @wr = File.open(t.path, "wb")
40 @rd = File.open(t.path, "rb")
42 @cached_aggregate = @aggregate
44 @mq_send = if opts[:lossy]
56 if q.size >= @worker_interval
57 mq_send(q) or @nr_dropped += 1
61 mq_send(val) or @nr_dropped += 1
66 @cached_aggregate = nil
67 @mq_send.call Marshal.dump(val)
74 mq = POSIX_MQ.new @mq_name, :r # this one is always blocking
82 Marshal.load(buf) or return
83 rescue ArgumentError, TypeError
86 Array === data ? data.each { |x| a << x } : a << data
89 warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}"
97 @cached_aggregate ||= begin
99 Marshal.load(synchronize(@rd, RDLOCK) do |rd|
100 IO.pread rd.fileno, rd.stat.size, 0
106 dump = Marshal.dump @aggregate
107 synchronize(@wr, WRLOCK) do |wr|
109 IO.pwrite wr.fileno, dump, 0
114 sleep 0.1 until mq_send(false)
120 io.fcntl Fcntl::F_SETLKW, type
125 def synchronize io, type
126 @mutex.synchronize do
137 if q = @local_queue && ! q.empty?
# Statistic readers. Each one forwards to the same-named method on the
# object returned by `aggregate` and hands back its result unchanged.

def count
  aggregate.count
end

def max
  aggregate.max
end

def min
  aggregate.min
end

def sum
  aggregate.sum
end

def mean
  aggregate.mean
end

def std_dev
  aggregate.std_dev
end

def outliers_low
  aggregate.outliers_low
end

def outliers_high
  aggregate.outliers_high
end
# Forwards to `aggregate.to_s`, passing any arguments through unchanged.
# The splat is parenthesized so it cannot be lexed as a binary `*`
# (Ruby warns about the bare `to_s *args` form when run with -w).
def to_s(*args)
  aggregate.to_s(*args)
end

# Runs `aggregate.each` and re-yields every set of block arguments it
# produces to the caller's block.
def each
  aggregate.each { |*args| yield(*args) }
end

# Same as #each, but driven by `aggregate.each_nonzero`.
def each_nonzero
  aggregate.each_nonzero { |*args| yield(*args) }
end