client: small speedup for list_keys_verbose
[ruby-mogilefs-client.git] / test / test_mogilefs.rb
blob247feca0b91c2b02b69255146a7fdb9a22f9801f
1 # -*- encoding: binary -*-
2 require './test/setup'
3 require 'stringio'
4 require 'tempfile'
5 require 'fileutils'
7 class TestMogileFS__MogileFS < TestMogileFS
8   def setup
9     @klass = MogileFS::MogileFS
10     super
11   end
13   def test_initialize
14     assert_equal 'test', @client.domain
16     assert_raises ArgumentError do
17       MogileFS::MogileFS.new :hosts => ['kaa:6001']
18     end
19   end
21   def test_get_file_data_http
22     tmp = Tempfile.new('accept')
23     accept = File.open(tmp.path, "ab")
24     svr = Proc.new do |serv, port|
25       client, _ = serv.accept
26       client.sync = true
27       readed = client.recv(4096, 0)
28       assert(readed =~ \
29             %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
30       accept.syswrite('.')
31       client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
32       client.close
33     end
34     t1 = TempServer.new(svr)
35     t2 = TempServer.new(svr)
36     path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
37     path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
39     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
41     assert_equal 'data!', @client.get_file_data('key')
42     assert_equal 1, accept.stat.size
43     ensure
44       TempServer.destroy_all!
45   end
47   def test_get_file_data_http_not_found_failover
48     tmp = Tempfile.new('accept')
49     accept = File.open(tmp.path, 'ab')
50     svr1 = Proc.new do |serv, port|
51       client, _ = serv.accept
52       client.sync = true
53       readed = client.recv(4096, 0)
54       assert(readed =~ \
55             %r{\AGET /dev1/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
56       accept.syswrite('.')
57       client.send("HTTP/1.0 404 Not Found\r\n\r\ndata!", 0)
58       client.close
59     end
61     svr2 = Proc.new do |serv, port|
62       client, _ = serv.accept
63       client.sync = true
64       readed = client.recv(4096, 0)
65       assert(readed =~ \
66             %r{\AGET /dev2/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
67       accept.syswrite('.')
68       client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
69       client.close
70     end
72     t1 = TempServer.new(svr1)
73     t2 = TempServer.new(svr2)
74     path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
75     path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
76     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
78     assert_equal 'data!', @client.get_file_data('key')
79     assert_equal 2, accept.stat.size
80     ensure
81       TempServer.destroy_all!
82   end
84   def test_get_file_data_http_block
85     tmpfp = Tempfile.new('test_mogilefs.open_data')
86     nr = nr_chunks
87     chunk_size = 1024 * 1024
88     expect_size = nr * chunk_size
89     header = "HTTP/1.0 200 OK\r\n" \
90              "Content-Length: #{expect_size}\r\n\r\n"
91     assert_equal header.size, tmpfp.syswrite(header)
92     nr.times { assert_equal chunk_size, tmpfp.syswrite(' ' * chunk_size) }
93     assert_equal expect_size + header.size, File.size(tmpfp.path)
94     tmpfp.sysseek(0)
96     accept = Tempfile.new('accept')
97     svr = Proc.new do |serv, port|
98       client, _ = serv.accept
99       client.sync = true
100       accept.syswrite('.')
101       readed = client.recv(4096, 0)
102       assert(readed =~ \
103             %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
104       MogileFS.io.copy_stream(tmpfp, client)
105       client.close
106       exit 0
107     end
108     t1 = TempServer.new(svr)
109     t2 = TempServer.new(svr)
110     path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
111     path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
113     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
115     data = Tempfile.new('test_mogilefs.dest_data')
116     read_nr = nr = 0
117     @client.get_file_data('key') do |fp|
118       buf = ''
119       loop do
120         begin
121           fp.sysread(16384, buf)
122           read_nr = buf.size
123           nr += read_nr
124           assert_equal read_nr, data.syswrite(buf), "partial write"
125         rescue Errno::EAGAIN
126           retry
127         rescue EOFError
128           break
129         end
130       end
131     end
132     assert_equal expect_size, nr, "size mismatch"
133     assert_equal 1, accept.stat.size
134   end
136   def test_get_paths
137     path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
138     path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
140     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
142     expected = [ path1, path2 ]
144     assert_equal expected, @client.get_paths('key').sort
145   end
147   def test_get_uris
148     path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
149     path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
151     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
153     expected = [ URI.parse(path1), URI.parse(path2) ]
155     assert_equal expected, @client.get_uris('key')
156   end
159   def test_get_paths_unknown_key
160     @backend.get_paths = ['unknown_key', '']
162     assert_raises MogileFS::Backend::UnknownKeyError do
163       assert_equal nil, @client.get_paths('key')
164     end
165   end
167   def test_delete_existing
168     @backend.delete = { }
169     assert_nothing_raised do
170       @client.delete 'no_such_key'
171     end
172   end
174   def test_delete_nonexisting
175     @backend.delete = 'unknown_key', ''
176     assert_raises MogileFS::Backend::UnknownKeyError do
177       @client.delete('no_such_key')
178     end
179   end
181   def test_delete_readonly
182     @client.readonly = true
183     assert_raises MogileFS::ReadOnlyError do
184       @client.delete 'no_such_key'
185     end
186   end
188   def test_each_key
189     @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_2',
190                            'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
191     @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_4',
192                            'key_1' => 'new_key_3', 'key_2' => 'new_key_4' }
193     @backend.list_keys = { 'key_count' => 0, 'next_after' => 'new_key_4' }
194     keys = []
195     @client.each_key 'new' do |key|
196       keys << key
197     end
199     assert_equal %w[new_key_1 new_key_2 new_key_3 new_key_4], keys
200   end
202   def test_list_keys
203     @backend.list_keys = { 'key_count' => '2', 'next_after' => 'new_key_2',
204                            'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
206     keys, next_after = @client.list_keys 'new'
207     assert_equal ['new_key_1', 'new_key_2'], keys.sort
208     assert_equal 'new_key_2', next_after
209   end
211   def test_new_file_http
212     @client.readonly = true
213     assert_raises MogileFS::ReadOnlyError do
214       @client.new_file 'new_key', 'test'
215     end
216   end
218   def test_new_file_readonly
219     @client.readonly = true
220     assert_raises MogileFS::ReadOnlyError do
221       @client.new_file 'new_key', 'test'
222     end
223   end
225   def test_store_file_small_http
226     received = Tempfile.new('received')
227     to_store = Tempfile.new('small')
228     to_store.syswrite('data')
230     expected = "PUT /path HTTP/1.0\r\nContent-Length: 4\r\n\r\ndata"
231     t = TempServer.new(Proc.new do |serv, accept|
232       client, _ = serv.accept
233       client.sync = true
234       received.syswrite(client.read(expected.bytesize))
235       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
236       client.close
237     end)
239     @backend.create_open = {
240       'devid' => '1',
241       'path' => "http://127.0.0.1:#{t.port}/path",
242     }
243     nr = @client.store_file 'new_key', 'test', to_store.path
244     assert_equal 4, nr
245     received.sysseek(0)
246     assert_equal expected, received.sysread(4096)
247     ensure
248       TempServer.destroy_all!
249   end
251   def test_store_content_http
252     received = Tempfile.new('received')
253     expected = "PUT /path HTTP/1.0\r\nContent-Length: 4\r\n\r\ndata"
255     t = TempServer.new(Proc.new do |serv, accept|
256       client, _ = serv.accept
257       client.sync = true
258       seen = ""
259       while seen !~ /\r\n\r\ndata/
260         buf = client.readpartial(4096)
261         seen << buf
262         received.syswrite(buf)
263       end
264       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
265       client.close
266     end)
268     @backend.create_open = {
269       'devid' => '1',
270       'path' => "http://127.0.0.1:#{t.port}/path",
271     }
273     nr = @client.store_content 'new_key', 'test', 'data'
274     assert nr
275     assert_equal 4, nr
277     received.sysseek(0)
278     assert_equal expected, received.sysread(4096)
279     ensure
280       TempServer.destroy_all!
281   end
284   def test_store_content_with_writer_callback
285     received = Tempfile.new('received')
286     expected = "PUT /path HTTP/1.0\r\nContent-Length: 40\r\n\r\n"
287     10.times do
288       expected += "data"
289     end
290     t = TempServer.new(Proc.new do |serv, accept|
291       client, _ = serv.accept
292       client.sync = true
293       nr = 0
294       loop do
295         buf = client.readpartial(8192) or break
296         break if buf.length == 0
297         assert_equal buf.length, received.syswrite(buf)
298         nr += buf.length
299         break if nr >= expected.size
300       end
301       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
302       client.close
303     end)
305     @backend.create_open = {
306       'devid' => '1',
307       'path' => "http://127.0.0.1:#{t.port}/path",
308     }
310     cbk = MogileFS::Util::StoreContent.new(40) do |write_callback|
311       10.times do
312         write_callback.call("data")
313       end
314     end
315     assert_equal 40, cbk.length
316     nr = @client.store_content('new_key', 'test', cbk)
317     assert_equal 40, nr
319     received.sysseek(0)
320     assert_equal expected, received.sysread(4096)
321     ensure
322       TempServer.destroy_all!
323   end
325   def test_store_content_multi_dest_failover_path
326     test_store_content_multi_dest_failover(true)
327   end
329   def test_store_content_multi_dest_failover(big_io = false)
330     received1 = Tempfile.new('received')
331     received2 = Tempfile.new('received')
332     expected = "PUT /path HTTP/1.0\r\nContent-Length: 4\r\n\r\ndata"
334     t1 = TempServer.new(Proc.new do |serv, accept|
335       client, _ = serv.accept
336       seen = ""
337       while seen !~ /\r\n\r\ndata/
338         buf = client.readpartial(4096)
339         seen << buf
340         received1.syswrite(buf)
341       end
342       client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
343       client.close
344     end)
346     t2 = TempServer.new(Proc.new do |serv, accept|
347       client, _ = serv.accept
348       seen = ""
349       while seen !~ /\r\n\r\ndata/
350         buf = client.readpartial(4096)
351         seen << buf
352         received2.syswrite(buf)
353       end
354       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
355       client.close
356     end)
358     @backend.create_open = {
359       'dev_count' => '2',
360       'devid_1' => '1',
361       'path_1' => "http://127.0.0.1:#{t1.port}/path",
362       'devid_2' => '2',
363       'path_2' => "http://127.0.0.1:#{t2.port}/path",
364     }
366     if big_io
367       tmp = Tempfile.new('data')
368       tmp.sync = true
369       tmp.write 'data'
370       nr = @client.store_file('new_key', 'test', tmp.path)
371       tmp.close!
372     else
373       nr = @client.store_content 'new_key', 'test', 'data'
374     end
375     assert_equal 4, nr
376     received1.sysseek(0)
377     received2.sysseek(0)
378     assert_equal expected, received1.sysread(4096)
379     assert_equal expected, received2.sysread(4096)
380     ensure
381       TempServer.destroy_all!
382   end
384   def test_store_content_http_fail
385     t = TempServer.new(Proc.new do |serv, accept|
386       client, _ = serv.accept
387       client.sync = true
388       client.recv(4096, 0)
389       client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
390       client.close
391     end)
393     @backend.create_open = {
394       'devid' => '1',
395       'path' => "http://127.0.0.1:#{t.port}/path",
396     }
398     assert_raises MogileFS::HTTPFile::NoStorageNodesError do
399       @client.store_content 'new_key', 'test', 'data'
400     end
401   end
403   def test_store_content_http_empty
404     received = Tempfile.new('received')
405     expected = "PUT /path HTTP/1.0\r\nContent-Length: 0\r\n\r\n"
406     t = TempServer.new(Proc.new do |serv, accept|
407       client, _ = serv.accept
408       client.sync = true
409       received.syswrite(client.recv(4096, 0))
410       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
411       client.close
412     end)
414     @backend.create_open = {
415       'devid' => '1',
416       'path' => "http://127.0.0.1:#{t.port}/path",
417     }
419     nr = @client.store_content 'new_key', 'test', ''
420     assert_equal 0, nr
421     received.sysseek(0)
422     assert_equal expected, received.sysread(4096)
423   end
425   def test_store_content_nfs
426     @backend.create_open = {
427       'dev_count' => '1',
428       'devid_1' => '1',
429       'path_1' => '/path',
430     }
431     assert_raises MogileFS::UnsupportedPathError do
432       @client.store_content 'new_key', 'test', 'data'
433     end
434   end
436   def test_new_file_http_large
437     expect = Tempfile.new('test_mogilefs.expect')
438     to_put = Tempfile.new('test_mogilefs.to_put')
439     received = Tempfile.new('test_mogilefs.received')
441     nr = nr_chunks
442     chunk_size = 1024 * 1024
443     expect_size = nr * chunk_size
445     header = "PUT /path HTTP/1.0\r\n" \
446              "Content-Length: #{expect_size}\r\n\r\n"
447     assert_equal header.size, expect.syswrite(header)
448     nr.times do
449       assert_equal chunk_size, expect.syswrite(' ' * chunk_size)
450       assert_equal chunk_size, to_put.syswrite(' ' * chunk_size)
451     end
452     assert_equal expect_size + header.size, expect.stat.size
453     assert_equal expect_size, to_put.stat.size
455     readed = Tempfile.new('readed')
456     t = TempServer.new(Proc.new do |serv, accept|
457       client, _ = serv.accept
458       client.sync = true
459       nr = 0
460       loop do
461         buf = client.readpartial(8192) or break
462         break if buf.length == 0
463         assert_equal buf.length, received.syswrite(buf)
464         nr += buf.length
465         break if nr >= expect.stat.size
466       end
467       readed.syswrite("#{nr}")
468       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
469       client.close
470     end)
472     @backend.create_open = {
473       'devid' => '1',
474       'path' => "http://127.0.0.1:#{t.port}/path",
475     }
477     orig_size = to_put.size
478     nr = @client.store_file('new_key', 'test', to_put.path)
479     assert nr, nr.inspect
480     assert_equal orig_size, nr
481     assert_equal orig_size, to_put.size
482     readed.sysseek(0)
483     assert_equal expect.stat.size, readed.sysread(4096).to_i
485     ENV['PATH'].split(/:/).each do |path|
486       cmp_bin = "#{path}/cmp"
487       File.executable?(cmp_bin) or next
488       # puts "running #{cmp_bin} #{expect.path} #{received.path}"
489       assert( system(cmp_bin, expect.path, received.path) )
490       break
491     end
493     ensure
494       TempServer.destroy_all!
495   end
497   def test_store_content_readonly
498     @client.readonly = true
500     assert_raises MogileFS::ReadOnlyError do
501       @client.store_content 'new_key', 'test', nil
502     end
503   end
505   def test_store_file_readonly
506     @client.readonly = true
507     assert_raises MogileFS::ReadOnlyError do
508       @client.store_file 'new_key', 'test', nil
509     end
510   end
512   def test_rename_existing
513     @backend.rename = {}
515     assert_nil @client.rename('from_key', 'to_key')
516   end
518   def test_rename_nonexisting
519     @backend.rename = 'unknown_key', ''
521     assert_raises MogileFS::Backend::UnknownKeyError do
522       @client.rename('from_key', 'to_key')
523     end
524   end
526   def test_rename_no_key
527     @backend.rename = 'no_key', 'no_key'
529     e = assert_raises MogileFS::Backend::NoKeyError do
530       @client.rename 'new_key', 'test'
531     end
533     assert_equal 'no_key', e.message
534   end
536   def test_rename_readonly
537     @client.readonly = true
539     e = assert_raises MogileFS::ReadOnlyError do
540       @client.rename 'new_key', 'test'
541     end
543     assert_equal 'readonly mogilefs', e.message
544   end
546   def assert_get_paths_args(expect, *args)
547     sock = TCPServer.new("127.0.0.1", 0)
548     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
549     c = MogileFS::MogileFS.new(nargs)
550     received = []
551     th = Thread.new do
552       a = sock.accept
553       line = a.gets
554       received << line
555       a.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
556       a.close
557     end
558     paths_expect = %w(http://0/a http://0/b)
559     assert_equal paths_expect, c.get_paths("f", *args)
560     th.join
561     assert_equal 1, received.size
562     tmp = c.backend.url_decode(received[0].split(/\s+/)[1])
563     assert_equal "f", tmp.delete("key")
564     assert_equal "foo", tmp.delete("domain")
565     assert_equal expect, tmp
566     c.backend.shutdown
567     ensure
568       sock.close
569   end
571   def test_get_paths_args
572     assert_get_paths_args({"noverify"=>"1", "zone"=>""})
573     assert_get_paths_args({"noverify"=>"0", "zone"=>""}, false)
574     assert_get_paths_args({"noverify"=>"0", "zone"=>""}, :noverify=>false)
575     assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"}, true, "alt")
576     assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"},
577                           {:noverify => true, :zone => "alt"})
578     assert_get_paths_args({"noverify"=>"1", "zone"=>"alt","pathcount"=>"666"},
579                           {:noverify => true, :zone => "alt", :pathcount=>666})
580   end
582   def test_idempotent_command_eof
583     ip = "127.0.0.1"
584     a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
585     hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
586     args = { :hosts => hosts, :domain => "foo" }
587     c = MogileFS::MogileFS.new(args)
588     received = []
589     th = Thread.new do
590       r = IO.select([a, b])
591       x = r[0][0].accept
592       received << x.gets
593       x.close
595       r = IO.select([a, b])
596       x = r[0][0].accept
597       received << x.gets
598       x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
599       x.close
600     end
601     expect = %w(http://0/a http://0/b)
602     assert_equal expect, c.get_paths("f")
603     th.join
604     assert_equal 2, received.size
605     assert_equal received[0], received[1]
606   end
608   def test_idempotent_command_slow
609     ip = "127.0.0.1"
610     a = TCPServer.new(ip, 0)
611     hosts = [ "#{ip}:#{a.addr[1]}" ]
612     q = Queue.new
613     timeout = 1
614     args = { :hosts => hosts, :domain => "foo", :timeout => timeout }
615     c = MogileFS::MogileFS.new(args)
616     received = []
617     secs = timeout + 1
618     th = Thread.new do
619       close_later = []
620       x = a.accept
621       close_later << x
622       line = x.gets
623       %r{key=(\w+)} =~ line
625       sleep(secs) # cause the client to timeout:
627       begin
628         x.write("OK paths=1&path1=http://0/#{$1}\r\n")
629       rescue Errno::EPIPE
630         # EPIPE may or may not get raised due to timing issue,
631         # we don't care either way
632       rescue => e
633         flunk("#{e.message} (#{e.class})")
634       end
635       q << :continue_test
637       # client should start a new connection here
638       y = a.accept
639       close_later << y
640       line = y.gets
641       %r{key=(\w+)} =~ line
642       begin
643         y.write("OK paths=1&path1=http://0/#{$1}\r\n")
644       rescue => e
645         flunk("#{e.message} (#{e.class})")
646       end
648       # the client should've killed the old connection:
649       assert_raises(Errno::EPIPE) do
650         loop { x.write("OK paths=1&path1=http://0/#{$1}\r\n") }
651       end
653       close_later # main thread closes
654     end
655     assert_raises(MogileFS::UnreadableSocketError) do
656       c.get_paths("a")
657     end
658     assert_equal :continue_test, q.pop, "avoid race during test"
659     expect2 = %w(http://0/b)
660     assert_equal expect2, c.get_paths("b")
661     a.close
662     close_later = th.value
663     close_later.each { |io| assert_nil io.close }
664   end
666   def test_idempotent_command_response_truncated
667     ip = "127.0.0.1"
668     a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
669     hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
670     args = { :hosts => hosts, :domain => "foo" }
671     c = MogileFS::MogileFS.new(args)
672     received = []
673     th = Thread.new do
674       r = IO.select([a, b])
675       x = r[0][0].accept
676       received << x.gets
677       x.write("OK paths=2&path1=http://0/a&path2=http://0/")
678       x.close
680       r = IO.select([a, b])
681       x = r[0][0].accept
682       received << x.gets
683       x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
684       x.close
685     end
686     expect = %w(http://0/a http://0/b)
687     assert_equal expect, c.get_paths("f")
688     th.join
689     assert_equal 2, received.size
690     assert_equal received[0], received[1]
691   end
693   def test_non_idempotent_command_eof
694     ip = "127.0.0.1"
695     a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
696     hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
697     args = { :hosts => hosts, :domain => "foo" }
698     c = MogileFS::MogileFS.new(args)
699     received = []
700     th = Thread.new do
701       r = IO.select([a, b])
702       x = r[0][0].accept
703       received << x.gets
704       x.close
705     end
706     assert_raises(EOFError) { c.rename("a", "b") }
707     th.join
708     assert_equal 1, received.size
709   end
711   def test_list_keys_verbose_ordering # implementation detail
712     received = []
713     sock = TCPServer.new("127.0.0.1", 0)
714     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
715     c = MogileFS::MogileFS.new(nargs)
716     th = Thread.new do
717       a = sock.accept
718       %w(a b c d e).each do |key|
719         line = a.gets
720         cmd, args = line.split(/\s+/, 2)
721         args = c.backend.url_decode(args.strip)
722         assert_equal "file_info", cmd
723         assert_equal key, args["key"]
724       end
725       out = { "length" => 3, "devcount" => 6 }
726       %w(a b c d e).shuffle.each do |key|
727         out["key"] = key
728         a.write "OK #{c.backend.url_encode(out)}\r\n"
729       end
730       a.close
731     end
733     blk = lambda do |key, length, devcount|
734       received << [ key, length, devcount ]
735     end
736     c.list_keys_verbose(%w(a b c d e), blk)
737     th.join
738     received.map! { |(key,_,_)| key }
739     assert_equal %w(a b c d e), received
740     ensure
741       sock.close
742   end
744   def test_list_keys_verbose_retry_eof # implementation detail
745     received = []
746     sock = TCPServer.new("127.0.0.1", 0)
747     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
748     c = MogileFS::MogileFS.new(nargs)
749     th = Thread.new do
750       a = sock.accept
751       %w(a b c d e).each do |key|
752         line = a.gets
753         cmd, args = line.split(/\s+/, 2)
754         args = c.backend.url_decode(args.strip)
755         assert_equal "file_info", cmd
756         assert_equal key, args["key"]
757       end
758       out = { "length" => 3, "devcount" => 6 }
759       %w(d e).each do |key|
760         out["key"] = key
761         a.write "OK #{c.backend.url_encode(out)}\r\n"
762       end
763       a.close # trigger EOF
764       a = sock.accept # client will retry
765       %w(a b c).each do |key|
766         line = a.gets
767         cmd, args = line.split(/\s+/, 2)
768         args = c.backend.url_decode(args.strip)
769         assert_equal "file_info", cmd
770         assert_equal key, args["key"]
771         out["key"] = key
772         a.write "OK #{c.backend.url_encode(out)}\r\n"
773       end
774       a.close
775     end
777     blk = lambda do |key, length, devcount|
778       received << [ key, length, devcount ]
779     end
780     c.list_keys_verbose(%w(a b c d e), blk)
781     th.join
782     received.map! { |(key,_,_)| key }
783     assert_equal %w(a b c d e), received
784     ensure
785       sock.close
786   end
788   def test_list_keys_verbose_retry_truncated # implementation detail
789     received = []
790     sock = TCPServer.new("127.0.0.1", 0)
791     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
792     c = MogileFS::MogileFS.new(nargs)
793     th = Thread.new do
794       a = sock.accept
795       %w(a b c d e).each do |key|
796         line = a.gets
797         cmd, args = line.split(/\s+/, 2)
798         args = c.backend.url_decode(args.strip)
799         assert_equal "file_info", cmd
800         assert_equal key, args["key"]
801       end
802       out = { "length" => 3, "devcount" => 6 }
803       out["key"] = "a"
804       a.write "OK #{c.backend.url_encode(out)}\r\n"
805       out["key"] = "b"
806       a.write "OK #{c.backend.url_encode(out)}"
807       a.close # trigger EOF
809       a = sock.accept # client will retry
810       %w(b c d e).each do |key|
811         line = a.gets
812         cmd, args = line.split(/\s+/, 2)
813         args = c.backend.url_decode(args.strip)
814         assert_equal "file_info", cmd
815         assert_equal key, args["key"]
816         out["key"] = key
817         a.write "OK #{c.backend.url_encode(out)}\r\n"
818       end
819       a.close
820     end
822     blk = lambda do |key, length, devcount|
823       received << [ key, length, devcount ]
824     end
825     c.list_keys_verbose(%w(a b c d e), blk)
826     th.join
827     received.map! { |(key,_,_)| key }
828     assert_equal %w(a b c d e), received
829     ensure
830       sock.close
831   end
833   def test_sleep
834     @backend.sleep = {}
835     assert_nothing_raised do
836       assert_equal({}, @client.sleep(2))
837     end
838   end
840   private
842     # tested with 1000, though it takes a while
843     def nr_chunks
844       ENV['NR_CHUNKS'] ? ENV['NR_CHUNKS'].to_i : 10
845     end