fixed message passing stuff
[tore.git] / tord / pyro.py
blob499fed660eb9b869390fb90faf49d7b338a072e7
1 import Pyro.core, Pyro.errors
2 from socket import gethostname, gethostbyname
3 from threading import Thread
4 from common import log, test
def ping(uri):
    """Return True if a Pyro proxy can be obtained for ``uri``, else False.

    Only ``Exception`` subclasses count as "not reachable".
    ``KeyboardInterrupt`` and ``SystemExit`` are allowed to propagate so the
    process can still be aborted while the lookup is in progress.
    """
    try:
        Pyro.core.getProxyForURI(uri)
    except Exception:
        return False
    return True
def shutdown(obj, daemon, client=False):
    """Stop ``obj`` and then shut down the ``daemon`` that was serving it.

    obj    -- object exposing a ``looping`` attribute and a ``shutdown()``
              method; ``looping`` is cleared before ``shutdown()`` is called.
    daemon -- object exposing ``shutdown()``.
    client -- accepted for call compatibility; not used here.

    The daemon is shut down even when ``obj.shutdown()`` raises; the
    exception still propagates to the caller afterwards.
    """
    obj.looping = False
    try:
        obj.shutdown()
    finally:
        # Never leave the daemon running because the object's own cleanup
        # failed.
        daemon.shutdown()
def run(obj, name, client=False, daemon=None):
    """Expose ``obj`` as ``name`` on a new Pyro daemon and serve until done.

    obj    -- object to connect; must provide ``start()``, ``shutdown()`` and
              a ``looping`` attribute. When ``client`` is true it must also
              provide ``register(addr, core)`` and ``get_core()``.
    name   -- name the object is connected under.
    client -- when true, the Pyro client side is initialised as well and the
              object is registered (with this host's address) before it is
              started.
    daemon -- rebound to a newly created daemon. A value passed in is only
              seen by the cleanup step if creating the new daemon fails.

    Any failure is logged at debug level (with traceback), not raised.
    Whatever daemon is bound on exit is shut down together with ``obj``.
    """
    import traceback

    Pyro.core.initServer()
    if client:
        Pyro.core.initClient()
    try:
        daemon = Pyro.core.Daemon()
        uri = daemon.connect(obj, name)
        if client:
            # The host address is only needed for registration, so a failing
            # hostname lookup no longer prevents a non-client object from
            # being served.
            addr = gethostbyname(gethostname())
            obj.register(addr, obj.get_core())
        obj.start()
        log.info(uri)
        # Keep serving while obj.looping is true. The 1 is presumably the
        # poll timeout in seconds -- confirm against the Pyro version in use.
        daemon.requestLoop(lambda: obj.looping, 1)
    except:
        # Deliberate catch-all: every failure or interruption ends in the
        # cleanup below without raising to the caller. The traceback is
        # logged so the cause is not lost.
        log.debug("Exception in: " + name)
        log.debug(traceback.format_exc())
    finally:
        if daemon:
            shutdown(obj, daemon, client)
class PyroBase(Thread, Pyro.core.ObjBase):
    """Daemon thread that is also a Pyro object.

    Subclasses (or a mixed-in class) must supply ``go()``, which is the
    thread's work. ``looping`` is True from construction until ``go()`` has
    finished.
    """

    def __init__(self):
        Pyro.core.ObjBase.__init__(self)
        Thread.__init__(self)
        self.setDaemon(True)
        # Read by the serving loop to decide whether to keep going.
        self.looping = True

    def run(self):
        """Thread body: run ``go()`` and clear ``looping`` when it ends.

        The flag is cleared even if ``go()`` raises, so a crashed worker
        cannot leave ``looping`` stuck at True; the exception still
        propagates.
        """
        try:
            self.go()
        finally:
            self.looping = False
def PyroClient(Client, server=None):
    """Wrap ``Client`` as a Pyro object and run it against the tord server.

    Client -- class mixed into the generated Pyro client; constructed with
              no arguments.
    server -- host name of the server; this machine's host name is used
              when it is empty or omitted.

    Logs an error and returns if the server URI cannot be pinged; otherwise
    blocks in ``run`` serving the client under the name "torc".
    """

    class PyroClient(PyroBase, Client):
        """``Client`` combined with the Pyro thread base."""

        def __init__(self, uri):
            PyroBase.__init__(self)
            Client.__init__(self)
            # URI of the server-side object this client talks to.
            self.uri = uri

        def get_core(self):
            """Return a Pyro proxy for the server object at ``self.uri``."""
            return Pyro.core.getProxyForURI(self.uri)

        def register(self, name, core):
            """Join ``core`` as ``name``, passing a proxy to this client."""
            core.join(name, self.getProxy())

    host = server or gethostname()
    server_uri = "PYROLOC://"+host+":7766/tord1"

    if not ping(server_uri):
        log.error("unable to find server")
        return

    run(PyroClient(server_uri), "torc", True)
def PyroServer(Server, name):
    """Wrap ``Server`` as a Pyro object and serve it under the name "tord1".

    Server -- class mixed into the generated Pyro server; constructed with
              no arguments.
    name   -- accepted but not used: the object is always connected as
              "tord1". NOTE(review): confirm whether this is intentional
              before wiring it through.

    Blocks in ``run`` until the server stops looping.
    """

    class PyroServer(PyroBase, Server):
        """``Server`` combined with the Pyro thread base plus a client list."""

        def __init__(self):
            PyroBase.__init__(self)
            Server.__init__(self)

            # Held around each individual callback delivery in publish().
            self.publishMutex = Pyro.util.getLockObject()
            # (name, callback proxy) pairs added by join().
            self.clients = []

        def join(self, name, cb):
            """Record ``cb`` as the callback for the client called ``name``."""
            log.debug("PYRO joined: " + name)
            self.clients.append((name, cb))

        @test
        def publish(self, msg):
            """Deliver ``msg`` to every joined client.

            A client whose connection is closed is removed from the list.
            Protocol and runtime errors are logged and the client is kept.
            """
            # Iterate over a snapshot: entries are removed from self.clients
            # inside the loop, and mutating a list while iterating it would
            # skip the client following each removed one.
            for name, cb in list(self.clients):
                try:
                    self.publishMutex.acquire()
                    try:
                        # Presumably hands the proxy over to the calling
                        # thread before it is used -- see the Pyro docs.
                        cb._transferThread()
                        cb.message(msg)
                    finally:
                        self.publishMutex.release()
                except Pyro.errors.ConnectionClosedError as x:
                    log.debug("PYRO removed: %s %s" % (name, x))
                    if (name, cb) in self.clients:
                        self.clients.remove((name, cb))
                except Pyro.errors.ProtocolError as x:
                    log.error("Pyro.protoco: " + str(x))
                except RuntimeError as x:
                    log.error("runtime: " + str(x))

    run(PyroServer(), "tord1")