1 # -*- encoding: binary -*-
8 # Aggregate + POSIX message queues support
9 class Raindrops::Aggregate::PMQ
# Lock-request buffers passed to #synchronize (RDLOCK with the read handle,
# WRLOCK with the write handle); UNLOCK carries Fcntl::F_UNLCK.
# Each is a 256-byte binary string: "s" packs the Fcntl lock type as a
# native 16-bit integer and "@256" NUL-fills the remainder up to offset 256.
# NOTE(review): presumably sized to cover a struct flock whose first field
# is the lock type -- this is platform-specific, confirm before porting.
flock_buffer = lambda { |lock_type| [ lock_type ].pack("s @256") }
RDLOCK = flock_buffer.call(Fcntl::F_RDLCK)
WRLOCK = flock_buffer.call(Fcntl::F_WRLCK)
UNLOCK = flock_buffer.call(Fcntl::F_UNLCK)
14 attr_reader :nr_dropped
16 def initialize(params = {})
18 :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
19 :worker_interval => 10,
20 :master_interval => 5,
24 :aggregate => Aggregate.new,
26 @master_interval = opts[:master_interval]
27 @worker_interval = opts[:worker_interval]
28 @aggregate = opts[:aggregate]
29 @worker_queue = @worker_interval ? [] : nil
31 @mq_name = opts[:queue]
32 mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr]
33 Tempfile.open("raindrops_pmq") do |t|
34 @wr = File.open(t.path, "wb")
35 @rd = File.open(t.path, "rb")
37 @cached_aggregate = @aggregate
39 @mq_send = if opts[:lossy]
51 if q.size >= @worker_interval
52 mq_send(q) or @nr_dropped += 1
56 mq_send(val) or @nr_dropped += 1
61 @cached_aggregate = nil
62 @mq_send.call Marshal.dump(val)
69 mq = POSIX_MQ.new @mq_name, :r # this one is always blocking
77 Marshal.load(buf) or return
78 rescue ArgumentError, TypeError
81 Array === data ? data.each { |x| a << x } : a << data
84 warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}"
92 @cached_aggregate ||= begin
94 Marshal.load(synchronize(@rd, RDLOCK) do |rd|
95 IO.pread rd.fileno, rd.stat.size, 0
101 dump = Marshal.dump @aggregate
102 synchronize(@wr, WRLOCK) do |wr|
104 IO.pwrite wr.fileno, dump, 0
109 sleep 0.1 until mq_send(false)
115 io.fcntl Fcntl::F_SETLKW, type
120 def synchronize io, type
128 if q = @local_queue && ! q.empty?
# Returns +count+ as reported by the object that #aggregate returns.
def count
  aggregate.count
end
# Returns +max+ as reported by the object that #aggregate returns.
def max
  aggregate.max
end
# Returns +min+ as reported by the object that #aggregate returns.
def min
  aggregate.min
end
# Returns +sum+ as reported by the object that #aggregate returns.
def sum
  aggregate.sum
end
# Returns +mean+ as reported by the object that #aggregate returns.
def mean
  aggregate.mean
end
# Returns +std_dev+ as reported by the object that #aggregate returns.
def std_dev
  aggregate.std_dev
end
# Returns +outliers_low+ as reported by the object that #aggregate returns.
def outliers_low
  aggregate.outliers_low
end
# Returns +outliers_high+ as reported by the object that #aggregate returns.
def outliers_high
  aggregate.outliers_high
end
# Forwards every argument to the +to_s+ of the object that #aggregate
# returns and hands back its result.  The call is parenthesized so the
# splat is not parsed as an ambiguous `*` operator (a `ruby -w` warning).
def to_s(*args)
  aggregate.to_s(*args)
end
# Runs +each+ on the object that #aggregate returns, re-yielding every
# set of values it produces to the caller's block.  Returns whatever the
# underlying +each+ returns.
def each
  aggregate.each do |*values|
    yield(*values)
  end
end
# Runs +each_nonzero+ on the object that #aggregate returns, re-yielding
# every set of values it produces to the caller's block.  Returns whatever
# the underlying +each_nonzero+ returns.
def each_nonzero
  aggregate.each_nonzero do |*values|
    yield(*values)
  end
end