4 from __future__
import absolute_import
18 from gi
.repository
import GLib
19 from gi
import PyGIDeprecationWarning
class IOChannel(unittest.TestCase):
    """Tests for GLib.IOChannel: file and fd I/O, seeking, buffering, watches."""

    def setUp(self):
        """Create a scratch directory with a UTF-8 and a Latin-1 fixture file."""
        self.workdir = tempfile.mkdtemp()

        self.testutf8 = os.path.join(self.workdir, 'testutf8.txt')
        with open(self.testutf8, 'wb') as f:
            f.write(u'''hello ♥ world
second line

À demain!'''.encode('UTF-8'))

        self.testlatin1 = os.path.join(self.workdir, 'testlatin1.txt')
        with open(self.testlatin1, 'wb') as f:
            f.write(b'''hell\xf8 world
second line

\xc0 demain!''')

        self.testout = os.path.join(self.workdir, 'testout.txt')

    def tearDown(self):
        """Remove the scratch directory created by setUp()."""
        shutil.rmtree(self.workdir)

    # -- private helpers shared by the watch tests -------------------------

    def _nonblocking_pipe_channel(self):
        """Return ``(channel, write_fd)`` for a fresh pipe.

        The channel wraps the read end, has no encoding (raw bytes) and has
        the NONBLOCK flag added to its existing flags.
        """
        r, w = os.pipe()
        ch = GLib.IOChannel(filedes=r)
        ch.set_encoding(None)
        ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
        return ch, w

    def _feed_and_run(self, ml, w):
        """Write b'a' then b'b' to *w* from idle callbacks and run *ml*.

        The second write is scheduled from within the first so the watch
        callback gets two separate wake-ups. A 2 s timeout quits the loop so
        a broken watch cannot hang the test run.
        """
        def write():
            os.write(w, b'a')
            # "and False" makes the idle callback return False so it runs once
            GLib.idle_add(lambda: os.write(w, b'b') and False)

        GLib.idle_add(write)
        GLib.timeout_add(2000, ml.quit)
        ml.run()

    # -- file based channels ------------------------------------------------

    def test_file_readline_utf8(self):
        """readline() on the UTF-8 fixture yields each line, then ''."""
        ch = GLib.IOChannel(filename=self.testutf8)
        self.assertEqual(ch.get_encoding(), 'UTF-8')
        self.assertTrue(ch.get_close_on_unref())
        self.assertEqual(ch.readline(), 'hello ♥ world\n')
        self.assertEqual(ch.get_buffer_condition(), GLib.IOCondition.IN)
        self.assertEqual(ch.readline(), 'second line\n')
        self.assertEqual(ch.readline(), '\n')
        self.assertEqual(ch.readline(), 'À demain!')
        self.assertEqual(ch.get_buffer_condition(), 0)
        self.assertEqual(ch.readline(), '')
        ch.shutdown(True)

    def test_file_readline_latin1(self):
        """readline() decodes the Latin-1 fixture once the encoding is set."""
        ch = GLib.IOChannel(filename=self.testlatin1, mode='r')
        ch.set_encoding('latin1')
        self.assertEqual(ch.get_encoding(), 'latin1')
        self.assertEqual(ch.readline(), 'hellø world\n')
        self.assertEqual(ch.readline(), 'second line\n')
        self.assertEqual(ch.readline(), '\n')
        self.assertEqual(ch.readline(), 'À demain!')
        ch.shutdown(True)

    def test_file_iter(self):
        """Iterating a channel yields the four lines of the fixture."""
        ch = GLib.IOChannel(filename=self.testutf8)
        items = list(ch)
        self.assertEqual(len(items), 4)
        self.assertEqual(items[0], 'hello ♥ world\n')
        ch.shutdown(True)

    def test_file_readlines(self):
        """readlines() returns the fixture lines (optionally plus one '')."""
        ch = GLib.IOChannel(filename=self.testutf8)
        lines = ch.readlines()
        # Note, this really ought to be 4, but the static bindings add an
        # extra empty element, so both lengths are accepted.
        self.assertGreaterEqual(len(lines), 4)
        self.assertLessEqual(len(lines), 5)
        self.assertEqual(lines[0], 'hello ♥ world\n')
        self.assertEqual(lines[3], 'À demain!')
        # only index the fifth element when it is actually present
        if len(lines) == 5:
            self.assertEqual(lines[4], '')

    def test_file_read(self):
        """read() with no count, a positional count and max_count=."""
        ch = GLib.IOChannel(filename=self.testutf8)
        with open(self.testutf8, 'rb') as f:
            self.assertEqual(ch.read(), f.read())

        ch = GLib.IOChannel(filename=self.testutf8)
        with open(self.testutf8, 'rb') as f:
            self.assertEqual(ch.read(10), f.read(10))

        ch = GLib.IOChannel(filename=self.testutf8)
        with open(self.testutf8, 'rb') as f:
            self.assertEqual(ch.read(max_count=15), f.read(15))

    def test_seek(self):
        """seek() with default, SEEK_SET and SEEK_CUR whence; bad whence raises."""
        ch = GLib.IOChannel(filename=self.testutf8)
        # skip "he" so the next three bytes are "llo"
        ch.seek(2)
        self.assertEqual(ch.read(3), b'llo')

        ch.seek(2, 0)  # SEEK_SET
        self.assertEqual(ch.read(3), b'llo')

        ch.seek(1, 1)  # SEEK_CUR, skip the space
        self.assertEqual(ch.read(3), b'\xe2\x99\xa5')

        ch.seek(2, 2)  # SEEK_END
        # FIXME: does not work currently
        # self.assertEqual(ch.read(2), b'n!')

        # invalid whence value
        self.assertRaises(ValueError, ch.seek, 0, 3)
        ch.shutdown(True)

    def test_file_write(self):
        """Write then append in Latin-1 and check the bytes on disk."""
        ch = GLib.IOChannel(filename=self.testout, mode='w')
        ch.set_encoding('latin1')
        ch.write('hellø world\n')
        # shut down (with flush) so the data is on disk before reopening
        ch.shutdown(True)

        ch = GLib.IOChannel(filename=self.testout, mode='a')
        ch.set_encoding('latin1')
        ch.write('À demain!')
        ch.shutdown(True)

        with open(self.testout, 'rb') as f:
            self.assertEqual(f.read().decode('latin1'), u'hellø world\nÀ demain!')

    def test_file_writelines(self):
        """writelines() writes the items back to back, adding no separators."""
        ch = GLib.IOChannel(filename=self.testout, mode='w')
        ch.writelines(['foo', 'bar\n', 'baz\n', 'end'])
        # shut down (with flush) so the data is on disk before reading back
        ch.shutdown(True)

        with open(self.testout, 'r') as f:
            self.assertEqual(f.read(), 'foobar\nbaz\nend')

    def test_buffering(self):
        """Check when written data becomes visible to a second reader channel."""
        writer = GLib.IOChannel(filename=self.testout, mode='w')
        writer.set_encoding(None)
        self.assertTrue(writer.get_buffered())
        self.assertGreater(writer.get_buffer_size(), 10)

        reader = GLib.IOChannel(filename=self.testout, mode='r')

        # does not get written immediately on buffering
        writer.write('abc')
        self.assertEqual(reader.read(), b'')
        writer.flush()
        self.assertEqual(reader.read(), b'abc')

        # does get written immediately without buffering
        writer.set_buffered(False)
        writer.write('def')
        self.assertEqual(reader.read(), b'def')

        # writes after buffer overflow
        writer.set_buffer_size(10)
        writer.write('0123456789012')
        self.assertTrue(reader.read().startswith(b'012'))
        writer.flush()
        reader.read()  # ignore bits written after flushing

        # closing flushes
        writer.set_buffered(True)
        writer.write('ghi')
        writer.shutdown(True)
        self.assertEqual(reader.read(), b'ghi')
        reader.shutdown(True)

    # -- file descriptor based channels -------------------------------------

    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
    def test_fd_read(self):
        """Read from a pipe through a channel, non-blocking then blocking."""
        r, w = os.pipe()

        ch = GLib.IOChannel(filedes=r)
        ch.set_encoding(None)
        ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
        # Mask with '&': OR-ing the flag in would be non-zero regardless of
        # whether set_flags() had any effect, making the check meaningless.
        self.assertNotEqual(ch.get_flags() & GLib.IOFlags.NONBLOCK, 0)
        self.assertEqual(ch.read(), b'')
        os.write(w, b'\x01\x02')
        self.assertEqual(ch.read(), b'\x01\x02')

        # now test blocking case, after closing the write end
        ch.set_flags(GLib.IOFlags(ch.get_flags() & ~GLib.IOFlags.NONBLOCK))
        os.write(w, b'\x03\x04')
        # without EOF on the pipe the blocking read() below would never return
        os.close(w)
        self.assertEqual(ch.read(), b'\x03\x04')

        ch.shutdown(True)

    @unittest.skipUnless(fcntl, "no fcntl")
    def test_fd_write(self):
        """Write to a pipe through a channel and read it back with os.read()."""
        r, w = os.pipe()
        fcntl.fcntl(r, fcntl.F_SETFL, fcntl.fcntl(r, fcntl.F_GETFL) | os.O_NONBLOCK)

        ch = GLib.IOChannel(filedes=w, mode='w')
        ch.set_encoding(None)
        ch.set_buffered(False)
        ch.write(b'\x01\x02')
        self.assertEqual(os.read(r, 10), b'\x01\x02')

        # now test blocking case, after closing the write end
        fcntl.fcntl(r, fcntl.F_SETFL, fcntl.fcntl(r, fcntl.F_GETFL) & ~os.O_NONBLOCK)
        ch.write(b'\x03\x04')
        ch.shutdown(True)
        self.assertEqual(os.read(r, 10), b'\x03\x04')
        os.close(r)

    # -- I/O watches --------------------------------------------------------

    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
    def test_deprecated_method_add_watch_no_data(self):
        """Deprecated IOChannel.add_watch() method, callback without user data."""
        ch, w = self._nonblocking_pipe_channel()
        ml = GLib.MainLoop()
        cb_reads = []

        def cb(channel, condition):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            cb_reads.append(channel.read())
            if len(cb_reads) == 2:
                ml.quit()
            # keep the watch installed for the second chunk
            return True

        # io_add_watch() method is deprecated, use GLib.io_add_watch
        with warnings.catch_warnings(record=True) as warn:
            warnings.simplefilter('always')
            ch.add_watch(GLib.IOCondition.IN, cb, priority=GLib.PRIORITY_HIGH)
            self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))

        self._feed_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
    def test_deprecated_method_add_watch_data_priority(self):
        """Deprecated IOChannel.add_watch() method with user data and priority."""
        ch, w = self._nonblocking_pipe_channel()
        ml = GLib.MainLoop()
        cb_reads = []

        def cb(channel, condition, data):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            self.assertEqual(data, 'hello')
            cb_reads.append(channel.read())
            if len(cb_reads) == 2:
                ml.quit()
            # keep the watch installed for the second chunk
            return True

        # io_add_watch() method is deprecated, use GLib.io_add_watch
        with warnings.catch_warnings(record=True) as warn:
            warnings.simplefilter('always')
            source_id = ch.add_watch(GLib.IOCondition.IN, cb, 'hello',
                                     priority=GLib.PRIORITY_HIGH)
            self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))

        self.assertEqual(ml.get_context().find_source_by_id(source_id).priority,
                         GLib.PRIORITY_HIGH)

        self._feed_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
    def test_add_watch_no_data(self):
        """GLib.io_add_watch(channel, priority, condition, cb) without user data."""
        ch, w = self._nonblocking_pipe_channel()
        ml = GLib.MainLoop()
        cb_reads = []

        def cb(channel, condition):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            cb_reads.append(channel.read())
            if len(cb_reads) == 2:
                ml.quit()
            # keep the watch installed for the second chunk
            return True

        source_id = GLib.io_add_watch(ch, GLib.PRIORITY_HIGH,
                                      GLib.IOCondition.IN, cb)

        self.assertEqual(ml.get_context().find_source_by_id(source_id).priority,
                         GLib.PRIORITY_HIGH)

        self._feed_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
    def test_add_watch_with_data(self):
        """GLib.io_add_watch() passing one user data argument to the callback."""
        ch, w = self._nonblocking_pipe_channel()
        ml = GLib.MainLoop()
        cb_reads = []

        def cb(channel, condition, data):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            self.assertEqual(data, 'hello')
            cb_reads.append(channel.read())
            if len(cb_reads) == 2:
                ml.quit()
            # keep the watch installed for the second chunk
            return True

        source_id = GLib.io_add_watch(ch, GLib.PRIORITY_HIGH,
                                      GLib.IOCondition.IN, cb, 'hello')

        self.assertEqual(ml.get_context().find_source_by_id(source_id).priority,
                         GLib.PRIORITY_HIGH)

        self._feed_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
    def test_add_watch_with_multi_data(self):
        """GLib.io_add_watch() passing several user data arguments."""
        ch, w = self._nonblocking_pipe_channel()
        ml = GLib.MainLoop()
        cb_reads = []

        def cb(channel, condition, data1, data2, data3):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            self.assertEqual(data1, 'a')
            self.assertEqual(data2, 'b')
            self.assertEqual(data3, 'c')
            cb_reads.append(channel.read())
            if len(cb_reads) == 2:
                ml.quit()
            # keep the watch installed for the second chunk
            return True

        source_id = GLib.io_add_watch(ch, GLib.PRIORITY_HIGH,
                                      GLib.IOCondition.IN, cb,
                                      'a', 'b', 'c')

        self.assertEqual(ml.get_context().find_source_by_id(source_id).priority,
                         GLib.PRIORITY_HIGH)

        self._feed_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
    def test_deprecated_add_watch_no_data(self):
        """Deprecated GLib.io_add_watch() signature using the priority= keyword."""
        ch, w = self._nonblocking_pipe_channel()
        ml = GLib.MainLoop()
        cb_reads = []

        def cb(channel, condition):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            cb_reads.append(channel.read())
            if len(cb_reads) == 2:
                ml.quit()
            # keep the watch installed for the second chunk
            return True

        with warnings.catch_warnings(record=True) as warn:
            warnings.simplefilter('always')
            source_id = GLib.io_add_watch(ch, GLib.IOCondition.IN, cb,
                                          priority=GLib.PRIORITY_HIGH)
            self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))

        self.assertEqual(ml.get_context().find_source_by_id(source_id).priority,
                         GLib.PRIORITY_HIGH)

        self._feed_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
    def test_deprecated_add_watch_with_data(self):
        """Deprecated GLib.io_add_watch() signature with user data and priority=."""
        ch, w = self._nonblocking_pipe_channel()
        ml = GLib.MainLoop()
        cb_reads = []

        def cb(channel, condition, data):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            self.assertEqual(data, 'hello')
            cb_reads.append(channel.read())
            if len(cb_reads) == 2:
                ml.quit()
            # keep the watch installed for the second chunk
            return True

        with warnings.catch_warnings(record=True) as warn:
            warnings.simplefilter('always')
            source_id = GLib.io_add_watch(ch, GLib.IOCondition.IN, cb, 'hello',
                                          priority=GLib.PRIORITY_HIGH)
            self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))

        self.assertEqual(ml.get_context().find_source_by_id(source_id).priority,
                         GLib.PRIORITY_HIGH)

        self._feed_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    def test_backwards_compat_flags(self):
        """The legacy GLib.IO_* constants equal their enum/flags counterparts."""
        with warnings.catch_warnings():
            # the legacy names are compared under an 'ignore' filter for
            # PyGIDeprecationWarning so the comparisons stay quiet
            warnings.simplefilter('ignore', PyGIDeprecationWarning)

            self.assertEqual(GLib.IOCondition.IN, GLib.IO_IN)
            self.assertEqual(GLib.IOFlags.NONBLOCK, GLib.IO_FLAG_NONBLOCK)
            self.assertEqual(GLib.IOFlags.IS_SEEKABLE, GLib.IO_FLAG_IS_SEEKABLE)
            self.assertEqual(GLib.IOStatus.NORMAL, GLib.IO_STATUS_NORMAL)