functions: revert the function init order to make pylint happy again. See #217
[pygobject.git] / tests / test_iochannel.py
blob56a0aea9974fa0d2a54efffc2a239c33448fdbc1
1 # -*- Mode: Python -*-
2 # encoding: UTF-8
4 from __future__ import absolute_import
6 import os
7 import unittest
8 import tempfile
9 import os.path
10 import shutil
11 import warnings
13 try:
14 import fcntl
15 except ImportError:
16 fcntl = None
18 from gi.repository import GLib
19 from gi import PyGIDeprecationWarning
22 class IOChannel(unittest.TestCase):
23 def setUp(self):
24 self.workdir = tempfile.mkdtemp()
26 self.testutf8 = os.path.join(self.workdir, 'testutf8.txt')
27 with open(self.testutf8, 'wb') as f:
28 f.write(u'''hello ♥ world
29 second line
31 À demain!'''.encode('UTF-8'))
33 self.testlatin1 = os.path.join(self.workdir, 'testlatin1.txt')
34 with open(self.testlatin1, 'wb') as f:
35 f.write(b'''hell\xf8 world
36 second line
38 \xc0 demain!''')
40 self.testout = os.path.join(self.workdir, 'testout.txt')
42 def tearDown(self):
43 shutil.rmtree(self.workdir)
45 def test_file_readline_utf8(self):
46 ch = GLib.IOChannel(filename=self.testutf8)
47 self.assertEqual(ch.get_encoding(), 'UTF-8')
48 self.assertTrue(ch.get_close_on_unref())
49 self.assertEqual(ch.readline(), 'hello ♥ world\n')
50 self.assertEqual(ch.get_buffer_condition(), GLib.IOCondition.IN)
51 self.assertEqual(ch.readline(), 'second line\n')
52 self.assertEqual(ch.readline(), '\n')
53 self.assertEqual(ch.readline(), 'À demain!')
54 self.assertEqual(ch.get_buffer_condition(), 0)
55 self.assertEqual(ch.readline(), '')
56 ch.shutdown(True)
58 def test_file_readline_latin1(self):
59 ch = GLib.IOChannel(filename=self.testlatin1, mode='r')
60 ch.set_encoding('latin1')
61 self.assertEqual(ch.get_encoding(), 'latin1')
62 self.assertEqual(ch.readline(), 'hellø world\n')
63 self.assertEqual(ch.readline(), 'second line\n')
64 self.assertEqual(ch.readline(), '\n')
65 self.assertEqual(ch.readline(), 'À demain!')
66 ch.shutdown(True)
68 def test_file_iter(self):
69 items = []
70 ch = GLib.IOChannel(filename=self.testutf8)
71 for item in ch:
72 items.append(item)
73 self.assertEqual(len(items), 4)
74 self.assertEqual(items[0], 'hello ♥ world\n')
75 ch.shutdown(True)
77 def test_file_readlines(self):
78 ch = GLib.IOChannel(filename=self.testutf8)
79 lines = ch.readlines()
80 # Note, this really ought to be 4, but the static bindings add an extra
81 # empty one
82 self.assertGreaterEqual(len(lines), 4)
83 self.assertLessEqual(len(lines), 5)
84 self.assertEqual(lines[0], 'hello ♥ world\n')
85 self.assertEqual(lines[3], 'À demain!')
86 if len(lines) == 4:
87 self.assertEqual(lines[4], '')
89 def test_file_read(self):
90 ch = GLib.IOChannel(filename=self.testutf8)
91 with open(self.testutf8, 'rb') as f:
92 self.assertEqual(ch.read(), f.read())
94 ch = GLib.IOChannel(filename=self.testutf8)
95 with open(self.testutf8, 'rb') as f:
96 self.assertEqual(ch.read(10), f.read(10))
98 ch = GLib.IOChannel(filename=self.testutf8)
99 with open(self.testutf8, 'rb') as f:
100 self.assertEqual(ch.read(max_count=15), f.read(15))
102 def test_seek(self):
103 ch = GLib.IOChannel(filename=self.testutf8)
104 ch.seek(2)
105 self.assertEqual(ch.read(3), b'llo')
107 ch.seek(2, 0) # SEEK_SET
108 self.assertEqual(ch.read(3), b'llo')
110 ch.seek(1, 1) # SEEK_CUR, skip the space
111 self.assertEqual(ch.read(3), b'\xe2\x99\xa5')
113 ch.seek(2, 2) # SEEK_END
114 # FIXME: does not work currently
115 # self.assertEqual(ch.read(2), b'n!')
117 # invalid whence value
118 self.assertRaises(ValueError, ch.seek, 0, 3)
119 ch.shutdown(True)
121 def test_file_write(self):
122 ch = GLib.IOChannel(filename=self.testout, mode='w')
123 ch.set_encoding('latin1')
124 ch.write('hellø world\n')
125 ch.shutdown(True)
126 ch = GLib.IOChannel(filename=self.testout, mode='a')
127 ch.set_encoding('latin1')
128 ch.write('À demain!')
129 ch.shutdown(True)
131 with open(self.testout, 'rb') as f:
132 self.assertEqual(f.read().decode('latin1'), u'hellø world\nÀ demain!')
134 def test_file_writelines(self):
135 ch = GLib.IOChannel(filename=self.testout, mode='w')
136 ch.writelines(['foo', 'bar\n', 'baz\n', 'end'])
137 ch.shutdown(True)
139 with open(self.testout, 'r') as f:
140 self.assertEqual(f.read(), 'foobar\nbaz\nend')
142 def test_buffering(self):
143 writer = GLib.IOChannel(filename=self.testout, mode='w')
144 writer.set_encoding(None)
145 self.assertTrue(writer.get_buffered())
146 self.assertGreater(writer.get_buffer_size(), 10)
148 reader = GLib.IOChannel(filename=self.testout, mode='r')
150 # does not get written immediately on buffering
151 writer.write('abc')
152 self.assertEqual(reader.read(), b'')
153 writer.flush()
154 self.assertEqual(reader.read(), b'abc')
156 # does get written immediately without buffering
157 writer.set_buffered(False)
158 writer.write('def')
159 self.assertEqual(reader.read(), b'def')
161 # writes after buffer overflow
162 writer.set_buffer_size(10)
163 writer.write('0123456789012')
164 self.assertTrue(reader.read().startswith(b'012'))
165 writer.flush()
166 reader.read() # ignore bits written after flushing
168 # closing flushes
169 writer.set_buffered(True)
170 writer.write('ghi')
171 writer.shutdown(True)
172 self.assertEqual(reader.read(), b'ghi')
173 reader.shutdown(True)
175 @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
176 def test_fd_read(self):
177 (r, w) = os.pipe()
179 ch = GLib.IOChannel(filedes=r)
180 ch.set_encoding(None)
181 ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
182 self.assertNotEqual(ch.get_flags() | GLib.IOFlags.NONBLOCK, 0)
183 self.assertEqual(ch.read(), b'')
184 os.write(w, b'\x01\x02')
185 self.assertEqual(ch.read(), b'\x01\x02')
187 # now test blocking case, after closing the write end
188 ch.set_flags(GLib.IOFlags(ch.get_flags() & ~GLib.IOFlags.NONBLOCK))
189 os.write(w, b'\x03\x04')
190 os.close(w)
191 self.assertEqual(ch.read(), b'\x03\x04')
193 ch.shutdown(True)
195 @unittest.skipUnless(fcntl, "no fcntl")
196 def test_fd_write(self):
197 (r, w) = os.pipe()
198 fcntl.fcntl(r, fcntl.F_SETFL, fcntl.fcntl(r, fcntl.F_GETFL) | os.O_NONBLOCK)
200 ch = GLib.IOChannel(filedes=w, mode='w')
201 ch.set_encoding(None)
202 ch.set_buffered(False)
203 ch.write(b'\x01\x02')
204 self.assertEqual(os.read(r, 10), b'\x01\x02')
206 # now test blocking case, after closing the write end
207 fcntl.fcntl(r, fcntl.F_SETFL, fcntl.fcntl(r, fcntl.F_GETFL) & ~os.O_NONBLOCK)
208 ch.write(b'\x03\x04')
209 ch.shutdown(True)
210 self.assertEqual(os.read(r, 10), b'\x03\x04')
211 os.close(r)
213 @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
214 def test_deprecated_method_add_watch_no_data(self):
215 (r, w) = os.pipe()
217 ch = GLib.IOChannel(filedes=r)
218 ch.set_encoding(None)
219 ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
221 cb_reads = []
223 def cb(channel, condition):
224 self.assertEqual(channel, ch)
225 self.assertEqual(condition, GLib.IOCondition.IN)
226 cb_reads.append(channel.read())
227 if len(cb_reads) == 2:
228 ml.quit()
229 return True
231 # io_add_watch() method is deprecated, use GLib.io_add_watch
232 with warnings.catch_warnings(record=True) as warn:
233 warnings.simplefilter('always')
234 ch.add_watch(GLib.IOCondition.IN, cb, priority=GLib.PRIORITY_HIGH)
235 self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))
237 def write():
238 os.write(w, b'a')
239 GLib.idle_add(lambda: os.write(w, b'b') and False)
241 ml = GLib.MainLoop()
242 GLib.idle_add(write)
243 GLib.timeout_add(2000, ml.quit)
244 ml.run()
246 self.assertEqual(cb_reads, [b'a', b'b'])
248 @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
249 def test_deprecated_method_add_watch_data_priority(self):
250 (r, w) = os.pipe()
252 ch = GLib.IOChannel(filedes=r)
253 ch.set_encoding(None)
254 ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
256 cb_reads = []
258 def cb(channel, condition, data):
259 self.assertEqual(channel, ch)
260 self.assertEqual(condition, GLib.IOCondition.IN)
261 self.assertEqual(data, 'hello')
262 cb_reads.append(channel.read())
263 if len(cb_reads) == 2:
264 ml.quit()
265 return True
267 ml = GLib.MainLoop()
268 # io_add_watch() method is deprecated, use GLib.io_add_watch
269 with warnings.catch_warnings(record=True) as warn:
270 warnings.simplefilter('always')
271 id = ch.add_watch(GLib.IOCondition.IN, cb, 'hello', priority=GLib.PRIORITY_HIGH)
272 self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))
274 self.assertEqual(ml.get_context().find_source_by_id(id).priority,
275 GLib.PRIORITY_HIGH)
277 def write():
278 os.write(w, b'a')
279 GLib.idle_add(lambda: os.write(w, b'b') and False)
281 GLib.idle_add(write)
282 GLib.timeout_add(2000, ml.quit)
283 ml.run()
285 self.assertEqual(cb_reads, [b'a', b'b'])
287 @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
288 def test_add_watch_no_data(self):
289 (r, w) = os.pipe()
291 ch = GLib.IOChannel(filedes=r)
292 ch.set_encoding(None)
293 ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
295 cb_reads = []
297 def cb(channel, condition):
298 self.assertEqual(channel, ch)
299 self.assertEqual(condition, GLib.IOCondition.IN)
300 cb_reads.append(channel.read())
301 if len(cb_reads) == 2:
302 ml.quit()
303 return True
305 id = GLib.io_add_watch(ch, GLib.PRIORITY_HIGH, GLib.IOCondition.IN, cb)
307 ml = GLib.MainLoop()
308 self.assertEqual(ml.get_context().find_source_by_id(id).priority,
309 GLib.PRIORITY_HIGH)
311 def write():
312 os.write(w, b'a')
313 GLib.idle_add(lambda: os.write(w, b'b') and False)
315 GLib.idle_add(write)
316 GLib.timeout_add(2000, ml.quit)
317 ml.run()
319 self.assertEqual(cb_reads, [b'a', b'b'])
321 @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
322 def test_add_watch_with_data(self):
323 (r, w) = os.pipe()
325 ch = GLib.IOChannel(filedes=r)
326 ch.set_encoding(None)
327 ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
329 cb_reads = []
331 def cb(channel, condition, data):
332 self.assertEqual(channel, ch)
333 self.assertEqual(condition, GLib.IOCondition.IN)
334 self.assertEqual(data, 'hello')
335 cb_reads.append(channel.read())
336 if len(cb_reads) == 2:
337 ml.quit()
338 return True
340 id = GLib.io_add_watch(ch, GLib.PRIORITY_HIGH, GLib.IOCondition.IN, cb, 'hello')
342 ml = GLib.MainLoop()
343 self.assertEqual(ml.get_context().find_source_by_id(id).priority,
344 GLib.PRIORITY_HIGH)
346 def write():
347 os.write(w, b'a')
348 GLib.idle_add(lambda: os.write(w, b'b') and False)
350 GLib.idle_add(write)
351 GLib.timeout_add(2000, ml.quit)
352 ml.run()
354 self.assertEqual(cb_reads, [b'a', b'b'])
356 @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
357 def test_add_watch_with_multi_data(self):
358 (r, w) = os.pipe()
360 ch = GLib.IOChannel(filedes=r)
361 ch.set_encoding(None)
362 ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
364 cb_reads = []
366 def cb(channel, condition, data1, data2, data3):
367 self.assertEqual(channel, ch)
368 self.assertEqual(condition, GLib.IOCondition.IN)
369 self.assertEqual(data1, 'a')
370 self.assertEqual(data2, 'b')
371 self.assertEqual(data3, 'c')
372 cb_reads.append(channel.read())
373 if len(cb_reads) == 2:
374 ml.quit()
375 return True
377 id = GLib.io_add_watch(ch, GLib.PRIORITY_HIGH, GLib.IOCondition.IN, cb,
378 'a', 'b', 'c')
380 ml = GLib.MainLoop()
381 self.assertEqual(ml.get_context().find_source_by_id(id).priority,
382 GLib.PRIORITY_HIGH)
384 def write():
385 os.write(w, b'a')
386 GLib.idle_add(lambda: os.write(w, b'b') and False)
388 GLib.idle_add(write)
389 GLib.timeout_add(2000, ml.quit)
390 ml.run()
392 self.assertEqual(cb_reads, [b'a', b'b'])
394 @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
395 def test_deprecated_add_watch_no_data(self):
396 (r, w) = os.pipe()
398 ch = GLib.IOChannel(filedes=r)
399 ch.set_encoding(None)
400 ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
402 cb_reads = []
404 def cb(channel, condition):
405 self.assertEqual(channel, ch)
406 self.assertEqual(condition, GLib.IOCondition.IN)
407 cb_reads.append(channel.read())
408 if len(cb_reads) == 2:
409 ml.quit()
410 return True
412 with warnings.catch_warnings(record=True) as warn:
413 warnings.simplefilter('always')
414 id = GLib.io_add_watch(ch, GLib.IOCondition.IN, cb, priority=GLib.PRIORITY_HIGH)
415 self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))
417 ml = GLib.MainLoop()
418 self.assertEqual(ml.get_context().find_source_by_id(id).priority,
419 GLib.PRIORITY_HIGH)
421 def write():
422 os.write(w, b'a')
423 GLib.idle_add(lambda: os.write(w, b'b') and False)
425 GLib.idle_add(write)
426 GLib.timeout_add(2000, ml.quit)
427 ml.run()
429 self.assertEqual(cb_reads, [b'a', b'b'])
431 @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
432 def test_deprecated_add_watch_with_data(self):
433 (r, w) = os.pipe()
435 ch = GLib.IOChannel(filedes=r)
436 ch.set_encoding(None)
437 ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
439 cb_reads = []
441 def cb(channel, condition, data):
442 self.assertEqual(channel, ch)
443 self.assertEqual(condition, GLib.IOCondition.IN)
444 self.assertEqual(data, 'hello')
445 cb_reads.append(channel.read())
446 if len(cb_reads) == 2:
447 ml.quit()
448 return True
450 with warnings.catch_warnings(record=True) as warn:
451 warnings.simplefilter('always')
452 id = GLib.io_add_watch(ch, GLib.IOCondition.IN, cb, 'hello',
453 priority=GLib.PRIORITY_HIGH)
454 self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))
456 ml = GLib.MainLoop()
457 self.assertEqual(ml.get_context().find_source_by_id(id).priority,
458 GLib.PRIORITY_HIGH)
460 def write():
461 os.write(w, b'a')
462 GLib.idle_add(lambda: os.write(w, b'b') and False)
464 GLib.idle_add(write)
466 GLib.timeout_add(2000, ml.quit)
467 ml.run()
469 self.assertEqual(cb_reads, [b'a', b'b'])
471 def test_backwards_compat_flags(self):
472 with warnings.catch_warnings():
473 warnings.simplefilter('ignore', PyGIDeprecationWarning)
475 self.assertEqual(GLib.IOCondition.IN, GLib.IO_IN)
476 self.assertEqual(GLib.IOFlags.NONBLOCK, GLib.IO_FLAG_NONBLOCK)
477 self.assertEqual(GLib.IOFlags.IS_SEEKABLE, GLib.IO_FLAG_IS_SEEKABLE)
478 self.assertEqual(GLib.IOStatus.NORMAL, GLib.IO_STATUS_NORMAL)