Moved into a sub-dir, so that a svn checkout has the same structure as
[rox-lib/lack.git] / ROX-Lib2 / tests / python / testproxy.py
blob063e0e0bf55ed47ff79acadc5d145ecd5cc60bd9
1 #!/usr/bin/env python2.3
2 from __future__ import generators
3 import unittest
4 import sys
5 import os
6 from os.path import dirname, abspath, join
7 import tempfile, shutil
9 rox_lib = dirname(dirname(dirname(abspath(sys.argv[0]))))
10 sys.path.insert(0, join(rox_lib, 'python'))
12 from rox import proxy, tasks, g, suchild, master_proxy
14 class Slave(suchild.Slave):
15 def invoke(self):
16 return "Invoked"
18 class TestProxy(unittest.TestCase):
19 master = None
20 slave = None
22 def setUp(self):
23 to_slave = os.pipe()
24 from_slave = os.pipe()
25 self.master = master_proxy.MasterProxy(to_slave[1], from_slave[0])
26 self.slave = proxy.SlaveProxy(from_slave[1], to_slave[0], Slave())
28 def tearDown(self):
29 self.master.finish()
30 self.slave.finish()
32 def testSetup(self):
33 pass
35 def testManual(self):
36 response = self.master.root.invoke()
37 self.master.write_ready()
38 self.slave.read_ready()
39 self.slave.write_ready()
40 assert not response.happened
41 self.master.read_ready()
42 assert response.happened
43 self.assertEquals('Invoked', response.result)
45 def testSingle(self):
46 blocker = self.master.root.invoke()
47 self.master.write_ready()
48 self.slave.read_ready()
49 self.slave.write_ready()
50 self.master.read_ready()
51 self.assertEquals('Invoked', blocker.result)
53 def testMissing(self):
54 def run():
55 response = self.master.root.missing('foo')
56 yield response
57 try:
58 response.result
59 assert 0, 'Expected an exception!'
60 except AttributeError:
61 pass
62 g.main_quit()
63 tasks.Task(run())
64 g.main()
66 def testTooSoon(self):
67 def run():
68 response = self.master.root.invoke()
69 try:
70 response.result
71 assert 0, 'Expected an exception!'
72 except Exception:
73 pass
74 yield response
75 response.result
76 g.main_quit()
77 tasks.Task(run())
78 g.main()
81 # spawnvpe, waitpid, setuid and getuid are tested in testsu.py
83 def testRmTree(self):
84 tmp_dir = tempfile.mktemp('-roxlib-test')
85 os.mkdir(tmp_dir)
86 def run():
87 assert os.path.isdir(tmp_dir)
88 response = self.master.root.rmtree(tmp_dir)
89 yield response
90 assert response.result is None
91 assert not os.path.exists(tmp_dir)
92 g.main_quit()
93 tasks.Task(run())
94 g.main()
96 def testUnlink(self):
97 tmp = tempfile.mktemp('-roxlib-test')
98 file(tmp, 'w').close()
99 def run():
100 assert os.path.isfile(tmp)
101 response = self.master.root.unlink(tmp)
102 yield response
103 assert response.result is None
104 assert not os.path.exists(tmp)
105 g.main_quit()
106 tasks.Task(run())
107 g.main()
109 def testFileRead(self):
110 tmp_file = tempfile.mktemp(suffix = '-roxlib-test')
111 s = file(tmp_file, 'w')
112 s.write('Hello\n')
113 s.close()
114 root = self.master.root
115 def run():
116 response = root.open(tmp_file)
117 yield response
118 stream = response.result
120 response = root.read(stream, 5)
121 yield response
122 assert "Hello" == response.result
124 response = root.close(stream)
125 yield response
126 assert response.result is None
128 g.main_quit()
129 tasks.Task(run())
130 g.main()
131 os.unlink(tmp_file)
133 def testFileWrite(self):
134 tmp_dir = tempfile.mktemp('-roxlib-test')
135 os.mkdir(tmp_dir)
136 root = self.master.root
137 tmp_file = join(tmp_dir, 'new')
138 def run():
139 response = root.open(tmp_file, 'w')
140 yield response
141 stream = response.result
143 assert os.path.isfile(tmp_file)
145 response = root.write(stream, 'Hello\n')
146 yield response
147 assert response.result == None
149 response = root.close(stream)
150 yield response
151 assert response.result is None
153 assert file(tmp_file).read() == 'Hello\n'
155 response = root.close(stream)
156 yield response
157 try:
158 response.result
159 assert 0, 'Expected an exception!'
160 except KeyError:
161 pass
163 g.main_quit()
164 tasks.Task(run())
165 g.main()
166 shutil.rmtree(tmp_dir)
168 def testRename(self):
169 tmp_dir = tempfile.mktemp('-roxlib-test')
170 os.mkdir(tmp_dir)
171 root = self.master.root
172 f = file(join(tmp_dir, 'old'), 'w')
173 f.write('Hello\n')
174 f.close()
176 def run():
177 response = root.rename(join(tmp_dir, 'old'),
178 join(tmp_dir, 'new'))
179 yield response
180 assert response.result == None
182 assert file(join(tmp_dir, 'new')).read() == 'Hello\n'
184 g.main_quit()
185 tasks.Task(run())
186 g.main()
187 shutil.rmtree(tmp_dir)
189 def testChmod(self):
190 tmp_file = tempfile.mktemp(suffix = '-roxlib-test')
191 s = file(tmp_file, 'w')
192 s.close()
193 root = self.master.root
194 os.chmod(tmp_file, 0700)
196 def run():
197 assert os.stat(tmp_file).st_mode & 0777 == 0700
198 response = root.chmod(tmp_file, 0655)
199 yield response
200 response.result
201 assert os.stat(tmp_file).st_mode & 0777 == 0655
202 g.main_quit()
203 tasks.Task(run())
204 g.main()
205 os.unlink(tmp_file)
207 suite = unittest.makeSuite(TestProxy)
208 if __name__ == '__main__':
209 sys.argv.append('-v')
210 unittest.main()