copy_stream: handle and block on EAGAIN from the non-pipe IO
[ruby_io_splice.git] / test / test_io_splice.rb
blobf42d5c6db8f44ed05b77b1fb1cdea01b098720d1
1 # -*- encoding: binary -*-
2 require 'test/unit'
3 require 'tempfile'
4 require 'socket'
5 require 'io/nonblock'
6 require 'timeout'
7 $-w = true
8 require 'io/splice'
10 # unused_port provides an unused port on +addr+ usable for TCP that is
11 # guaranteed to be unused across all unicorn builds on that system.  It
12 # prevents race conditions by using a lock file other unicorn builds
13 # will see.  This is required if you perform several builds in parallel
14 # with a continuous integration system or run tests in parallel via
15 # gmake.  This is NOT guaranteed to be race-free if you run other
16 # processes that bind to random ports for testing (but the window
17 # for a race condition is very small).
18 def unused_port(addr = '127.0.0.1')
19   retries = 100
20   base = 5000
21   port = sock = nil
22   begin
23     begin
24       port = base + rand(32768 - base)
25       while port == 8080
26         port = base + rand(32768 - base)
27       end
29       sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
30       sock.bind(Socket.pack_sockaddr_in(port, addr))
31       sock.listen(5)
32     rescue Errno::EADDRINUSE, Errno::EACCES
33       sock.close rescue nil
34       retry if (retries -= 1) >= 0
35     end
37     # since we'll end up closing the random port we just got, there's a race
38     # condition could allow the random port we just chose to reselect itself
39     # when running tests in parallel with gmake.  Create a lock file while
40     # we have the port here to ensure that does not happen .
41     lock_path = "#{Dir::tmpdir}/unicorn_test.#{addr}:#{port}.lock"
42     lock = File.open(lock_path, File::WRONLY|File::CREAT|File::EXCL, 0600)
43     at_exit { File.unlink(lock_path) rescue nil }
44   rescue Errno::EEXIST
45     sock.close rescue nil
46     retry
47   end
48   sock.close rescue nil
49   port
50 end
52 class Test_IO_Splice < Test::Unit::TestCase
54   def test_splice
55     str = 'abcde'
56     size = 5
57     rd, wr = IO.pipe
58     tmp = Tempfile.new('ruby_io_splice')
60     assert_nothing_raised {
61       tmp.syswrite(str)
62       tmp.sysseek(0)
63     }
65     nr = IO.splice(tmp.fileno, nil, wr.fileno, nil, size, 0)
66     assert_equal size, nr
67     assert_equal str, rd.sysread(size)
68   end
70   def test_splice_io
71     str = 'abcde'
72     size = 5
73     rd, wr = IO.pipe
74     tmp = Tempfile.new('ruby_io_splice')
76     assert_nothing_raised {
77       tmp.syswrite(str)
78       tmp.sysseek(0)
79     }
81     nr = IO.splice(tmp, nil, wr, nil, size, 0)
82     assert_equal size, nr
83     assert_equal str, rd.sysread(size)
84   end
86   def test_splice_io_ish
87     str = 'abcde'
88     size = 5
89     rd, wr = IO.pipe
90     tmp = Tempfile.new('ruby_io_splice')
91     io_ish = [ tmp ]
92     def io_ish.to_io
93       first.to_io
94     end
96     assert_nothing_raised {
97       tmp.syswrite(str)
98       tmp.sysseek(0)
99     }
101     nr = IO.splice(io_ish, nil, wr, nil, size, 0)
102     assert_equal size, nr
103     assert_equal str, rd.sysread(size)
104   end
106   def test_splice_in_offset
107     str = 'abcde'
108     off = 3
109     len = 2
110     rd, wr = IO.pipe
111     tmp = Tempfile.new('ruby_io_splice')
113     assert_nothing_raised {
114       tmp.syswrite(str)
115       tmp.sysseek(0)
116     }
118     nr = IO.splice(tmp.fileno, off, wr.fileno, nil, len, 0)
119     assert_equal len, nr
120     assert_equal 'de', rd.sysread(len)
121   end
123   def test_splice_out_offset
124     str = 'abcde'
125     rd, wr = IO.pipe
126     tmp = Tempfile.new('ruby_io_splice')
128     assert_nothing_raised { wr.syswrite(str) }
129     nr = IO.splice(rd.fileno, nil, tmp.fileno, 3, str.size, 0)
130     assert_equal 5, nr
131     assert_nothing_raised { tmp.sysseek(0) }
132     assert_equal "\0\0\0abcde", tmp.sysread(9)
133   end
135   def test_splice_nonblock
136     rd, wr = IO.pipe
137     tmp = Tempfile.new('ruby_io_splice')
139     assert_raises(Errno::EAGAIN) {
140       IO.splice(rd.fileno, nil, tmp.fileno, 0, 5, IO::Splice::F_NONBLOCK)
141     }
142   end
144   def test_splice_eof
145     rd, wr = IO.pipe
146     tmp = Tempfile.new('ruby_io_splice')
147     wr.syswrite 'abc'
148     wr.close
150     nr = IO.splice(rd.fileno, nil, tmp.fileno, 0, 5, IO::Splice::F_NONBLOCK)
151     assert_equal 3, nr
152     assert_raises(EOFError) {
153       IO.splice(rd.fileno, nil, tmp.fileno, 0, 5, IO::Splice::F_NONBLOCK)
154     }
155   end
157   def test_splice_nonblock_socket
158     port = unused_port
159     server = TCPServer.new('127.0.0.1', port)
160     rp, wp = IO.pipe
161     rs = TCPSocket.new('127.0.0.1', port)
162     rs.nonblock = true
163     assert_raises(Errno::EAGAIN) { IO.splice(rs, nil, wp, nil, 1024, 0) }
164     rs.close
165     server.close
166   end
168   def test_tee
169     str = 'abcde'
170     size = 5
171     rda, wra = IO.pipe
172     rdb, wrb = IO.pipe
174     assert_nothing_raised { wra.syswrite(str) }
175     nr = IO.tee(rda.fileno, wrb.fileno, size, 0)
176     assert_equal 5, nr
177     assert_equal str, rdb.sysread(5)
178     assert_equal str, rda.sysread(5)
179   end
181   def test_tee_eof
182     rda, wra = IO.pipe
183     rdb, wrb = IO.pipe
184     wra.close
185     assert_raises(EOFError) { IO.tee(rda.fileno, wrb.fileno, 4096, 0) }
186   end
188   def test_tee_nonblock
189     rda, wra = IO.pipe
190     rdb, wrb = IO.pipe
191     assert_raises(Errno::EAGAIN) {
192       IO.tee(rda.fileno, wrb.fileno, 4096, IO::Splice::F_NONBLOCK)
193     }
194   end
196   def test_tee_io
197     str = 'abcde'
198     size = 5
199     rda, wra = IO.pipe
200     rdb, wrb = IO.pipe
202     assert_nothing_raised { wra.syswrite(str) }
203     nr = IO.tee(rda, wrb, size, 0)
204     assert_equal 5, nr
205     assert_equal str, rdb.sysread(5)
206     assert_equal str, rda.sysread(5)
207   end
209   def test_vmsplice_array
210     data = %w(hello world how are you today)
211     r, w = IO.pipe
212     n = IO.vmsplice(w.fileno, data, 0)
213     assert_equal data.join('').size, n
214     assert_equal data.join(''), r.readpartial(16384)
215   end
217   def test_vmsplice_string
218     r, w = IO.pipe
219     assert_equal 5, IO.vmsplice(w, 'hello', 0)
220     assert_equal 'hello', r.read(5)
221   end
223   def test_vmsplice_array_io
224     data = %w(hello world how are you today)
225     r, w = IO.pipe
226     n = IO.vmsplice(w, data, 0)
227     assert_equal data.join('').size, n
228     assert_equal data.join(''), r.readpartial(16384)
229   end
231   def test_vmsplice_nonblock
232     data = %w(hello world how are you today)
233     r, w = IO.pipe
234     w.syswrite('.' * IO::Splice::PIPE_CAPA)
235     assert_raises(Errno::EAGAIN) {
236       IO.vmsplice(w.fileno, data, IO::Splice::F_NONBLOCK)
237     }
238   end
240   def test_vmsplice_in_full
241     empty = ""
243     # bs * count should be > PIPE_BUF
244     [ [ 512, 512 ], [ 131073, 3 ], [ 4098, 64 ] ].each do |(bs,count)|
245       rd, wr = IO.pipe
246       buf = File.open('/dev/urandom', 'rb') { |fp| fp.sysread(bs) }
248       vec = (1..count).map { buf }
249       pid = fork do
250         wr.close
251         tmp = []
252         begin
253           sleep 0.005
254           tmp << rd.readpartial(8192, buf)
255         rescue EOFError
256           break
257         end while true
258         ok = (vec.join(empty) == tmp.join(empty))
259         exit! ok
260       end
261       assert_nothing_raised { rd.close }
262       assert_equal(bs * count, IO.vmsplice(wr.fileno, vec, 0))
263       assert_nothing_raised { wr.close }
264       _, status = Process.waitpid2(pid)
265       assert status.success?
266     end
267   end
269   def test_vmsplice_nil
270     data = %w(hello world how are you today)
271     assert_raises(TypeError) { IO.vmsplice(nil, data, 0) }
272   end
274   def test_constants
275     assert IO::Splice::PIPE_BUF > 0
276     %w(move nonblock more gift).each { |x|
277       assert Integer === IO::Splice.const_get("F_#{x.upcase}")
278     }
279     assert IO::Splice::PIPE_CAPA >= IO::Splice::PIPE_BUF
280   end
282   def test_splice_copy_stream_file_to_file_small
283     a, b = Tempfile.new('a'), Tempfile.new('b')
284     a.syswrite 'hello world'
285     a.sysseek(0)
286     IO::Splice.copy_stream(a, b)
287     b.rewind
288     assert_equal 'hello world', b.read
289   end
291   def test_splice_copy_stream_file_to_file_big
292     buf = ('ab' * IO::Splice::PIPE_CAPA) + 'hi'
293     a, b = Tempfile.new('a'), Tempfile.new('b')
294     a.syswrite buf
295     a.sysseek(0)
296     IO::Splice.copy_stream(a, b)
297     b.rewind
298     assert_equal buf, b.read
299   end
301   def test_splice_copy_stream_file_to_file_big_partial
302     nr = IO::Splice::PIPE_CAPA
303     buf = ('ab' * nr) + 'hi'
304     a, b = Tempfile.new('a'), Tempfile.new('b')
305     a.syswrite buf
306     a.sysseek(0)
307     assert_equal nr, IO::Splice.copy_stream(a, b, nr)
308     b.rewind
309     assert_equal('ab' * (nr/2), b.read)
310   end
312   def test_splice_copy_stream_file_to_file_len
313     a, b = Tempfile.new('a'), Tempfile.new('b')
314     a.syswrite 'hello world'
315     a.sysseek(0)
316     IO::Splice.copy_stream(a, b, 5)
317     b.rewind
318     assert_equal 'hello', b.read
319   end
321   def test_splice_copy_stream_pipe_to_file_len
322     a = Tempfile.new('a')
323     r, w = IO.pipe
324     w.syswrite 'hello world'
325     IO::Splice.copy_stream(r, a, 5)
326     a.rewind
327     assert_equal 'hello', a.read
328   end
330   def test_splice_copy_stream_paths
331     a = Tempfile.new('a')
332     b = Tempfile.new('a')
333     a.syswrite('hello world')
334     IO::Splice.copy_stream(a.path, b.path, 5)
335     assert_equal 'hello', b.read
336   end
338   def test_splice_copy_stream_src_offset
339     a = Tempfile.new('a')
340     b = Tempfile.new('a')
341     a.syswrite('hello world')
342     IO::Splice.copy_stream(a.path, b.path, 5, 6)
343     assert_equal 'world', b.read
344   end
346   def test_copy_stream_nonblock_src
347     port = unused_port
348     server = TCPServer.new('127.0.0.1', port)
349     rp, wp = IO.pipe
350     rs = TCPSocket.new('127.0.0.1', port)
351     rs.nonblock = true
352     nr = 0
353     assert_raises(Timeout::Error) do
354       timeout(0.05) { nr += IO::Splice.copy_stream(rs, wp, 5) }
355     end
356     assert_equal 0, nr
357     rs.close
358     server.close
359   end
361   def test_copy_stream_nonblock_dst
362     port = unused_port
363     server = TCPServer.new('127.0.0.1', port)
364     rp, wp = IO.pipe
365     rs = TCPSocket.new('127.0.0.1', port)
366     rs.nonblock = true
367     client = server.accept
368     buf = ' ' * IO::Splice::PIPE_CAPA
369     nr = 0
370     assert_raises(Timeout::Error) do
371       loop do
372         begin
373           wp.write_nonblock(buf)
374         rescue Errno::EAGAIN
375         end
376         timeout(0.05) do
377           nr += IO::Splice.copy_stream(rp, rs, IO::Splice::PIPE_CAPA)
378         end
379       end
380     end
381     assert_equal nr, client.read(nr).size
382     rs.close
383     server.close
384   end
386   def test_copy_stream_eof
387     r, w = IO.pipe
388     w.syswrite 'hello world'
389     w.close
390     a = Tempfile.new('a')
391     assert_equal 11, IO::Splice.copy_stream(r, a)
392     a.rewind
393     assert_equal 'hello world', a.read
394   end