avoid warning for rb_thread_io_blocking_region
[raindrops.git] / test / test_linux.rb
blob65f25e0b6b9b0d459b06af8bddc8aea056748bc4
1 # -*- encoding: binary -*-
2 require 'test/unit'
3 require 'tempfile'
4 require 'raindrops'
5 require 'socket'
6 require 'pp'
7 $stderr.sync = $stdout.sync = true
9 class TestLinux < Test::Unit::TestCase
10   include Raindrops::Linux
12   TEST_ADDR = ENV['UNICORN_TEST_ADDR'] || '127.0.0.1'
14   def setup
15     @to_close = []
16   end
18   def teardown
19     @to_close.each { |io| io.close unless io.closed? }
20   end
22   def test_unix
23     tmp = Tempfile.new("\xde\xad\xbe\xef") # valid path, really :)
24     File.unlink(tmp.path)
25     us = UNIXServer.new(tmp.path)
26     stats = unix_listener_stats([tmp.path])
27     assert_equal 1, stats.size
28     assert_equal 0, stats[tmp.path].active
29     assert_equal 0, stats[tmp.path].queued
31     @to_close << UNIXSocket.new(tmp.path)
32     stats = unix_listener_stats([tmp.path])
33     assert_equal 1, stats.size
34     assert_equal 0, stats[tmp.path].active
35     assert_equal 1, stats[tmp.path].queued
37     @to_close << UNIXSocket.new(tmp.path)
38     stats = unix_listener_stats([tmp.path])
39     assert_equal 1, stats.size
40     assert_equal 0, stats[tmp.path].active
41     assert_equal 2, stats[tmp.path].queued
43     @to_close << us.accept
44     stats = unix_listener_stats([tmp.path])
45     assert_equal 1, stats.size
46     assert_equal 1, stats[tmp.path].active
47     assert_equal 1, stats[tmp.path].queued
48   end
50   def test_unix_all
51     tmp = Tempfile.new("\xde\xad\xbe\xef") # valid path, really :)
52     File.unlink(tmp.path)
53     us = UNIXServer.new(tmp.path)
54     @to_close << UNIXSocket.new(tmp.path)
55     stats = unix_listener_stats
56     assert_equal 0, stats[tmp.path].active
57     assert_equal 1, stats[tmp.path].queued
59     @to_close << UNIXSocket.new(tmp.path)
60     stats = unix_listener_stats
61     assert_equal 0, stats[tmp.path].active
62     assert_equal 2, stats[tmp.path].queued
64     @to_close << us.accept
65     stats = unix_listener_stats
66     assert_equal 1, stats[tmp.path].active
67     assert_equal 1, stats[tmp.path].queued
68   end
70   def test_tcp
71     s = TCPServer.new(TEST_ADDR, 0)
72     port = s.addr[1]
73     addr = "#{TEST_ADDR}:#{port}"
74     addrs = [ addr ]
75     stats = tcp_listener_stats(addrs)
76     assert_equal 1, stats.size
77     assert_equal 0, stats[addr].queued
78     assert_equal 0, stats[addr].active
80     @to_close << TCPSocket.new(TEST_ADDR, port)
81     stats = tcp_listener_stats(addrs)
82     assert_equal 1, stats.size
83     assert_equal 1, stats[addr].queued
84     assert_equal 0, stats[addr].active
86     @to_close << s.accept
87     stats = tcp_listener_stats(addrs)
88     assert_equal 1, stats.size
89     assert_equal 0, stats[addr].queued
90     assert_equal 1, stats[addr].active
91   end
93   def test_tcp_reuse_sock
94     nlsock = Raindrops::InetDiagSocket.new
95     s = TCPServer.new(TEST_ADDR, 0)
96     port = s.addr[1]
97     addr = "#{TEST_ADDR}:#{port}"
98     addrs = [ addr ]
99     stats = tcp_listener_stats(addrs, nlsock)
100     assert_equal 1, stats.size
101     assert_equal 0, stats[addr].queued
102     assert_equal 0, stats[addr].active
104     @to_close << TCPSocket.new(TEST_ADDR, port)
105     stats = tcp_listener_stats(addrs, nlsock)
106     assert_equal 1, stats.size
107     assert_equal 1, stats[addr].queued
108     assert_equal 0, stats[addr].active
110     @to_close << s.accept
111     stats = tcp_listener_stats(addrs, nlsock)
112     assert_equal 1, stats.size
113     assert_equal 0, stats[addr].queued
114     assert_equal 1, stats[addr].active
115     ensure
116       nlsock.close
117   end
119   def test_tcp_multi
120     s1 = TCPServer.new(TEST_ADDR, 0)
121     s2 = TCPServer.new(TEST_ADDR, 0)
122     port1, port2 = s1.addr[1], s2.addr[1]
123     addr1, addr2 = "#{TEST_ADDR}:#{port1}", "#{TEST_ADDR}:#{port2}"
124     addrs = [ addr1, addr2 ]
125     stats = tcp_listener_stats(addrs)
126     assert_equal 2, stats.size
127     assert_equal 0, stats[addr1].queued
128     assert_equal 0, stats[addr1].active
129     assert_equal 0, stats[addr2].queued
130     assert_equal 0, stats[addr2].active
132     @to_close << TCPSocket.new(TEST_ADDR, port1)
133     stats = tcp_listener_stats(addrs)
134     assert_equal 2, stats.size
135     assert_equal 1, stats[addr1].queued
136     assert_equal 0, stats[addr1].active
137     assert_equal 0, stats[addr2].queued
138     assert_equal 0, stats[addr2].active
140     sc1 = s1.accept
141     stats = tcp_listener_stats(addrs)
142     assert_equal 2, stats.size
143     assert_equal 0, stats[addr1].queued
144     assert_equal 1, stats[addr1].active
145     assert_equal 0, stats[addr2].queued
146     assert_equal 0, stats[addr2].active
148     @to_close << TCPSocket.new(TEST_ADDR, port2)
149     stats = tcp_listener_stats(addrs)
150     assert_equal 2, stats.size
151     assert_equal 0, stats[addr1].queued
152     assert_equal 1, stats[addr1].active
153     assert_equal 1, stats[addr2].queued
154     assert_equal 0, stats[addr2].active
156     @to_close << TCPSocket.new(TEST_ADDR, port2)
157     stats = tcp_listener_stats(addrs)
158     assert_equal 2, stats.size
159     assert_equal 0, stats[addr1].queued
160     assert_equal 1, stats[addr1].active
161     assert_equal 2, stats[addr2].queued
162     assert_equal 0, stats[addr2].active
164     @to_close << s2.accept
165     stats = tcp_listener_stats(addrs)
166     assert_equal 2, stats.size
167     assert_equal 0, stats[addr1].queued
168     assert_equal 1, stats[addr1].active
169     assert_equal 1, stats[addr2].queued
170     assert_equal 1, stats[addr2].active
172     sc1.close
173     stats = tcp_listener_stats(addrs)
174     assert_equal 0, stats[addr1].queued
175     assert_equal 0, stats[addr1].active
176     assert_equal 1, stats[addr2].queued
177     assert_equal 1, stats[addr2].active
178   end
180   # tries to overflow buffers
181   def test_tcp_stress_test
182     nr_proc = 32
183     nr_sock = 500
184     s = TCPServer.new(TEST_ADDR, 0)
185     port = s.addr[1]
186     addr = "#{TEST_ADDR}:#{port}"
187     addrs = [ addr ]
188     rda, wra = IO.pipe
189     rdb, wrb = IO.pipe
191     nr_proc.times do
192       fork do
193         rda.close
194         wrb.close
195         @to_close.concat((1..nr_sock).map { s.accept })
196         wra.syswrite('.')
197         wra.close
198         rdb.sysread(1) # wait for parent to nuke us
199       end
200     end
202     nr_proc.times do
203       fork do
204         rda.close
205         wrb.close
206         @to_close.concat((1..nr_sock).map { TCPSocket.new(TEST_ADDR, port) })
207         wra.syswrite('.')
208         wra.close
209         rdb.sysread(1) # wait for parent to nuke us
210       end
211     end
213     assert_equal('.' * (nr_proc * 2), rda.read(nr_proc * 2))
215     rda.close
216     stats = tcp_listener_stats(addrs)
217     expect = { addr => Raindrops::ListenStats[nr_sock * nr_proc, 0] }
218     assert_equal expect, stats
220     @to_close << TCPSocket.new(TEST_ADDR, port)
221     stats = tcp_listener_stats(addrs)
222     expect = { addr => Raindrops::ListenStats[nr_sock * nr_proc, 1] }
223     assert_equal expect, stats
225     if ENV["BENCHMARK"].to_i != 0
226       require 'benchmark'
227       puts(Benchmark.measure{1000.times { tcp_listener_stats(addrs) }})
228     end
230     wrb.syswrite('.' * (nr_proc * 2)) # broadcast a wakeup
231     statuses = Process.waitall
232     statuses.each { |(_,status)| assert status.success?, status.inspect }
233   end if ENV["STRESS"].to_i != 0
234 end if RUBY_PLATFORM =~ /linux/