2 from __future__
import generators
6 from os
.path
import dirname
, abspath
, join
7 import tempfile
, shutil
9 rox_lib
= dirname(dirname(dirname(abspath(sys
.argv
[0]))))
10 sys
.path
.insert(0, join(rox_lib
, 'python'))
12 from rox
import proxy
, tasks
, g
, suchild
, master_proxy
14 class Slave(suchild
.Slave
):
18 class TestProxy(unittest
.TestCase
):
24 from_slave
= os
.pipe()
25 self
.master
= master_proxy
.MasterProxy(to_slave
[1], from_slave
[0])
26 self
.slave
= proxy
.SlaveProxy(from_slave
[1], to_slave
[0], Slave())
36 response
= self
.master
.root
.invoke()
37 self
.master
.write_ready()
38 self
.slave
.read_ready()
39 self
.slave
.write_ready()
40 assert not response
.happened
41 self
.master
.read_ready()
42 assert response
.happened
43 self
.assertEquals('Invoked', response
.result
)
46 blocker
= self
.master
.root
.invoke()
47 self
.master
.write_ready()
48 self
.slave
.read_ready()
49 self
.slave
.write_ready()
50 self
.master
.read_ready()
51 self
.assertEquals('Invoked', blocker
.result
)
53 def testMissing(self
):
55 response
= self
.master
.root
.missing('foo')
59 assert 0, 'Expected an exception!'
60 except AttributeError:
66 def testTooSoon(self
):
68 response
= self
.master
.root
.invoke()
71 assert 0, 'Expected an exception!'
81 # spawnvpe, waitpid, setuid and getuid are tested in testsu.py
84 tmp_dir
= tempfile
.mkdtemp('-roxlib-test')
86 assert os
.path
.isdir(tmp_dir
)
87 response
= self
.master
.root
.rmtree(tmp_dir
)
89 assert response
.result
is None
90 assert not os
.path
.exists(tmp_dir
)
96 fd
, tmp
= tempfile
.mkstemp('-roxlib-test')
99 assert os
.path
.isfile(tmp
)
100 response
= self
.master
.root
.unlink(tmp
)
102 assert response
.result
is None
103 assert not os
.path
.exists(tmp
)
108 def testFileRead(self
):
109 tmp_file
= tempfile
.NamedTemporaryFile(suffix
= '-roxlib-test')
110 tmp_file
.write('Hello\n')
112 root
= self
.master
.root
114 response
= root
.open(tmp_file
.name
)
116 stream
= response
.result
118 response
= root
.read(stream
, 5)
120 assert "Hello" == response
.result
122 response
= root
.close(stream
)
124 assert response
.result
is None
130 def testFileWrite(self
):
131 tmp_dir
= tempfile
.mkdtemp('-roxlib-test')
132 root
= self
.master
.root
133 tmp_file
= join(tmp_dir
, 'new')
135 response
= root
.open(tmp_file
, 'w')
137 stream
= response
.result
139 assert os
.path
.isfile(tmp_file
)
141 response
= root
.write(stream
, 'Hello\n')
143 assert response
.result
== None
145 response
= root
.close(stream
)
147 assert response
.result
is None
149 assert file(tmp_file
).read() == 'Hello\n'
151 response
= root
.close(stream
)
155 assert 0, 'Expected an exception!'
162 shutil
.rmtree(tmp_dir
)
164 def testRename(self
):
165 tmp_dir
= tempfile
.mkdtemp('-roxlib-test')
166 root
= self
.master
.root
167 f
= file(join(tmp_dir
, 'old'), 'w')
172 response
= root
.rename(join(tmp_dir
, 'old'),
173 join(tmp_dir
, 'new'))
175 assert response
.result
== None
177 assert file(join(tmp_dir
, 'new')).read() == 'Hello\n'
182 shutil
.rmtree(tmp_dir
)
185 tmp_file
= tempfile
.NamedTemporaryFile(suffix
= '-roxlib-test')
186 root
= self
.master
.root
187 os
.chmod(tmp_file
.name
, 0700)
190 assert os
.stat(tmp_file
.name
).st_mode
& 0777 == 0700
191 response
= root
.chmod(tmp_file
.name
, 0655)
194 assert os
.stat(tmp_file
.name
).st_mode
& 0777 == 0655
200 sys
.argv
.append('-v')