Exceptions raised during renaming in rotating file handlers are now passed to handleE...
[python.git] / Lib / test / test_poll.py
blob3a48bcecaf49153dca0e257547eb5c9e8beeb0de
1 # Test case for the select.poll() function
3 import sys, os, select, random
4 from test.test_support import verify, verbose, TestSkipped, TESTFN
# Skip this whole test module when the select module has no poll attribute.
if not hasattr(select, 'poll'):
    raise TestSkipped("select.poll not defined -- skipping test_poll")
def find_ready_matching(ready, flag):
    """Return the descriptors in *ready* whose event mask includes *flag*.

    ready -- iterable of (fd, mode) pairs, as test_poll1 obtains from poll()
    flag  -- bitmask tested with ``&`` against each pair's mode

    The matching fds are returned as a new list, in the order in which they
    appear in *ready*.
    """
    return [fd for fd, mode in ready if mode & flag]
19 def test_poll1():
20 """Basic functional test of poll object
22 Create a bunch of pipe and test that poll works with them.
23 """
24 print 'Running poll test 1'
25 p = select.poll()
27 NUM_PIPES = 12
28 MSG = " This is a test."
29 MSG_LEN = len(MSG)
30 readers = []
31 writers = []
32 r2w = {}
33 w2r = {}
35 for i in range(NUM_PIPES):
36 rd, wr = os.pipe()
37 p.register(rd, select.POLLIN)
38 p.register(wr, select.POLLOUT)
39 readers.append(rd)
40 writers.append(wr)
41 r2w[rd] = wr
42 w2r[wr] = rd
44 while writers:
45 ready = p.poll()
46 ready_writers = find_ready_matching(ready, select.POLLOUT)
47 if not ready_writers:
48 raise RuntimeError, "no pipes ready for writing"
49 wr = random.choice(ready_writers)
50 os.write(wr, MSG)
52 ready = p.poll()
53 ready_readers = find_ready_matching(ready, select.POLLIN)
54 if not ready_readers:
55 raise RuntimeError, "no pipes ready for reading"
56 rd = random.choice(ready_readers)
57 buf = os.read(rd, MSG_LEN)
58 verify(len(buf) == MSG_LEN)
59 print buf
60 os.close(r2w[rd]) ; os.close( rd )
61 p.unregister( r2w[rd] )
62 p.unregister( rd )
63 writers.remove(r2w[rd])
65 poll_unit_tests()
66 print 'Poll test 1 complete'
68 def poll_unit_tests():
69 # returns NVAL for invalid file descriptor
70 FD = 42
71 try:
72 os.close(FD)
73 except OSError:
74 pass
75 p = select.poll()
76 p.register(FD)
77 r = p.poll()
78 verify(r[0] == (FD, select.POLLNVAL))
80 f = open(TESTFN, 'w')
81 fd = f.fileno()
82 p = select.poll()
83 p.register(f)
84 r = p.poll()
85 verify(r[0][0] == fd)
86 f.close()
87 r = p.poll()
88 verify(r[0] == (fd, select.POLLNVAL))
89 os.unlink(TESTFN)
91 # type error for invalid arguments
92 p = select.poll()
93 try:
94 p.register(p)
95 except TypeError:
96 pass
97 else:
98 print "Bogus register call did not raise TypeError"
99 try:
100 p.unregister(p)
101 except TypeError:
102 pass
103 else:
104 print "Bogus unregister call did not raise TypeError"
106 # can't unregister non-existent object
107 p = select.poll()
108 try:
109 p.unregister(3)
110 except KeyError:
111 pass
112 else:
113 print "Bogus unregister call did not raise KeyError"
115 # Test error cases
116 pollster = select.poll()
117 class Nope:
118 pass
120 class Almost:
121 def fileno(self):
122 return 'fileno'
124 try:
125 pollster.register( Nope(), 0 )
126 except TypeError: pass
127 else: print 'expected TypeError exception, not raised'
129 try:
130 pollster.register( Almost(), 0 )
131 except TypeError: pass
132 else: print 'expected TypeError exception, not raised'
135 # Another test case for poll(). This is copied from the test case for
136 # select(), modified to use poll() instead.
138 def test_poll2():
139 print 'Running poll test 2'
140 cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
141 p = os.popen(cmd, 'r')
142 pollster = select.poll()
143 pollster.register( p, select.POLLIN )
144 for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
145 if verbose:
146 print 'timeout =', tout
147 fdlist = pollster.poll(tout)
148 if (fdlist == []):
149 continue
150 fd, flags = fdlist[0]
151 if flags & select.POLLHUP:
152 line = p.readline()
153 if line != "":
154 print 'error: pipe seems to be closed, but still returns data'
155 continue
157 elif flags & select.POLLIN:
158 line = p.readline()
159 if verbose:
160 print repr(line)
161 if not line:
162 if verbose:
163 print 'EOF'
164 break
165 continue
166 else:
167 print 'Unexpected return value from select.poll:', fdlist
168 p.close()
169 print 'Poll test 2 complete'
171 def test_poll3():
172 # test int overflow
173 print 'Running poll test 3'
174 pollster = select.poll()
175 pollster.register(1)
177 try:
178 pollster.poll(1L << 64)
179 except OverflowError:
180 pass
181 else:
182 print 'Expected OverflowError with excessive timeout'
184 x = 2 + 3
185 if x != 5:
186 print 'Overflow must have occurred'
187 print 'Poll test 3 complete'
# Run the three poll tests, in order, whenever this module is executed.
for _poll_test in (test_poll1, test_poll2, test_poll3):
    _poll_test()