Some error handling/reporting fixes.
[pygobject.git] / tests / test_iochannel.py
blobaa130ec66628ee255764dba3310a4290a302bb69
1 # -*- Mode: Python -*-
2 # encoding: UTF-8
3 from __future__ import unicode_literals
5 import unittest
6 import tempfile
7 import os.path
8 import fcntl
9 import shutil
10 import warnings
12 from gi.repository import GLib
13 from gi import PyGIDeprecationWarning
15 from compathelper import _unicode
class IOChannel(unittest.TestCase):
    """Tests for the GLib.IOChannel Python API.

    Covers reading and writing files and pipes, encodings, seeking,
    buffering, and attaching main-loop watches through both the current
    ``GLib.io_add_watch(channel, priority, condition, callback, *data)``
    signature and the deprecated call styles.
    """

    def setUp(self):
        # Each test gets a private scratch directory; tearDown() removes it.
        self.workdir = tempfile.mkdtemp()

        # UTF-8 fixture: four lines, the third one empty, no trailing newline.
        self.testutf8 = os.path.join(self.workdir, 'testutf8.txt')
        with open(self.testutf8, 'wb') as f:
            f.write('hello ♥ world\nsecond line\n\nÀ demain!'.encode('UTF-8'))

        # Same layout, but with the non-ASCII characters encoded as Latin-1.
        self.testlatin1 = os.path.join(self.workdir, 'testlatin1.txt')
        with open(self.testlatin1, 'wb') as f:
            f.write(b'hell\xf8 world\nsecond line\n\n\xc0 demain!')

        # Output path used by the write tests; the file is not created here.
        self.testout = os.path.join(self.workdir, 'testout.txt')

    def tearDown(self):
        shutil.rmtree(self.workdir)

    def _open_nonblocking_pipe_reader(self):
        """Create a pipe and wrap its read end in a raw, non-blocking channel.

        Returns:
            A ``(channel, write_fd)`` tuple. The channel has its encoding
            set to ``None`` and the ``NONBLOCK`` flag added to its flags.
        """
        (r, w) = os.pipe()
        ch = GLib.IOChannel(filedes=r)
        ch.set_encoding(None)
        ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
        return ch, w

    def _feed_pipe_and_run(self, ml, w):
        """Write b'a' and then b'b' to *w* from timeouts while running *ml*.

        The writes are scheduled at 10 ms and 100 ms and the loop is asked
        to quit at 200 ms; this call returns once the loop has stopped.
        """
        # os.write() returns the byte count (truthy); "and False" makes the
        # callback return False so each timeout fires only once.
        GLib.timeout_add(10, lambda: os.write(w, b'a') and False)
        GLib.timeout_add(100, lambda: os.write(w, b'b') and False)
        GLib.timeout_add(200, ml.quit)
        ml.run()

    def test_file_readline_utf8(self):
        # Opening by file name without further setup yields a UTF-8 channel.
        ch = GLib.IOChannel(filename=self.testutf8)
        self.assertEqual(ch.get_encoding(), 'UTF-8')
        self.assertTrue(ch.get_close_on_unref())
        self.assertEqual(_unicode(ch.readline()), 'hello ♥ world\n')
        self.assertEqual(ch.get_buffer_condition(), GLib.IOCondition.IN)
        self.assertEqual(ch.readline(), 'second line\n')
        self.assertEqual(ch.readline(), '\n')
        self.assertEqual(_unicode(ch.readline()), 'À demain!')
        self.assertEqual(ch.get_buffer_condition(), 0)
        # Reading past the end yields an empty string.
        self.assertEqual(ch.readline(), '')
        ch.shutdown(True)

    def test_file_readline_latin1(self):
        ch = GLib.IOChannel(filename=self.testlatin1, mode='r')
        ch.set_encoding('latin1')
        self.assertEqual(ch.get_encoding(), 'latin1')
        self.assertEqual(_unicode(ch.readline()), 'hellø world\n')
        self.assertEqual(ch.readline(), 'second line\n')
        self.assertEqual(ch.readline(), '\n')
        self.assertEqual(_unicode(ch.readline()), 'À demain!')
        ch.shutdown(True)

    def test_file_iter(self):
        # Iterating over a channel yields its lines.
        items = []
        ch = GLib.IOChannel(filename=self.testutf8)
        for item in ch:
            items.append(item)
        self.assertEqual(len(items), 4)
        self.assertEqual(_unicode(items[0]), 'hello ♥ world\n')
        ch.shutdown(True)

    def test_file_readlines(self):
        ch = GLib.IOChannel(filename=self.testutf8)
        lines = ch.readlines()
        # Note, this really ought to be 4, but the static bindings add an extra
        # empty one
        self.assertGreaterEqual(len(lines), 4)
        self.assertLessEqual(len(lines), 5)
        self.assertEqual(_unicode(lines[0]), 'hello ♥ world\n')
        self.assertEqual(_unicode(lines[3]), 'À demain!')
        # Only inspect the extra element when it exists. (The previous
        # "== 4" guard indexed lines[4] exactly when it was out of range.)
        if len(lines) == 5:
            self.assertEqual(lines[4], '')

    def test_file_read(self):
        # read() with no count, a positional count and a keyword count must
        # each match what a plain binary read of the same file returns.
        ch = GLib.IOChannel(filename=self.testutf8)
        with open(self.testutf8, 'rb') as f:
            self.assertEqual(ch.read(), f.read())

        ch = GLib.IOChannel(filename=self.testutf8)
        with open(self.testutf8, 'rb') as f:
            self.assertEqual(ch.read(10), f.read(10))

        ch = GLib.IOChannel(filename=self.testutf8)
        with open(self.testutf8, 'rb') as f:
            self.assertEqual(ch.read(max_count=15), f.read(15))

    def test_seek(self):
        ch = GLib.IOChannel(filename=self.testutf8)
        ch.seek(2)
        self.assertEqual(ch.read(3), b'llo')

        ch.seek(2, 0)  # SEEK_SET
        self.assertEqual(ch.read(3), b'llo')

        ch.seek(1, 1)  # SEEK_CUR, skip the space
        self.assertEqual(ch.read(3), b'\xe2\x99\xa5')

        ch.seek(2, 2)  # SEEK_END
        # FIXME: does not work currently
        # self.assertEqual(ch.read(2), b'n!')

        # invalid whence value
        self.assertRaises(ValueError, ch.seek, 0, 3)

    def test_file_write(self):
        # Write one line in 'w' mode, append a second in 'a' mode, then
        # verify the Latin-1 encoded result on disk.
        ch = GLib.IOChannel(filename=self.testout, mode='w')
        ch.set_encoding('latin1')
        ch.write('hellø world\n')
        ch.shutdown(True)
        ch = GLib.IOChannel(filename=self.testout, mode='a')
        ch.set_encoding('latin1')
        ch.write('À demain!')
        ch.shutdown(True)

        with open(self.testout, 'rb') as f:
            self.assertEqual(f.read().decode('latin1'), 'hellø world\nÀ demain!')

    def test_file_writelines(self):
        # writelines() writes the items back to back without separators.
        ch = GLib.IOChannel(filename=self.testout, mode='w')
        ch.writelines(['foo', 'bar\n', 'baz\n', 'end'])
        ch.shutdown(True)

        with open(self.testout, 'r') as f:
            self.assertEqual(f.read(), 'foobar\nbaz\nend')

    def test_buffering(self):
        writer = GLib.IOChannel(filename=self.testout, mode='w')
        writer.set_encoding(None)
        self.assertTrue(writer.get_buffered())
        self.assertGreater(writer.get_buffer_size(), 10)

        reader = GLib.IOChannel(filename=self.testout, mode='r')

        # does not get written immediately on buffering
        writer.write('abc')
        self.assertEqual(reader.read(), b'')
        writer.flush()
        self.assertEqual(reader.read(), b'abc')

        # does get written immediately without buffering
        writer.set_buffered(False)
        writer.write('def')
        self.assertEqual(reader.read(), b'def')

        # writes after buffer overflow
        writer.set_buffer_size(10)
        writer.write('0123456789012')
        self.assertTrue(reader.read().startswith(b'012'))
        writer.flush()
        reader.read()  # ignore bits written after flushing

        # closing flushes
        writer.set_buffered(True)
        writer.write('ghi')
        writer.shutdown(True)
        self.assertEqual(reader.read(), b'ghi')
        reader.shutdown(True)

    def test_fd_read(self):
        (r, w) = os.pipe()

        ch = GLib.IOChannel(filedes=r)
        ch.set_encoding(None)
        ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
        # Mask with "&": OR-ing the flag in would be non-zero regardless of
        # whether set_flags() had any effect, making the check meaningless.
        self.assertNotEqual(ch.get_flags() & GLib.IOFlags.NONBLOCK, 0)
        self.assertEqual(ch.read(), b'')
        os.write(w, b'\x01\x02')
        self.assertEqual(ch.read(), b'\x01\x02')

        # now test blocking case, after closing the write end
        ch.set_flags(GLib.IOFlags(ch.get_flags() & ~GLib.IOFlags.NONBLOCK))
        os.write(w, b'\x03\x04')
        os.close(w)
        self.assertEqual(ch.read(), b'\x03\x04')

        ch.shutdown(True)

    def test_fd_write(self):
        (r, w) = os.pipe()
        # Put the read end into non-blocking mode for the first check.
        fcntl.fcntl(r, fcntl.F_SETFL, fcntl.fcntl(r, fcntl.F_GETFL) | os.O_NONBLOCK)

        ch = GLib.IOChannel(filedes=w, mode='w')
        ch.set_encoding(None)
        ch.set_buffered(False)
        ch.write(b'\x01\x02')
        self.assertEqual(os.read(r, 10), b'\x01\x02')

        # now test blocking case, after closing the write end
        fcntl.fcntl(r, fcntl.F_SETFL, fcntl.fcntl(r, fcntl.F_GETFL) & ~os.O_NONBLOCK)
        ch.write(b'\x03\x04')
        ch.shutdown(True)
        self.assertEqual(os.read(r, 10), b'\x03\x04')
        os.close(r)

    def test_deprecated_method_add_watch_no_data(self):
        ch, w = self._open_nonblocking_pipe_reader()

        cb_reads = []

        def cb(channel, condition):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            cb_reads.append(channel.read())
            return True

        # io_add_watch() method is deprecated, use GLib.io_add_watch
        with warnings.catch_warnings(record=True) as warn:
            warnings.simplefilter('always')
            ch.add_watch(GLib.IOCondition.IN, cb)
            self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))

        ml = GLib.MainLoop()

        self._feed_pipe_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    def test_deprecated_method_add_watch_data_priority(self):
        ch, w = self._open_nonblocking_pipe_reader()

        cb_reads = []

        def cb(channel, condition, data):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            self.assertEqual(data, 'hello')
            cb_reads.append(channel.read())
            return True

        ml = GLib.MainLoop()
        # io_add_watch() method is deprecated, use GLib.io_add_watch
        with warnings.catch_warnings(record=True) as warn:
            warnings.simplefilter('always')
            source_id = ch.add_watch(GLib.IOCondition.IN, cb, 'hello',
                                     priority=GLib.PRIORITY_HIGH)
            self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))

        self.assertEqual(ml.get_context().find_source_by_id(source_id).priority,
                         GLib.PRIORITY_HIGH)

        self._feed_pipe_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    def test_add_watch_no_data(self):
        ch, w = self._open_nonblocking_pipe_reader()

        cb_reads = []

        def cb(channel, condition):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            cb_reads.append(channel.read())
            return True

        source_id = GLib.io_add_watch(ch, GLib.PRIORITY_HIGH, GLib.IOCondition.IN, cb)

        ml = GLib.MainLoop()
        self.assertEqual(ml.get_context().find_source_by_id(source_id).priority,
                         GLib.PRIORITY_HIGH)
        self._feed_pipe_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    def test_add_watch_with_data(self):
        ch, w = self._open_nonblocking_pipe_reader()

        cb_reads = []

        def cb(channel, condition, data):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            self.assertEqual(data, 'hello')
            cb_reads.append(channel.read())
            return True

        source_id = GLib.io_add_watch(ch, GLib.PRIORITY_HIGH, GLib.IOCondition.IN,
                                      cb, 'hello')

        ml = GLib.MainLoop()
        self.assertEqual(ml.get_context().find_source_by_id(source_id).priority,
                         GLib.PRIORITY_HIGH)
        self._feed_pipe_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    def test_add_watch_with_multi_data(self):
        ch, w = self._open_nonblocking_pipe_reader()

        cb_reads = []

        # Each extra positional argument to io_add_watch() is expected to
        # arrive as its own callback parameter.
        def cb(channel, condition, data1, data2, data3):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            self.assertEqual(data1, 'a')
            self.assertEqual(data2, 'b')
            self.assertEqual(data3, 'c')
            cb_reads.append(channel.read())
            return True

        source_id = GLib.io_add_watch(ch, GLib.PRIORITY_HIGH, GLib.IOCondition.IN, cb,
                                      'a', 'b', 'c')

        ml = GLib.MainLoop()
        self.assertEqual(ml.get_context().find_source_by_id(source_id).priority,
                         GLib.PRIORITY_HIGH)
        self._feed_pipe_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    def test_deprecated_add_watch_no_data(self):
        ch, w = self._open_nonblocking_pipe_reader()

        cb_reads = []

        def cb(channel, condition):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            cb_reads.append(channel.read())
            return True

        # Old calling convention: condition before callback, priority passed
        # as a keyword. It must still work but emit a deprecation warning.
        with warnings.catch_warnings(record=True) as warn:
            warnings.simplefilter('always')
            source_id = GLib.io_add_watch(ch, GLib.IOCondition.IN, cb,
                                          priority=GLib.PRIORITY_HIGH)
            self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))

        ml = GLib.MainLoop()
        self.assertEqual(ml.get_context().find_source_by_id(source_id).priority,
                         GLib.PRIORITY_HIGH)
        self._feed_pipe_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    def test_deprecated_add_watch_with_data(self):
        ch, w = self._open_nonblocking_pipe_reader()

        cb_reads = []

        def cb(channel, condition, data):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            self.assertEqual(data, 'hello')
            cb_reads.append(channel.read())
            return True

        # Old calling convention with user data; see the no-data variant.
        with warnings.catch_warnings(record=True) as warn:
            warnings.simplefilter('always')
            source_id = GLib.io_add_watch(ch, GLib.IOCondition.IN, cb, 'hello',
                                          priority=GLib.PRIORITY_HIGH)
            self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))

        ml = GLib.MainLoop()
        self.assertEqual(ml.get_context().find_source_by_id(source_id).priority,
                         GLib.PRIORITY_HIGH)
        self._feed_pipe_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    def test_backwards_compat_flags(self):
        # The legacy module-level constants must equal the enum/flags
        # members; PyGIDeprecationWarning is silenced while comparing them.
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', PyGIDeprecationWarning)

            self.assertEqual(GLib.IOCondition.IN, GLib.IO_IN)
            self.assertEqual(GLib.IOFlags.NONBLOCK, GLib.IO_FLAG_NONBLOCK)
            self.assertEqual(GLib.IOFlags.IS_SEEKABLE, GLib.IO_FLAG_IS_SEEKABLE)
            self.assertEqual(GLib.IOStatus.NORMAL, GLib.IO_STATUS_NORMAL)
if __name__ == '__main__':
    # Executed as a script: hand control to unittest's command-line runner.
    unittest.main()