1 #!/usr/bin/env python2.2
2 from __future__
import generators
6 from os
.path
import dirname
, abspath
, join
7 import tempfile
, shutil
9 rox_lib
= dirname(dirname(dirname(abspath(sys
.argv
[0]))))
10 sys
.path
.insert(0, join(rox_lib
, 'python'))
12 from rox
import proxy
, tasks
, g
, suchild
, master_proxy
14 class Slave(suchild
.Slave
):
18 class TestProxy(unittest
.TestCase
):
24 from_slave
= os
.pipe()
25 self
.master
= master_proxy
.MasterProxy(to_slave
[1], from_slave
[0])
26 self
.slave
= proxy
.SlaveProxy(from_slave
[1], to_slave
[0], Slave())
36 response
= self
.master
.root
.invoke()
37 self
.master
.write_ready()
38 self
.slave
.read_ready()
39 self
.slave
.write_ready()
40 assert not response
.happened
41 self
.master
.read_ready()
42 assert response
.happened
43 self
.assertEquals('Invoked', response
.result
)
46 blocker
= self
.master
.root
.invoke()
47 self
.master
.write_ready()
48 self
.slave
.read_ready()
49 self
.slave
.write_ready()
50 self
.master
.read_ready()
51 self
.assertEquals('Invoked', blocker
.result
)
53 def testMissing(self
):
55 response
= self
.master
.root
.missing('foo')
59 assert 0, 'Expected an exception!'
60 except AttributeError:
66 def testTooSoon(self
):
68 response
= self
.master
.root
.invoke()
71 assert 0, 'Expected an exception!'
81 # spawnvpe, waitpid, setuid and getuid are tested in testsu.py
84 tmp_dir
= tempfile
.mktemp('-roxlib-test')
87 assert os
.path
.isdir(tmp_dir
)
88 response
= self
.master
.root
.rmtree(tmp_dir
)
90 assert response
.result
is None
91 assert not os
.path
.exists(tmp_dir
)
97 tmp
= tempfile
.mktemp('-roxlib-test')
98 file(tmp
, 'w').close()
100 assert os
.path
.isfile(tmp
)
101 response
= self
.master
.root
.unlink(tmp
)
103 assert response
.result
is None
104 assert not os
.path
.exists(tmp
)
109 def testFileRead(self
):
110 tmp_file
= tempfile
.mktemp(suffix
= '-roxlib-test')
111 s
= file(tmp_file
, 'w')
114 root
= self
.master
.root
116 response
= root
.open(tmp_file
)
118 stream
= response
.result
120 response
= root
.read(stream
, 5)
122 assert "Hello" == response
.result
124 response
= root
.close(stream
)
126 assert response
.result
is None
133 def testFileWrite(self
):
134 tmp_dir
= tempfile
.mktemp('-roxlib-test')
136 root
= self
.master
.root
137 tmp_file
= join(tmp_dir
, 'new')
139 response
= root
.open(tmp_file
, 'w')
141 stream
= response
.result
143 assert os
.path
.isfile(tmp_file
)
145 response
= root
.write(stream
, 'Hello\n')
147 assert response
.result
== None
149 response
= root
.close(stream
)
151 assert response
.result
is None
153 assert file(tmp_file
).read() == 'Hello\n'
155 response
= root
.close(stream
)
159 assert 0, 'Expected an exception!'
166 shutil
.rmtree(tmp_dir
)
168 def testRename(self
):
169 tmp_dir
= tempfile
.mktemp('-roxlib-test')
171 root
= self
.master
.root
172 f
= file(join(tmp_dir
, 'old'), 'w')
177 response
= root
.rename(join(tmp_dir
, 'old'),
178 join(tmp_dir
, 'new'))
180 assert response
.result
== None
182 assert file(join(tmp_dir
, 'new')).read() == 'Hello\n'
187 shutil
.rmtree(tmp_dir
)
190 tmp_file
= tempfile
.mktemp(suffix
= '-roxlib-test')
191 s
= file(tmp_file
, 'w')
193 root
= self
.master
.root
194 os
.chmod(tmp_file
, 0700)
197 assert os
.stat(tmp_file
).st_mode
& 0777 == 0700
198 response
= root
.chmod(tmp_file
, 0655)
201 assert os
.stat(tmp_file
).st_mode
& 0777 == 0655
207 suite
= unittest
.makeSuite(TestProxy
)
208 if __name__
== '__main__':
209 sys
.argv
.append('-v')