From 89dc5af4d59419e63a9d332fb4bdfa923205135e Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 28 Nov 2009 00:16:18 -0800 Subject: [PATCH] actor_spawn: basically ThreadSpawn with Actors for now... Rubinius Actor specs seem a bit lacking at the moment. If we find time, we'll fix them, otherwise we'll let somebody else do it. --- lib/rainbows/actor_spawn.rb | 104 ++++++++++++++++++++------------------------ 1 file changed, 46 insertions(+), 58 deletions(-) rewrite lib/rainbows/actor_spawn.rb (60%) diff --git a/lib/rainbows/actor_spawn.rb b/lib/rainbows/actor_spawn.rb dissimilarity index 60% index 5d86417..30e62a9 100644 --- a/lib/rainbows/actor_spawn.rb +++ b/lib/rainbows/actor_spawn.rb @@ -1,58 +1,46 @@ -# -*- encoding: binary -*- - -require 'actor' -module Rainbows - module ActorSpawn - include Base - - # runs inside each forked worker, this sits around and waits - # for connections and doesn't die until the parent dies (or is - # given a INT, QUIT, or TERM signal) - def worker_loop(worker) - init_worker_process(worker) - limit = worker_connections - root = Actor.current - clients = {} - - # ticker - Actor.spawn do - while true - sleep 1 - G.tick - end - end - - listeners = LISTENERS.map do |s| - Actor.spawn(s) do |l| - begin - while clients.size >= limit - logger.info "busy: clients=#{clients.size} >= limit=#{limit}" - Actor.receive { |filter| filter.when(:resume) {} } - end - Actor.spawn(l.accept) do |c| - clients[Actor.current] = false - begin - process_client(c) - ensure - root << Actor.current - end - end - rescue Errno::EAGAIN, Errno::ECONNABORTED - rescue => e - Error.listen_loop(e) - end while G.alive - end - end - - begin - Actor.receive do |filter| - filter.when(Actor) do |actor| - orig = clients.size - clients.delete(actor) - orig >= limit and listeners.each { |l| l << :resume } - end - end - end while G.alive || clients.size > 0 - end - end -end +# -*- encoding: binary -*- + +require 'actor' +module Rainbows + + # Actor concurrency model for Rubinius. We can't seem to get message + # passing working right, so we're throwing a Mutex into the mix for + # now. Hopefully somebody can fix things for us. + # + # This is different from the Revactor one which is not prone to race + # conditions at all (since it uses Fibers). + module ActorSpawn + include Base + + # runs inside each forked worker, this sits around and waits + # for connections and doesn't die until the parent dies (or is + # given a INT, QUIT, or TERM signal) + def worker_loop(worker) + init_worker_process(worker) + limit = worker_connections + nr = 0 + + # can't seem to get the message passing to work right at the moment :< + lock = Mutex.new + + begin + ret = IO.select(LISTENERS, nil, nil, 1) and ret.first.each do |l| + next if lock.synchronize { nr >= limit } + begin + Actor.spawn(l.accept_nonblock) do |c| + lock.synchronize { nr += 1 } + begin + process_client(c) + ensure + lock.synchronize { nr -= 1 } + end + end + rescue Errno::EAGAIN, Errno::ECONNABORTED + end + end + rescue => e + Error.listen_loop(e) + end while G.tick || lock.synchronize { nr > 0 } + end + end +end -- 2.11.4.GIT