net-http-persistent usage respects timeouts
[ruby-mogilefs-client.git] / test / test_mogilefs.rb
bloba6bcf3546662fc096ea23583f72a3e7e8dfec91e
1 # -*- encoding: binary -*-
2 require './test/setup'
3 require 'stringio'
4 require 'tempfile'
5 require 'fileutils'
7 class TestMogileFS__MogileFS < TestMogileFS
8   def setup
9     @klass = MogileFS::MogileFS
10     super
11   end
13   def read_headers(client)
14     headers = ""
15     while line = client.gets
16       headers << line
17       return headers if line == "\r\n"
18     end
19   end
21   def test_initialize
22     assert_equal 'test', @client.domain
24     assert_raises ArgumentError do
25       MogileFS::MogileFS.new :hosts => ['kaa:6001']
26     end
27   end
29   def test_get_file_data_http
30     tmp = Tempfile.new('accept')
31     accept = File.open(tmp.path, "ab")
32     svr = Proc.new do |serv, port|
33       client, _ = serv.accept
34       client.sync = true
35       readed = read_headers(client)
36       assert_match(
37         %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n},
38         readed)
39       accept.syswrite('.')
40       client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
41       client.close
42     end
43     t1 = TempServer.new(svr)
44     t2 = TempServer.new(svr)
45     path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
46     path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
48     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
50     assert_equal 'data!', @client.get_file_data('key')
51     assert_equal 1, accept.stat.size
52     ensure
53       TempServer.destroy_all!
54   end
56   def test_get_file_data_http_not_found_failover
57     tmp = Tempfile.new('accept')
58     accept = File.open(tmp.path, 'ab')
59     svr1 = Proc.new do |serv, port|
60       client, _ = serv.accept
61       client.sync = true
62       readed = read_headers(client)
63       assert_match(
64         %r{\AGET /dev1/0/000/000/0000000062\.fid HTTP/1.[01]\r\n},
65         readed)
66       accept.syswrite('.')
67       client.send("HTTP/1.0 404 Not Found\r\n\r\ndata!", 0)
68       client.close
69     end
71     svr2 = Proc.new do |serv, port|
72       client, _ = serv.accept
73       client.sync = true
74       readed = read_headers(client)
75       assert_match(
76         %r{\AGET /dev2/0/000/000/0000000062\.fid HTTP/1.[01].*\r\n},
77         readed)
78       accept.syswrite('.')
79       client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
80       client.close
81     end
83     t1 = TempServer.new(svr1)
84     t2 = TempServer.new(svr2)
85     path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
86     path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
87     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
89     assert_equal 'data!', @client.get_file_data('key')
90     assert_equal 2, accept.stat.size
91     ensure
92       TempServer.destroy_all!
93   end
95   def test_get_file_data_http_block
96     tmpfp = Tempfile.new('test_mogilefs.open_data')
97     nr = nr_chunks
98     chunk_size = 1024 * 1024
99     expect_size = nr * chunk_size
100     header = "HTTP/1.0 200 OK\r\n" \
101              "Content-Length: #{expect_size}\r\n\r\n"
102     assert_equal header.size, tmpfp.syswrite(header)
103     nr.times { assert_equal chunk_size, tmpfp.syswrite(' ' * chunk_size) }
104     assert_equal expect_size + header.size, File.size(tmpfp.path)
105     tmpfp.sysseek(0)
107     accept = Tempfile.new('accept')
108     svr = Proc.new do |serv, port|
109       client, _ = serv.accept
110       client.sync = true
111       accept.syswrite('.')
112       readed = read_headers(client)
113       assert_match(
114         %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n},
115         readed)
116       MogileFS.io.copy_stream(tmpfp, client)
117       client.close
118       exit 0
119     end
120     t1 = TempServer.new(svr)
121     t2 = TempServer.new(svr)
122     path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
123     path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
125     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
127     data = Tempfile.new('test_mogilefs.dest_data')
128     read_nr = nr = 0
129     @client.get_file_data('key') do |fp|
130       buf = ''
131       loop do
132         begin
133           fp.sysread(16384, buf)
134           read_nr = buf.size
135           nr += read_nr
136           assert_equal read_nr, data.syswrite(buf), "partial write"
137         rescue Errno::EAGAIN
138           retry
139         rescue EOFError
140           break
141         end
142       end
143     end
144     assert_equal expect_size, nr, "size mismatch"
145     assert_equal 1, accept.stat.size
146   end
148   def test_get_paths
149     path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
150     path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
152     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
154     expected = [ path1, path2 ]
156     assert_equal expected, @client.get_paths('key').sort
157   end
159   def test_get_uris
160     path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
161     path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
163     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
165     expected = [ URI.parse(path1), URI.parse(path2) ]
167     assert_equal expected, @client.get_uris('key')
168   end
171   def test_get_paths_unknown_key
172     @backend.get_paths = ['unknown_key', '']
174     assert_raises MogileFS::Backend::UnknownKeyError do
175       assert_equal nil, @client.get_paths('key')
176     end
177   end
179   def test_delete_existing
180     @backend.delete = { }
181     @client.delete 'no_such_key'
182   end
184   def test_delete_nonexisting
185     @backend.delete = 'unknown_key', ''
186     assert_raises MogileFS::Backend::UnknownKeyError do
187       @client.delete('no_such_key')
188     end
189   end
191   def test_delete_readonly
192     @client.readonly = true
193     assert_raises MogileFS::ReadOnlyError do
194       @client.delete 'no_such_key'
195     end
196   end
198   def test_each_key
199     @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_2',
200                            'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
201     @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_4',
202                            'key_1' => 'new_key_3', 'key_2' => 'new_key_4' }
203     @backend.list_keys = { 'key_count' => 0, 'next_after' => 'new_key_4' }
204     keys = []
205     @client.each_key 'new' do |key|
206       keys << key
207     end
209     assert_equal %w[new_key_1 new_key_2 new_key_3 new_key_4], keys
210   end
212   def test_list_keys
213     @backend.list_keys = { 'key_count' => '2', 'next_after' => 'new_key_2',
214                            'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
216     keys, next_after = @client.list_keys 'new'
217     assert_equal ['new_key_1', 'new_key_2'], keys.sort
218     assert_equal 'new_key_2', next_after
219   end
221   def test_new_file_http
222     @client.readonly = true
223     assert_raises MogileFS::ReadOnlyError do
224       @client.new_file 'new_key', 'test'
225     end
226   end
228   def test_new_file_readonly
229     @client.readonly = true
230     assert_raises MogileFS::ReadOnlyError do
231       @client.new_file 'new_key', 'test'
232     end
233   end
235   def test_store_file_small_http
236     received = Tempfile.new('received')
237     to_store = Tempfile.new('small')
238     to_store.syswrite('data')
240     t = TempServer.new(Proc.new do |serv, accept|
241       client, _ = serv.accept
242       while buf = client.readpartial(666)
243         received.syswrite(buf)
244         break if buf =~ /data/
245       end
246       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
247       client.close
248     end)
250     @backend.create_open = {
251       'devid' => '1',
252       'path' => "http://127.0.0.1:#{t.port}/path",
253     }
254     nr = @client.store_file 'new_key', 'test', to_store.path
255     assert_equal 4, nr
256     received.sysseek(0)
258     a = received.sysread(999999).split(/\r\n/)
259     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
260     assert_equal("data", a[-1])
261     assert_equal("", a[-2])
262     assert a.grep(%r{\AContent-Length: 4\z})[0]
263     ensure
264       TempServer.destroy_all!
265   end
267   def test_store_content_http
268     received = Tempfile.new('received')
270     t = TempServer.new(Proc.new do |serv, accept|
271       client, _ = serv.accept
272       client.sync = true
273       seen = ""
274       while seen !~ /\r\n\r\ndata/
275         buf = client.readpartial(4096)
276         seen << buf
277         received.syswrite(buf)
278       end
279       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
280       client.close
281     end)
283     @backend.create_open = {
284       'devid' => '1',
285       'path' => "http://127.0.0.1:#{t.port}/path",
286     }
288     nr = @client.store_content 'new_key', 'test', 'data'
289     assert nr
290     assert_equal 4, nr
292     received.sysseek(0)
293     a = received.sysread(999999).split(/\r\n/)
294     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
295     assert_equal("data", a[-1])
296     assert_equal("", a[-2])
297     assert a.grep(%r{\AContent-Length: 4\z})[0]
298     ensure
299       TempServer.destroy_all!
300   end
303   def test_store_content_with_writer_callback
304     received = Tempfile.new('received')
305     expected = "PUT /path HTTP/1.0\r\nContent-Length: 40\r\n\r\n"
306     10.times do
307       expected += "data"
308     end
309     t = TempServer.new(Proc.new do |serv, accept|
310       client, _ = serv.accept
311       client.sync = true
312       nr = 0
313       seen = ''
314       loop do
315         buf = client.readpartial(8192) or break
316         break if buf.length == 0
317         assert_equal buf.length, received.syswrite(buf)
318         nr += buf.length
319         seen << buf
320         break if seen =~ /\r\n\r\n(?:data){10}/
321       end
322       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
323       client.close
324     end)
326     @backend.create_open = {
327       'devid' => '1',
328       'path' => "http://127.0.0.1:#{t.port}/path",
329     }
331     cbk = MogileFS::Util::StoreContent.new(40) do |write_callback|
332       10.times do
333         write_callback.call("data")
334       end
335     end
336     assert_equal 40, cbk.length
337     nr = @client.store_content('new_key', 'test', cbk)
338     assert_equal 40, nr
340     received.sysseek(0)
341     a = received.sysread(999999).split(/\r\n/)
342     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
343     assert_equal("data" * 10, a[-1])
344     assert_equal("", a[-2])
345     assert a.grep(%r{\AContent-Length: 40\z})[0]
346     ensure
347       TempServer.destroy_all!
348   end
350   def test_store_content_multi_dest_failover_path
351     test_store_content_multi_dest_failover(true)
352   end
354   def test_store_content_multi_dest_failover(big_io = false)
355     received1 = Tempfile.new('received')
356     received2 = Tempfile.new('received')
358     t1 = TempServer.new(Proc.new do |serv, accept|
359       client, _ = serv.accept
360       seen = ""
361       while seen !~ /\r\n\r\ndata/
362         buf = client.readpartial(4096)
363         seen << buf
364         received1.syswrite(buf)
365       end
366       client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
367       client.close
368     end)
370     t2 = TempServer.new(Proc.new do |serv, accept|
371       client, _ = serv.accept
372       seen = ""
373       while seen !~ /\r\n\r\ndata/
374         buf = client.readpartial(4096)
375         seen << buf
376         received2.syswrite(buf)
377       end
378       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
379       client.close
380     end)
382     @backend.create_open = {
383       'dev_count' => '2',
384       'devid_1' => '1',
385       'path_1' => "http://127.0.0.1:#{t1.port}/path",
386       'devid_2' => '2',
387       'path_2' => "http://127.0.0.1:#{t2.port}/path",
388     }
390     if big_io
391       tmp = Tempfile.new('data')
392       tmp.sync = true
393       tmp.write 'data'
394       nr = @client.store_file('new_key', 'test', tmp.path)
395       tmp.close!
396     else
397       nr = @client.store_content 'new_key', 'test', 'data'
398     end
399     assert_equal 4, nr
400     received1.sysseek(0)
401     received2.sysseek(0)
402     a = received1.sysread(4096).split(/\r\n/)
403     b = received2.sysread(4096).split(/\r\n/)
404     assert_equal a[0], b[0]
405     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
406     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, b[0])
407     assert_equal("data", a[-1])
408     assert_equal("data", b[-1])
409     assert_equal("", a[-2])
410     assert_equal("", b[-2])
411     assert a.grep(%r{\AContent-Length: 4\z})[0]
412     assert b.grep(%r{\AContent-Length: 4\z})[0]
413     ensure
414       TempServer.destroy_all!
415   end
417   def test_store_content_http_fail
418     t = TempServer.new(Proc.new do |serv, accept|
419       client, _ = serv.accept
420       client.sync = true
421       read_headers(client)
422       client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
423       client.close
424     end)
426     @backend.create_open = {
427       'devid' => '1',
428       'path' => "http://127.0.0.1:#{t.port}/path",
429     }
431     assert_raises MogileFS::HTTPFile::NoStorageNodesError do
432       @client.store_content 'new_key', 'test', 'data'
433     end
434   end
436   def test_store_content_http_empty
437     received = Tempfile.new('received')
438     t = TempServer.new(Proc.new do |serv, accept|
439       client, _ = serv.accept
440       client.sync = true
441       received.syswrite(client.recv(4096, 0))
442       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
443       client.close
444     end)
446     @backend.create_open = {
447       'devid' => '1',
448       'path' => "http://127.0.0.1:#{t.port}/path",
449     }
451     nr = @client.store_content 'new_key', 'test', ''
452     assert_equal 0, nr
453     received.sysseek(0)
454     a = received.sysread(4096).split(/\r\n/)
455     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
456     assert a.grep(%r{\AContent-Length: 0\z})[0]
457   end
459   def test_store_content_nfs
460     @backend.create_open = {
461       'dev_count' => '1',
462       'devid_1' => '1',
463       'path_1' => '/path',
464     }
465     assert_raises MogileFS::UnsupportedPathError do
466       @client.store_content 'new_key', 'test', 'data'
467     end
468   end
470   def test_new_file_http_large
471     expect = Tempfile.new('test_mogilefs.expect')
472     to_put = Tempfile.new('test_mogilefs.to_put')
473     received = Tempfile.new('test_mogilefs.received')
475     nr = nr_chunks
476     chunk_size = 1024 * 1024
477     expect_size = nr * chunk_size
479     header = "PUT /path HTTP/1.0\r\n" \
480              "Content-Length: #{expect_size}\r\n\r\n"
481     assert_equal header.size, expect.syswrite(header)
482     nr.times do
483       assert_equal chunk_size, expect.syswrite(' ' * chunk_size)
484       assert_equal chunk_size, to_put.syswrite(' ' * chunk_size)
485     end
486     assert_equal expect_size + header.size, expect.stat.size
487     assert_equal expect_size, to_put.stat.size
489     readed = Tempfile.new('readed')
490     t = TempServer.new(Proc.new do |serv, accept|
491       client, _ = serv.accept
492       client.sync = true
493       nr = 0
494       loop do
495         buf = client.readpartial(8192) or break
496         break if buf.length == 0
497         assert_equal buf.length, received.syswrite(buf)
498         nr += buf.length
499         break if nr >= expect.stat.size
500       end
501       readed.syswrite("#{nr}")
502       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
503       client.close
504     end)
506     @backend.create_open = {
507       'devid' => '1',
508       'path' => "http://127.0.0.1:#{t.port}/path",
509     }
511     orig_size = to_put.size
512     nr = @client.store_file('new_key', 'test', to_put.path)
513     assert nr, nr.inspect
514     assert_equal orig_size, nr
515     assert_equal orig_size, to_put.size
516     readed.sysseek(0)
517     assert_equal expect.stat.size, readed.sysread(4096).to_i
519     ENV['PATH'].split(/:/).each do |path|
520       cmp_bin = "#{path}/cmp"
521       File.executable?(cmp_bin) or next
522       # puts "running #{cmp_bin} #{expect.path} #{received.path}"
523       assert( system(cmp_bin, expect.path, received.path) )
524       break
525     end
527     ensure
528       TempServer.destroy_all!
529   end
531   def test_store_content_readonly
532     @client.readonly = true
534     assert_raises MogileFS::ReadOnlyError do
535       @client.store_content 'new_key', 'test', nil
536     end
537   end
539   def test_store_file_readonly
540     @client.readonly = true
541     assert_raises MogileFS::ReadOnlyError do
542       @client.store_file 'new_key', 'test', nil
543     end
544   end
546   def test_rename_existing
547     @backend.rename = {}
549     assert_nil @client.rename('from_key', 'to_key')
550   end
552   def test_rename_nonexisting
553     @backend.rename = 'unknown_key', ''
555     assert_raises MogileFS::Backend::UnknownKeyError do
556       @client.rename('from_key', 'to_key')
557     end
558   end
560   def test_rename_no_key
561     @backend.rename = 'no_key', 'no_key'
563     e = assert_raises MogileFS::Backend::NoKeyError do
564       @client.rename 'new_key', 'test'
565     end
567     assert_equal 'no_key', e.message
568   end
570   def test_rename_readonly
571     @client.readonly = true
573     e = assert_raises MogileFS::ReadOnlyError do
574       @client.rename 'new_key', 'test'
575     end
577     assert_equal 'readonly mogilefs', e.message
578   end
580   def assert_get_paths_args(expect, *args)
581     sock = TCPServer.new("127.0.0.1", 0)
582     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
583     c = MogileFS::MogileFS.new(nargs)
584     received = []
585     th = Thread.new do
586       a = sock.accept
587       line = a.gets
588       received << line
589       a.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
590       a.close
591     end
592     paths_expect = %w(http://0/a http://0/b)
593     assert_equal paths_expect, c.get_paths("f", *args)
594     th.join
595     assert_equal 1, received.size
596     tmp = c.backend.url_decode(received[0].split(/\s+/)[1])
597     assert_equal "f", tmp.delete("key")
598     assert_equal "foo", tmp.delete("domain")
599     assert_equal expect, tmp
600     c.backend.shutdown
601     ensure
602       sock.close
603   end
605   def test_get_paths_args
606     assert_get_paths_args({"noverify"=>"1", "zone"=>""})
607     assert_get_paths_args({"noverify"=>"0", "zone"=>""}, false)
608     assert_get_paths_args({"noverify"=>"0", "zone"=>""}, :noverify=>false)
609     assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"}, true, "alt")
610     assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"},
611                           {:noverify => true, :zone => "alt"})
612     assert_get_paths_args({"noverify"=>"1", "zone"=>"alt","pathcount"=>"666"},
613                           {:noverify => true, :zone => "alt", :pathcount=>666})
614   end
616   def test_idempotent_command_eof
617     ip = "127.0.0.1"
618     a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
619     hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
620     args = { :hosts => hosts, :domain => "foo" }
621     c = MogileFS::MogileFS.new(args)
622     received = []
623     th = Thread.new do
624       r = IO.select([a, b])
625       x = r[0][0].accept
626       received << x.gets
627       x.close
629       r = IO.select([a, b])
630       x = r[0][0].accept
631       received << x.gets
632       x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
633       x.close
634     end
635     expect = %w(http://0/a http://0/b)
636     assert_equal expect, c.get_paths("f")
637     th.join
638     assert_equal 2, received.size
639     assert_equal received[0], received[1]
640   end
642   def test_idempotent_command_slow
643     ip = "127.0.0.1"
644     a = TCPServer.new(ip, 0)
645     hosts = [ "#{ip}:#{a.addr[1]}" ]
646     q = Queue.new
647     timeout = 1
648     args = { :hosts => hosts, :domain => "foo", :timeout => timeout }
649     c = MogileFS::MogileFS.new(args)
650     secs = timeout + 1
651     th = Thread.new do
652       close_later = []
653       x = a.accept
654       close_later << x
655       line = x.gets
656       %r{key=(\w+)} =~ line
658       sleep(secs) # cause the client to timeout:
660       begin
661         x.write("OK paths=1&path1=http://0/#{$1}\r\n")
662       rescue Errno::EPIPE
663         # EPIPE may or may not get raised due to timing issue,
664         # we don't care either way
665       rescue => e
666         flunk("#{e.message} (#{e.class})")
667       end
668       q << :continue_test
670       # client should start a new connection here
671       y = a.accept
672       close_later << y
673       line = y.gets
674       %r{key=(\w+)} =~ line
675       begin
676         y.write("OK paths=1&path1=http://0/#{$1}\r\n")
677       rescue => e
678         flunk("#{e.message} (#{e.class})")
679       end
681       # the client should've killed the old connection:
682       assert_raises(Errno::EPIPE) do
683         loop { x.write("OK paths=1&path1=http://0/#{$1}\r\n") }
684       end
686       close_later # main thread closes
687     end
688     assert_raises(MogileFS::UnreadableSocketError) do
689       c.get_paths("a")
690     end
691     assert_equal :continue_test, q.pop, "avoid race during test"
692     expect2 = %w(http://0/b)
693     assert_equal expect2, c.get_paths("b")
694     a.close
695     close_later = th.value
696     close_later.each { |io| assert_nil io.close }
697   end
699   def test_idempotent_command_response_truncated
700     ip = "127.0.0.1"
701     a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
702     hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
703     args = { :hosts => hosts, :domain => "foo" }
704     c = MogileFS::MogileFS.new(args)
705     received = []
706     th = Thread.new do
707       r = IO.select([a, b])
708       x = r[0][0].accept
709       received << x.gets
710       x.write("OK paths=2&path1=http://0/a&path2=http://0/")
711       x.close
713       r = IO.select([a, b])
714       x = r[0][0].accept
715       received << x.gets
716       x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
717       x.close
718     end
719     expect = %w(http://0/a http://0/b)
720     assert_equal expect, c.get_paths("f")
721     th.join
722     assert_equal 2, received.size
723     assert_equal received[0], received[1]
724   end
726   def test_non_idempotent_command_eof
727     ip = "127.0.0.1"
728     a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
729     hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
730     args = { :hosts => hosts, :domain => "foo" }
731     c = MogileFS::MogileFS.new(args)
732     received = []
733     th = Thread.new do
734       r = IO.select([a, b])
735       x = r[0][0].accept
736       received << x.gets
737       x.close
738     end
739     assert_raises(EOFError) { c.rename("a", "b") }
740     th.join
741     assert_equal 1, received.size
742   end
744   def test_list_keys_verbose_ordering # implementation detail
745     received = []
746     sock = TCPServer.new("127.0.0.1", 0)
747     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
748     c = MogileFS::MogileFS.new(nargs)
749     th = Thread.new do
750       a = sock.accept
751       %w(a b c d e).each do |key|
752         line = a.gets
753         cmd, args = line.split(/\s+/, 2)
754         args = c.backend.url_decode(args.strip)
755         assert_equal "file_info", cmd
756         assert_equal key, args["key"]
757       end
758       out = { "length" => 3, "devcount" => 6 }
759       %w(a b c d e).shuffle.each do |key|
760         out["key"] = key
761         a.write "OK #{c.backend.url_encode(out)}\r\n"
762       end
763       a.close
764     end
766     blk = lambda do |key, length, devcount|
767       received << [ key, length, devcount ]
768     end
769     c.list_keys_verbose(%w(a b c d e), blk)
770     th.join
771     received.map! { |(key,_,_)| key }
772     assert_equal %w(a b c d e), received
773     ensure
774       sock.close
775   end
777   def test_list_keys_verbose_retry_eof # implementation detail
778     received = []
779     sock = TCPServer.new("127.0.0.1", 0)
780     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
781     c = MogileFS::MogileFS.new(nargs)
782     th = Thread.new do
783       a = sock.accept
784       %w(a b c d e).each do |key|
785         line = a.gets
786         cmd, args = line.split(/\s+/, 2)
787         args = c.backend.url_decode(args.strip)
788         assert_equal "file_info", cmd
789         assert_equal key, args["key"]
790       end
791       out = { "length" => 3, "devcount" => 6 }
792       %w(d e).each do |key|
793         out["key"] = key
794         a.write "OK #{c.backend.url_encode(out)}\r\n"
795       end
796       a.close # trigger EOF
797       a = sock.accept # client will retry
798       %w(a b c).each do |key|
799         line = a.gets
800         cmd, args = line.split(/\s+/, 2)
801         args = c.backend.url_decode(args.strip)
802         assert_equal "file_info", cmd
803         assert_equal key, args["key"]
804         out["key"] = key
805         a.write "OK #{c.backend.url_encode(out)}\r\n"
806       end
807       a.close
808     end
810     blk = lambda do |key, length, devcount|
811       received << [ key, length, devcount ]
812     end
813     c.list_keys_verbose(%w(a b c d e), blk)
814     th.join
815     received.map! { |(key,_,_)| key }
816     assert_equal %w(a b c d e), received
817     ensure
818       sock.close
819   end
821   def test_list_keys_verbose_retry_truncated # implementation detail
822     received = []
823     sock = TCPServer.new("127.0.0.1", 0)
824     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
825     c = MogileFS::MogileFS.new(nargs)
826     th = Thread.new do
827       a = sock.accept
828       %w(a b c d e).each do |key|
829         line = a.gets
830         cmd, args = line.split(/\s+/, 2)
831         args = c.backend.url_decode(args.strip)
832         assert_equal "file_info", cmd
833         assert_equal key, args["key"]
834       end
835       out = { "length" => 3, "devcount" => 6 }
836       out["key"] = "a"
837       a.write "OK #{c.backend.url_encode(out)}\r\n"
838       out["key"] = "b"
839       a.write "OK #{c.backend.url_encode(out)}"
840       a.close # trigger EOF
842       a = sock.accept # client will retry
843       %w(b c d e).each do |key|
844         line = a.gets
845         cmd, args = line.split(/\s+/, 2)
846         args = c.backend.url_decode(args.strip)
847         assert_equal "file_info", cmd
848         assert_equal key, args["key"]
849         out["key"] = key
850         a.write "OK #{c.backend.url_encode(out)}\r\n"
851       end
852       a.close
853     end
855     blk = lambda do |key, length, devcount|
856       received << [ key, length, devcount ]
857     end
858     c.list_keys_verbose(%w(a b c d e), blk)
859     th.join
860     received.map! { |(key,_,_)| key }
861     assert_equal %w(a b c d e), received
862     ensure
863       sock.close
864   end
866   def test_sleep
867     @backend.sleep = {}
868     assert_equal({}, @client.sleep(2))
869   end
871   private
873     # tested with 1000, though it takes a while
874     def nr_chunks
875       ENV['NR_CHUNKS'] ? ENV['NR_CHUNKS'].to_i : 10
876     end