test: remove assert_nothing_raised checks
[ruby-mogilefs-client.git] / test / test_mogilefs.rb
blobdbaddf34bb47bd42fa4797ee7bacd2d04e70bcef
1 # -*- encoding: binary -*-
2 require './test/setup'
3 require 'stringio'
4 require 'tempfile'
5 require 'fileutils'
7 class TestMogileFS__MogileFS < TestMogileFS
8   def setup
9     @klass = MogileFS::MogileFS
10     super
11   end
13   def test_initialize
14     assert_equal 'test', @client.domain
16     assert_raises ArgumentError do
17       MogileFS::MogileFS.new :hosts => ['kaa:6001']
18     end
19   end
21   def test_get_file_data_http
22     tmp = Tempfile.new('accept')
23     accept = File.open(tmp.path, "ab")
24     svr = Proc.new do |serv, port|
25       client, _ = serv.accept
26       client.sync = true
27       readed = client.recv(4096, 0)
28       assert(readed =~ \
29             %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
30       accept.syswrite('.')
31       client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
32       client.close
33     end
34     t1 = TempServer.new(svr)
35     t2 = TempServer.new(svr)
36     path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
37     path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
39     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
41     assert_equal 'data!', @client.get_file_data('key')
42     assert_equal 1, accept.stat.size
43     ensure
44       TempServer.destroy_all!
45   end
47   def test_get_file_data_http_not_found_failover
48     tmp = Tempfile.new('accept')
49     accept = File.open(tmp.path, 'ab')
50     svr1 = Proc.new do |serv, port|
51       client, _ = serv.accept
52       client.sync = true
53       readed = client.recv(4096, 0)
54       assert(readed =~ \
55             %r{\AGET /dev1/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
56       accept.syswrite('.')
57       client.send("HTTP/1.0 404 Not Found\r\n\r\ndata!", 0)
58       client.close
59     end
61     svr2 = Proc.new do |serv, port|
62       client, _ = serv.accept
63       client.sync = true
64       readed = client.recv(4096, 0)
65       assert(readed =~ \
66             %r{\AGET /dev2/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
67       accept.syswrite('.')
68       client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
69       client.close
70     end
72     t1 = TempServer.new(svr1)
73     t2 = TempServer.new(svr2)
74     path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
75     path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
76     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
78     assert_equal 'data!', @client.get_file_data('key')
79     assert_equal 2, accept.stat.size
80     ensure
81       TempServer.destroy_all!
82   end
84   def test_get_file_data_http_block
85     tmpfp = Tempfile.new('test_mogilefs.open_data')
86     nr = nr_chunks
87     chunk_size = 1024 * 1024
88     expect_size = nr * chunk_size
89     header = "HTTP/1.0 200 OK\r\n" \
90              "Content-Length: #{expect_size}\r\n\r\n"
91     assert_equal header.size, tmpfp.syswrite(header)
92     nr.times { assert_equal chunk_size, tmpfp.syswrite(' ' * chunk_size) }
93     assert_equal expect_size + header.size, File.size(tmpfp.path)
94     tmpfp.sysseek(0)
96     accept = Tempfile.new('accept')
97     svr = Proc.new do |serv, port|
98       client, _ = serv.accept
99       client.sync = true
100       accept.syswrite('.')
101       readed = client.recv(4096, 0)
102       assert(readed =~ \
103             %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
104       MogileFS.io.copy_stream(tmpfp, client)
105       client.close
106       exit 0
107     end
108     t1 = TempServer.new(svr)
109     t2 = TempServer.new(svr)
110     path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
111     path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
113     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
115     data = Tempfile.new('test_mogilefs.dest_data')
116     read_nr = nr = 0
117     @client.get_file_data('key') do |fp|
118       buf = ''
119       loop do
120         begin
121           fp.sysread(16384, buf)
122           read_nr = buf.size
123           nr += read_nr
124           assert_equal read_nr, data.syswrite(buf), "partial write"
125         rescue Errno::EAGAIN
126           retry
127         rescue EOFError
128           break
129         end
130       end
131     end
132     assert_equal expect_size, nr, "size mismatch"
133     assert_equal 1, accept.stat.size
134   end
136   def test_get_paths
137     path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
138     path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
140     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
142     expected = [ path1, path2 ]
144     assert_equal expected, @client.get_paths('key').sort
145   end
147   def test_get_uris
148     path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
149     path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
151     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
153     expected = [ URI.parse(path1), URI.parse(path2) ]
155     assert_equal expected, @client.get_uris('key')
156   end
159   def test_get_paths_unknown_key
160     @backend.get_paths = ['unknown_key', '']
162     assert_raises MogileFS::Backend::UnknownKeyError do
163       assert_equal nil, @client.get_paths('key')
164     end
165   end
167   def test_delete_existing
168     @backend.delete = { }
169     @client.delete 'no_such_key'
170   end
172   def test_delete_nonexisting
173     @backend.delete = 'unknown_key', ''
174     assert_raises MogileFS::Backend::UnknownKeyError do
175       @client.delete('no_such_key')
176     end
177   end
179   def test_delete_readonly
180     @client.readonly = true
181     assert_raises MogileFS::ReadOnlyError do
182       @client.delete 'no_such_key'
183     end
184   end
186   def test_each_key
187     @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_2',
188                            'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
189     @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_4',
190                            'key_1' => 'new_key_3', 'key_2' => 'new_key_4' }
191     @backend.list_keys = { 'key_count' => 0, 'next_after' => 'new_key_4' }
192     keys = []
193     @client.each_key 'new' do |key|
194       keys << key
195     end
197     assert_equal %w[new_key_1 new_key_2 new_key_3 new_key_4], keys
198   end
200   def test_list_keys
201     @backend.list_keys = { 'key_count' => '2', 'next_after' => 'new_key_2',
202                            'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
204     keys, next_after = @client.list_keys 'new'
205     assert_equal ['new_key_1', 'new_key_2'], keys.sort
206     assert_equal 'new_key_2', next_after
207   end
209   def test_new_file_http
210     @client.readonly = true
211     assert_raises MogileFS::ReadOnlyError do
212       @client.new_file 'new_key', 'test'
213     end
214   end
216   def test_new_file_readonly
217     @client.readonly = true
218     assert_raises MogileFS::ReadOnlyError do
219       @client.new_file 'new_key', 'test'
220     end
221   end
223   def test_store_file_small_http
224     received = Tempfile.new('received')
225     to_store = Tempfile.new('small')
226     to_store.syswrite('data')
228     t = TempServer.new(Proc.new do |serv, accept|
229       client, _ = serv.accept
230       while buf = client.readpartial(666)
231         received.syswrite(buf)
232         break if buf =~ /data/
233       end
234       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
235       client.close
236     end)
238     @backend.create_open = {
239       'devid' => '1',
240       'path' => "http://127.0.0.1:#{t.port}/path",
241     }
242     nr = @client.store_file 'new_key', 'test', to_store.path
243     assert_equal 4, nr
244     received.sysseek(0)
246     a = received.sysread(999999).split(/\r\n/)
247     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
248     assert_equal("data", a[-1])
249     assert_equal("", a[-2])
250     assert a.grep(%r{\AContent-Length: 4\z})[0]
251     ensure
252       TempServer.destroy_all!
253   end
255   def test_store_content_http
256     received = Tempfile.new('received')
258     t = TempServer.new(Proc.new do |serv, accept|
259       client, _ = serv.accept
260       client.sync = true
261       seen = ""
262       while seen !~ /\r\n\r\ndata/
263         buf = client.readpartial(4096)
264         seen << buf
265         received.syswrite(buf)
266       end
267       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
268       client.close
269     end)
271     @backend.create_open = {
272       'devid' => '1',
273       'path' => "http://127.0.0.1:#{t.port}/path",
274     }
276     nr = @client.store_content 'new_key', 'test', 'data'
277     assert nr
278     assert_equal 4, nr
280     received.sysseek(0)
281     a = received.sysread(999999).split(/\r\n/)
282     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
283     assert_equal("data", a[-1])
284     assert_equal("", a[-2])
285     assert a.grep(%r{\AContent-Length: 4\z})[0]
286     ensure
287       TempServer.destroy_all!
288   end
291   def test_store_content_with_writer_callback
292     received = Tempfile.new('received')
293     expected = "PUT /path HTTP/1.0\r\nContent-Length: 40\r\n\r\n"
294     10.times do
295       expected += "data"
296     end
297     t = TempServer.new(Proc.new do |serv, accept|
298       client, _ = serv.accept
299       client.sync = true
300       nr = 0
301       seen = ''
302       loop do
303         buf = client.readpartial(8192) or break
304         break if buf.length == 0
305         assert_equal buf.length, received.syswrite(buf)
306         nr += buf.length
307         seen << buf
308         break if seen =~ /\r\n\r\n(?:data){10}/
309       end
310       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
311       client.close
312     end)
314     @backend.create_open = {
315       'devid' => '1',
316       'path' => "http://127.0.0.1:#{t.port}/path",
317     }
319     cbk = MogileFS::Util::StoreContent.new(40) do |write_callback|
320       10.times do
321         write_callback.call("data")
322       end
323     end
324     assert_equal 40, cbk.length
325     nr = @client.store_content('new_key', 'test', cbk)
326     assert_equal 40, nr
328     received.sysseek(0)
329     a = received.sysread(999999).split(/\r\n/)
330     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
331     assert_equal("data" * 10, a[-1])
332     assert_equal("", a[-2])
333     assert a.grep(%r{\AContent-Length: 40\z})[0]
334     ensure
335       TempServer.destroy_all!
336   end
338   def test_store_content_multi_dest_failover_path
339     test_store_content_multi_dest_failover(true)
340   end
342   def test_store_content_multi_dest_failover(big_io = false)
343     received1 = Tempfile.new('received')
344     received2 = Tempfile.new('received')
346     t1 = TempServer.new(Proc.new do |serv, accept|
347       client, _ = serv.accept
348       seen = ""
349       while seen !~ /\r\n\r\ndata/
350         buf = client.readpartial(4096)
351         seen << buf
352         received1.syswrite(buf)
353       end
354       client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
355       client.close
356     end)
358     t2 = TempServer.new(Proc.new do |serv, accept|
359       client, _ = serv.accept
360       seen = ""
361       while seen !~ /\r\n\r\ndata/
362         buf = client.readpartial(4096)
363         seen << buf
364         received2.syswrite(buf)
365       end
366       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
367       client.close
368     end)
370     @backend.create_open = {
371       'dev_count' => '2',
372       'devid_1' => '1',
373       'path_1' => "http://127.0.0.1:#{t1.port}/path",
374       'devid_2' => '2',
375       'path_2' => "http://127.0.0.1:#{t2.port}/path",
376     }
378     if big_io
379       tmp = Tempfile.new('data')
380       tmp.sync = true
381       tmp.write 'data'
382       nr = @client.store_file('new_key', 'test', tmp.path)
383       tmp.close!
384     else
385       nr = @client.store_content 'new_key', 'test', 'data'
386     end
387     assert_equal 4, nr
388     received1.sysseek(0)
389     received2.sysseek(0)
390     a = received1.sysread(4096).split(/\r\n/)
391     b = received2.sysread(4096).split(/\r\n/)
392     assert_equal a[0], b[0]
393     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
394     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, b[0])
395     assert_equal("data", a[-1])
396     assert_equal("data", b[-1])
397     assert_equal("", a[-2])
398     assert_equal("", b[-2])
399     assert a.grep(%r{\AContent-Length: 4\z})[0]
400     assert b.grep(%r{\AContent-Length: 4\z})[0]
401     ensure
402       TempServer.destroy_all!
403   end
405   def test_store_content_http_fail
406     t = TempServer.new(Proc.new do |serv, accept|
407       client, _ = serv.accept
408       client.sync = true
409       client.recv(4096, 0)
410       client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
411       client.close
412     end)
414     @backend.create_open = {
415       'devid' => '1',
416       'path' => "http://127.0.0.1:#{t.port}/path",
417     }
419     assert_raises MogileFS::HTTPFile::NoStorageNodesError do
420       @client.store_content 'new_key', 'test', 'data'
421     end
422   end
424   def test_store_content_http_empty
425     received = Tempfile.new('received')
426     t = TempServer.new(Proc.new do |serv, accept|
427       client, _ = serv.accept
428       client.sync = true
429       received.syswrite(client.recv(4096, 0))
430       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
431       client.close
432     end)
434     @backend.create_open = {
435       'devid' => '1',
436       'path' => "http://127.0.0.1:#{t.port}/path",
437     }
439     nr = @client.store_content 'new_key', 'test', ''
440     assert_equal 0, nr
441     received.sysseek(0)
442     a = received.sysread(4096).split(/\r\n/)
443     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
444     assert a.grep(%r{\AContent-Length: 0\z})[0]
445   end
447   def test_store_content_nfs
448     @backend.create_open = {
449       'dev_count' => '1',
450       'devid_1' => '1',
451       'path_1' => '/path',
452     }
453     assert_raises MogileFS::UnsupportedPathError do
454       @client.store_content 'new_key', 'test', 'data'
455     end
456   end
458   def test_new_file_http_large
459     expect = Tempfile.new('test_mogilefs.expect')
460     to_put = Tempfile.new('test_mogilefs.to_put')
461     received = Tempfile.new('test_mogilefs.received')
463     nr = nr_chunks
464     chunk_size = 1024 * 1024
465     expect_size = nr * chunk_size
467     header = "PUT /path HTTP/1.0\r\n" \
468              "Content-Length: #{expect_size}\r\n\r\n"
469     assert_equal header.size, expect.syswrite(header)
470     nr.times do
471       assert_equal chunk_size, expect.syswrite(' ' * chunk_size)
472       assert_equal chunk_size, to_put.syswrite(' ' * chunk_size)
473     end
474     assert_equal expect_size + header.size, expect.stat.size
475     assert_equal expect_size, to_put.stat.size
477     readed = Tempfile.new('readed')
478     t = TempServer.new(Proc.new do |serv, accept|
479       client, _ = serv.accept
480       client.sync = true
481       nr = 0
482       loop do
483         buf = client.readpartial(8192) or break
484         break if buf.length == 0
485         assert_equal buf.length, received.syswrite(buf)
486         nr += buf.length
487         break if nr >= expect.stat.size
488       end
489       readed.syswrite("#{nr}")
490       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
491       client.close
492     end)
494     @backend.create_open = {
495       'devid' => '1',
496       'path' => "http://127.0.0.1:#{t.port}/path",
497     }
499     orig_size = to_put.size
500     nr = @client.store_file('new_key', 'test', to_put.path)
501     assert nr, nr.inspect
502     assert_equal orig_size, nr
503     assert_equal orig_size, to_put.size
504     readed.sysseek(0)
505     assert_equal expect.stat.size, readed.sysread(4096).to_i
507     ENV['PATH'].split(/:/).each do |path|
508       cmp_bin = "#{path}/cmp"
509       File.executable?(cmp_bin) or next
510       # puts "running #{cmp_bin} #{expect.path} #{received.path}"
511       assert( system(cmp_bin, expect.path, received.path) )
512       break
513     end
515     ensure
516       TempServer.destroy_all!
517   end
519   def test_store_content_readonly
520     @client.readonly = true
522     assert_raises MogileFS::ReadOnlyError do
523       @client.store_content 'new_key', 'test', nil
524     end
525   end
527   def test_store_file_readonly
528     @client.readonly = true
529     assert_raises MogileFS::ReadOnlyError do
530       @client.store_file 'new_key', 'test', nil
531     end
532   end
534   def test_rename_existing
535     @backend.rename = {}
537     assert_nil @client.rename('from_key', 'to_key')
538   end
540   def test_rename_nonexisting
541     @backend.rename = 'unknown_key', ''
543     assert_raises MogileFS::Backend::UnknownKeyError do
544       @client.rename('from_key', 'to_key')
545     end
546   end
548   def test_rename_no_key
549     @backend.rename = 'no_key', 'no_key'
551     e = assert_raises MogileFS::Backend::NoKeyError do
552       @client.rename 'new_key', 'test'
553     end
555     assert_equal 'no_key', e.message
556   end
558   def test_rename_readonly
559     @client.readonly = true
561     e = assert_raises MogileFS::ReadOnlyError do
562       @client.rename 'new_key', 'test'
563     end
565     assert_equal 'readonly mogilefs', e.message
566   end
568   def assert_get_paths_args(expect, *args)
569     sock = TCPServer.new("127.0.0.1", 0)
570     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
571     c = MogileFS::MogileFS.new(nargs)
572     received = []
573     th = Thread.new do
574       a = sock.accept
575       line = a.gets
576       received << line
577       a.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
578       a.close
579     end
580     paths_expect = %w(http://0/a http://0/b)
581     assert_equal paths_expect, c.get_paths("f", *args)
582     th.join
583     assert_equal 1, received.size
584     tmp = c.backend.url_decode(received[0].split(/\s+/)[1])
585     assert_equal "f", tmp.delete("key")
586     assert_equal "foo", tmp.delete("domain")
587     assert_equal expect, tmp
588     c.backend.shutdown
589     ensure
590       sock.close
591   end
593   def test_get_paths_args
594     assert_get_paths_args({"noverify"=>"1", "zone"=>""})
595     assert_get_paths_args({"noverify"=>"0", "zone"=>""}, false)
596     assert_get_paths_args({"noverify"=>"0", "zone"=>""}, :noverify=>false)
597     assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"}, true, "alt")
598     assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"},
599                           {:noverify => true, :zone => "alt"})
600     assert_get_paths_args({"noverify"=>"1", "zone"=>"alt","pathcount"=>"666"},
601                           {:noverify => true, :zone => "alt", :pathcount=>666})
602   end
604   def test_idempotent_command_eof
605     ip = "127.0.0.1"
606     a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
607     hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
608     args = { :hosts => hosts, :domain => "foo" }
609     c = MogileFS::MogileFS.new(args)
610     received = []
611     th = Thread.new do
612       r = IO.select([a, b])
613       x = r[0][0].accept
614       received << x.gets
615       x.close
617       r = IO.select([a, b])
618       x = r[0][0].accept
619       received << x.gets
620       x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
621       x.close
622     end
623     expect = %w(http://0/a http://0/b)
624     assert_equal expect, c.get_paths("f")
625     th.join
626     assert_equal 2, received.size
627     assert_equal received[0], received[1]
628   end
630   def test_idempotent_command_slow
631     ip = "127.0.0.1"
632     a = TCPServer.new(ip, 0)
633     hosts = [ "#{ip}:#{a.addr[1]}" ]
634     q = Queue.new
635     timeout = 1
636     args = { :hosts => hosts, :domain => "foo", :timeout => timeout }
637     c = MogileFS::MogileFS.new(args)
638     secs = timeout + 1
639     th = Thread.new do
640       close_later = []
641       x = a.accept
642       close_later << x
643       line = x.gets
644       %r{key=(\w+)} =~ line
646       sleep(secs) # cause the client to timeout:
648       begin
649         x.write("OK paths=1&path1=http://0/#{$1}\r\n")
650       rescue Errno::EPIPE
651         # EPIPE may or may not get raised due to timing issue,
652         # we don't care either way
653       rescue => e
654         flunk("#{e.message} (#{e.class})")
655       end
656       q << :continue_test
658       # client should start a new connection here
659       y = a.accept
660       close_later << y
661       line = y.gets
662       %r{key=(\w+)} =~ line
663       begin
664         y.write("OK paths=1&path1=http://0/#{$1}\r\n")
665       rescue => e
666         flunk("#{e.message} (#{e.class})")
667       end
669       # the client should've killed the old connection:
670       assert_raises(Errno::EPIPE) do
671         loop { x.write("OK paths=1&path1=http://0/#{$1}\r\n") }
672       end
674       close_later # main thread closes
675     end
676     assert_raises(MogileFS::UnreadableSocketError) do
677       c.get_paths("a")
678     end
679     assert_equal :continue_test, q.pop, "avoid race during test"
680     expect2 = %w(http://0/b)
681     assert_equal expect2, c.get_paths("b")
682     a.close
683     close_later = th.value
684     close_later.each { |io| assert_nil io.close }
685   end
687   def test_idempotent_command_response_truncated
688     ip = "127.0.0.1"
689     a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
690     hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
691     args = { :hosts => hosts, :domain => "foo" }
692     c = MogileFS::MogileFS.new(args)
693     received = []
694     th = Thread.new do
695       r = IO.select([a, b])
696       x = r[0][0].accept
697       received << x.gets
698       x.write("OK paths=2&path1=http://0/a&path2=http://0/")
699       x.close
701       r = IO.select([a, b])
702       x = r[0][0].accept
703       received << x.gets
704       x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
705       x.close
706     end
707     expect = %w(http://0/a http://0/b)
708     assert_equal expect, c.get_paths("f")
709     th.join
710     assert_equal 2, received.size
711     assert_equal received[0], received[1]
712   end
714   def test_non_idempotent_command_eof
715     ip = "127.0.0.1"
716     a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
717     hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
718     args = { :hosts => hosts, :domain => "foo" }
719     c = MogileFS::MogileFS.new(args)
720     received = []
721     th = Thread.new do
722       r = IO.select([a, b])
723       x = r[0][0].accept
724       received << x.gets
725       x.close
726     end
727     assert_raises(EOFError) { c.rename("a", "b") }
728     th.join
729     assert_equal 1, received.size
730   end
732   def test_list_keys_verbose_ordering # implementation detail
733     received = []
734     sock = TCPServer.new("127.0.0.1", 0)
735     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
736     c = MogileFS::MogileFS.new(nargs)
737     th = Thread.new do
738       a = sock.accept
739       %w(a b c d e).each do |key|
740         line = a.gets
741         cmd, args = line.split(/\s+/, 2)
742         args = c.backend.url_decode(args.strip)
743         assert_equal "file_info", cmd
744         assert_equal key, args["key"]
745       end
746       out = { "length" => 3, "devcount" => 6 }
747       %w(a b c d e).shuffle.each do |key|
748         out["key"] = key
749         a.write "OK #{c.backend.url_encode(out)}\r\n"
750       end
751       a.close
752     end
754     blk = lambda do |key, length, devcount|
755       received << [ key, length, devcount ]
756     end
757     c.list_keys_verbose(%w(a b c d e), blk)
758     th.join
759     received.map! { |(key,_,_)| key }
760     assert_equal %w(a b c d e), received
761     ensure
762       sock.close
763   end
765   def test_list_keys_verbose_retry_eof # implementation detail
766     received = []
767     sock = TCPServer.new("127.0.0.1", 0)
768     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
769     c = MogileFS::MogileFS.new(nargs)
770     th = Thread.new do
771       a = sock.accept
772       %w(a b c d e).each do |key|
773         line = a.gets
774         cmd, args = line.split(/\s+/, 2)
775         args = c.backend.url_decode(args.strip)
776         assert_equal "file_info", cmd
777         assert_equal key, args["key"]
778       end
779       out = { "length" => 3, "devcount" => 6 }
780       %w(d e).each do |key|
781         out["key"] = key
782         a.write "OK #{c.backend.url_encode(out)}\r\n"
783       end
784       a.close # trigger EOF
785       a = sock.accept # client will retry
786       %w(a b c).each do |key|
787         line = a.gets
788         cmd, args = line.split(/\s+/, 2)
789         args = c.backend.url_decode(args.strip)
790         assert_equal "file_info", cmd
791         assert_equal key, args["key"]
792         out["key"] = key
793         a.write "OK #{c.backend.url_encode(out)}\r\n"
794       end
795       a.close
796     end
798     blk = lambda do |key, length, devcount|
799       received << [ key, length, devcount ]
800     end
801     c.list_keys_verbose(%w(a b c d e), blk)
802     th.join
803     received.map! { |(key,_,_)| key }
804     assert_equal %w(a b c d e), received
805     ensure
806       sock.close
807   end
809   def test_list_keys_verbose_retry_truncated # implementation detail
810     received = []
811     sock = TCPServer.new("127.0.0.1", 0)
812     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
813     c = MogileFS::MogileFS.new(nargs)
814     th = Thread.new do
815       a = sock.accept
816       %w(a b c d e).each do |key|
817         line = a.gets
818         cmd, args = line.split(/\s+/, 2)
819         args = c.backend.url_decode(args.strip)
820         assert_equal "file_info", cmd
821         assert_equal key, args["key"]
822       end
823       out = { "length" => 3, "devcount" => 6 }
824       out["key"] = "a"
825       a.write "OK #{c.backend.url_encode(out)}\r\n"
826       out["key"] = "b"
827       a.write "OK #{c.backend.url_encode(out)}"
828       a.close # trigger EOF
830       a = sock.accept # client will retry
831       %w(b c d e).each do |key|
832         line = a.gets
833         cmd, args = line.split(/\s+/, 2)
834         args = c.backend.url_decode(args.strip)
835         assert_equal "file_info", cmd
836         assert_equal key, args["key"]
837         out["key"] = key
838         a.write "OK #{c.backend.url_encode(out)}\r\n"
839       end
840       a.close
841     end
843     blk = lambda do |key, length, devcount|
844       received << [ key, length, devcount ]
845     end
846     c.list_keys_verbose(%w(a b c d e), blk)
847     th.join
848     received.map! { |(key,_,_)| key }
849     assert_equal %w(a b c d e), received
850     ensure
851       sock.close
852   end
854   def test_sleep
855     @backend.sleep = {}
856     assert_equal({}, @client.sleep(2))
857   end
859   private
861     # tested with 1000, though it takes a while
862     def nr_chunks
863       ENV['NR_CHUNKS'] ? ENV['NR_CHUNKS'].to_i : 10
864     end