Ruby mogilefs-client 3.12.2
[ruby-mogilefs-client.git] / test / test_mogilefs.rb
blob45d4452f95b80032b06386fb86c222a57f8a34a2
1 # -*- encoding: binary -*-
2 require './test/setup'
3 require 'stringio'
4 require 'tempfile'
5 require 'fileutils'
7 class TestMogileFS__MogileFS < TestMogileFS
8   def setup
9     @tmpsrv = []
10     @klass = MogileFS::MogileFS
11     super
12   end
14   def tmpsrv(blk)
15     t = TempServer.new(blk)
16     @tmpsrv << t
17     t
18   end
20   def read_headers(client)
21     headers = ""
22     while line = client.gets
23       headers << line
24       return headers if line == "\r\n"
25     end
26   end
28   def test_initialize
29     assert_equal 'test', @client.domain
31     assert_raises ArgumentError do
32       MogileFS::MogileFS.new :hosts => ['kaa:6001']
33     end
34   end
36   def test_get_file_data_http
37     tmp = Tempfile.new('accept')
38     accept = File.open(tmp.path, "ab")
39     svr = Proc.new do |serv, port|
40       client, _ = serv.accept
41       client.sync = true
42       readed = read_headers(client)
43       assert_match(
44         %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n},
45         readed)
46       accept.syswrite('.')
47       client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
48       client.close
49     end
50     t1 = tmpsrv(svr)
51     t2 = tmpsrv(svr)
52     path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
53     path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
55     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
57     assert_equal 'data!', @client.get_file_data('key')
58     assert_equal 1, accept.stat.size
59   end
61   def test_get_file_data_http_not_found_failover
62     tmp = Tempfile.new('accept')
63     accept = File.open(tmp.path, 'ab')
64     svr1 = Proc.new do |serv, port|
65       client, _ = serv.accept
66       client.sync = true
67       readed = read_headers(client)
68       assert_match(
69         %r{\AGET /dev1/0/000/000/0000000062\.fid HTTP/1.[01]\r\n},
70         readed)
71       accept.syswrite('.')
72       client.send("HTTP/1.0 404 Not Found\r\n\r\ndata!", 0)
73       client.close
74     end
76     svr2 = Proc.new do |serv, port|
77       client, _ = serv.accept
78       client.sync = true
79       readed = read_headers(client)
80       assert_match(
81         %r{\AGET /dev2/0/000/000/0000000062\.fid HTTP/1.[01].*\r\n},
82         readed)
83       accept.syswrite('.')
84       client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
85       client.close
86     end
88     t1 = tmpsrv(svr1)
89     t2 = tmpsrv(svr2)
90     path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
91     path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
92     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
94     assert_equal 'data!', @client.get_file_data('key')
95     assert_equal 2, accept.stat.size
96   end
98   def test_get_file_data_http_block
99     tmpfp = Tempfile.new('test_mogilefs.open_data')
100     nr = nr_chunks
101     chunk_size = 1024 * 1024
102     expect_size = nr * chunk_size
103     header = "HTTP/1.0 200 OK\r\n" \
104              "Content-Length: #{expect_size}\r\n\r\n"
105     assert_equal header.size, tmpfp.syswrite(header)
106     nr.times { assert_equal chunk_size, tmpfp.syswrite(' ' * chunk_size) }
107     assert_equal expect_size + header.size, File.size(tmpfp.path)
108     tmpfp.sysseek(0)
110     accept = Tempfile.new('accept')
111     svr = Proc.new do |serv, port|
112       client, _ = serv.accept
113       client.sync = true
114       accept.syswrite('.')
115       readed = read_headers(client)
116       assert_match(
117         %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n},
118         readed)
119       MogileFS.io.copy_stream(tmpfp, client)
120       client.close
121       exit 0
122     end
123     t1 = tmpsrv(svr)
124     t2 = tmpsrv(svr)
125     path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
126     path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
128     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
130     data = Tempfile.new('test_mogilefs.dest_data')
131     read_nr = nr = 0
132     @client.get_file_data('key') do |fp|
133       buf = ''
134       loop do
135         begin
136           fp.sysread(16384, buf)
137           read_nr = buf.size
138           nr += read_nr
139           assert_equal read_nr, data.syswrite(buf), "partial write"
140         rescue Errno::EAGAIN
141           retry
142         rescue EOFError
143           break
144         end
145       end
146     end
147     assert_equal expect_size, nr, "size mismatch"
148     assert_equal 1, accept.stat.size
149   end
151   def test_get_paths
152     path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
153     path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
155     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
157     expected = [ path1, path2 ]
159     assert_equal expected, @client.get_paths('key').sort
160   end
162   def test_get_uris
163     path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
164     path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
166     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
168     expected = [ URI.parse(path1), URI.parse(path2) ]
170     assert_equal expected, @client.get_uris('key')
171   end
174   def test_get_paths_unknown_key
175     @backend.get_paths = ['unknown_key', '']
177     assert_raises MogileFS::Backend::UnknownKeyError do
178       assert_equal nil, @client.get_paths('key')
179     end
180   end
182   def test_delete_existing
183     @backend.delete = { }
184     @client.delete 'no_such_key'
185   end
187   def test_delete_nonexisting
188     @backend.delete = 'unknown_key', ''
189     assert_raises MogileFS::Backend::UnknownKeyError do
190       @client.delete('no_such_key')
191     end
192   end
194   def test_delete_readonly
195     @client.readonly = true
196     assert_raises MogileFS::ReadOnlyError do
197       @client.delete 'no_such_key'
198     end
199   end
201   def test_each_key
202     @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_2',
203                            'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
204     @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_4',
205                            'key_1' => 'new_key_3', 'key_2' => 'new_key_4' }
206     @backend.list_keys = { 'key_count' => 0, 'next_after' => 'new_key_4' }
207     keys = []
208     @client.each_key 'new' do |key|
209       keys << key
210     end
212     assert_equal %w[new_key_1 new_key_2 new_key_3 new_key_4], keys
213   end
215   def test_list_keys
216     @backend.list_keys = { 'key_count' => '2', 'next_after' => 'new_key_2',
217                            'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
219     keys, next_after = @client.list_keys 'new'
220     assert_equal ['new_key_1', 'new_key_2'], keys.sort
221     assert_equal 'new_key_2', next_after
222   end
224   def test_new_file_http
225     @client.readonly = true
226     assert_raises MogileFS::ReadOnlyError do
227       @client.new_file 'new_key', 'test'
228     end
229   end
231   def test_new_file_readonly
232     @client.readonly = true
233     assert_raises MogileFS::ReadOnlyError do
234       @client.new_file 'new_key', 'test'
235     end
236   end
238   def test_store_file_small_http
239     received = Tempfile.new('received')
240     to_store = Tempfile.new('small')
241     to_store.syswrite('data')
243     t = tmpsrv(Proc.new do |serv, accept|
244       client, _ = serv.accept
245       while buf = client.readpartial(666)
246         received.syswrite(buf)
247         break if buf =~ /data/
248       end
249       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
250       client.close
251     end)
253     @backend.create_open = {
254       'devid' => '1',
255       'path' => "http://127.0.0.1:#{t.port}/path",
256     }
257     nr = @client.store_file 'new_key', 'test', to_store.path
258     assert_equal 4, nr
259     received.sysseek(0)
261     a = received.sysread(999999).split(/\r\n/)
262     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
263     assert_equal("data", a[-1])
264     assert_equal("", a[-2])
265     assert a.grep(%r{\AContent-Length: 4\z})[0]
266   end
268   def test_store_content_http
269     received = Tempfile.new('received')
271     t = tmpsrv(Proc.new do |serv, accept|
272       client, _ = serv.accept
273       client.sync = true
274       seen = ""
275       while seen !~ /\r\n\r\ndata/
276         buf = client.readpartial(4096)
277         seen << buf
278         received.syswrite(buf)
279       end
280       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
281       client.close
282     end)
284     @backend.create_open = {
285       'devid' => '1',
286       'path' => "http://127.0.0.1:#{t.port}/path",
287     }
289     nr = @client.store_content 'new_key', 'test', 'data'
290     assert nr
291     assert_equal 4, nr
293     received.sysseek(0)
294     a = received.sysread(999999).split(/\r\n/)
295     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
296     assert_equal("data", a[-1])
297     assert_equal("", a[-2])
298     assert a.grep(%r{\AContent-Length: 4\z})[0]
299   end
302   def test_store_content_with_writer_callback
303     received = Tempfile.new('received')
304     expected = "PUT /path HTTP/1.0\r\nContent-Length: 40\r\n\r\n"
305     10.times do
306       expected += "data"
307     end
308     t = tmpsrv(Proc.new do |serv, accept|
309       client, _ = serv.accept
310       client.sync = true
311       nr = 0
312       seen = ''
313       loop do
314         buf = client.readpartial(8192) or break
315         break if buf.length == 0
316         assert_equal buf.length, received.syswrite(buf)
317         nr += buf.length
318         seen << buf
319         break if seen =~ /\r\n\r\n(?:data){10}/
320       end
321       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
322       client.close
323     end)
325     @backend.create_open = {
326       'devid' => '1',
327       'path' => "http://127.0.0.1:#{t.port}/path",
328     }
330     cbk = MogileFS::Util::StoreContent.new(40) do |write_callback|
331       10.times do
332         write_callback.call("data")
333       end
334     end
335     assert_equal 40, cbk.length
336     nr = @client.store_content('new_key', 'test', cbk)
337     assert_equal 40, nr
339     received.sysseek(0)
340     a = received.sysread(999999).split(/\r\n/)
341     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
342     assert_equal("data" * 10, a[-1])
343     assert_equal("", a[-2])
344     assert a.grep(%r{\AContent-Length: 40\z})[0]
345   end
347   def test_store_content_multi_dest_failover_path
348     test_store_content_multi_dest_failover(true)
349   end
351   def test_store_content_multi_dest_failover(big_io = false)
352     received1 = Tempfile.new('received')
353     received2 = Tempfile.new('received')
355     t1 = tmpsrv(Proc.new do |serv, accept|
356       client, _ = serv.accept
357       seen = ""
358       while seen !~ /\r\n\r\ndata/
359         buf = client.readpartial(4096)
360         seen << buf
361         received1.syswrite(buf)
362       end
363       client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
364       client.close
365     end)
367     t2 = tmpsrv(Proc.new do |serv, accept|
368       client, _ = serv.accept
369       seen = ""
370       while seen !~ /\r\n\r\ndata/
371         buf = client.readpartial(4096)
372         seen << buf
373         received2.syswrite(buf)
374       end
375       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
376       client.close
377     end)
379     @backend.create_open = {
380       'dev_count' => '2',
381       'devid_1' => '1',
382       'path_1' => "http://127.0.0.1:#{t1.port}/path",
383       'devid_2' => '2',
384       'path_2' => "http://127.0.0.1:#{t2.port}/path",
385     }
387     if big_io
388       tmp = Tempfile.new('data')
389       tmp.sync = true
390       tmp.write 'data'
391       nr = @client.store_file('new_key', 'test', tmp.path)
392       tmp.close!
393     else
394       nr = @client.store_content 'new_key', 'test', 'data'
395     end
396     assert_equal 4, nr
397     received1.sysseek(0)
398     received2.sysseek(0)
399     a = received1.sysread(4096).split(/\r\n/)
400     b = received2.sysread(4096).split(/\r\n/)
401     assert_equal a[0], b[0]
402     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
403     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, b[0])
404     assert_equal("data", a[-1])
405     assert_equal("data", b[-1])
406     assert_equal("", a[-2])
407     assert_equal("", b[-2])
408     assert a.grep(%r{\AContent-Length: 4\z})[0]
409     assert b.grep(%r{\AContent-Length: 4\z})[0]
410   end
412   def test_store_content_http_fail
413     t = tmpsrv(Proc.new do |serv, accept|
414       client, _ = serv.accept
415       client.sync = true
416       read_headers(client)
417       client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
418       client.close
419     end)
421     @backend.create_open = {
422       'devid' => '1',
423       'path' => "http://127.0.0.1:#{t.port}/path",
424     }
426     assert_raises MogileFS::HTTPFile::NoStorageNodesError do
427       @client.store_content 'new_key', 'test', 'data'
428     end
429   end
431   def test_store_content_http_empty
432     received = Tempfile.new('received')
433     t = tmpsrv(Proc.new do |serv, accept|
434       client, _ = serv.accept
435       client.sync = true
436       received.syswrite(client.recv(4096, 0))
437       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
438       client.close
439     end)
441     @backend.create_open = {
442       'devid' => '1',
443       'path' => "http://127.0.0.1:#{t.port}/path",
444     }
446     nr = @client.store_content 'new_key', 'test', ''
447     assert_equal 0, nr
448     received.sysseek(0)
449     a = received.sysread(4096).split(/\r\n/)
450     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
451     assert a.grep(%r{\AContent-Length: 0\z})[0]
452   end
454   def test_store_content_nfs
455     @backend.create_open = {
456       'dev_count' => '1',
457       'devid_1' => '1',
458       'path_1' => '/path',
459     }
460     assert_raises MogileFS::UnsupportedPathError do
461       @client.store_content 'new_key', 'test', 'data'
462     end
463   end
465   def test_new_file_http_large
466     expect = Tempfile.new('test_mogilefs.expect')
467     to_put = Tempfile.new('test_mogilefs.to_put')
468     received = Tempfile.new('test_mogilefs.received')
470     nr = nr_chunks
471     chunk_size = 1024 * 1024
472     expect_size = nr * chunk_size
474     header = "PUT /path HTTP/1.0\r\n" \
475              "Content-Length: #{expect_size}\r\n\r\n"
476     assert_equal header.size, expect.syswrite(header)
477     nr.times do
478       assert_equal chunk_size, expect.syswrite(' ' * chunk_size)
479       assert_equal chunk_size, to_put.syswrite(' ' * chunk_size)
480     end
481     assert_equal expect_size + header.size, expect.stat.size
482     assert_equal expect_size, to_put.stat.size
484     readed = Tempfile.new('readed')
485     t = tmpsrv(Proc.new do |serv, accept|
486       client, _ = serv.accept
487       client.sync = true
488       nr = 0
489       loop do
490         buf = client.readpartial(8192) or break
491         break if buf.length == 0
492         assert_equal buf.length, received.syswrite(buf)
493         nr += buf.length
494         break if nr >= expect.stat.size
495       end
496       readed.syswrite("#{nr}")
497       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
498       client.close
499     end)
501     @backend.create_open = {
502       'devid' => '1',
503       'path' => "http://127.0.0.1:#{t.port}/path",
504     }
506     orig_size = to_put.size
507     nr = @client.store_file('new_key', 'test', to_put.path)
508     assert nr, nr.inspect
509     assert_equal orig_size, nr
510     assert_equal orig_size, to_put.size
511     readed.sysseek(0)
512     assert_equal expect.stat.size, readed.sysread(4096).to_i
514     ENV['PATH'].split(/:/).each do |path|
515       cmp_bin = "#{path}/cmp"
516       File.executable?(cmp_bin) or next
517       # puts "running #{cmp_bin} #{expect.path} #{received.path}"
518       assert( system(cmp_bin, expect.path, received.path) )
519       break
520     end
521   end
523   def test_store_content_readonly
524     @client.readonly = true
526     assert_raises MogileFS::ReadOnlyError do
527       @client.store_content 'new_key', 'test', nil
528     end
529   end
531   def test_store_file_readonly
532     @client.readonly = true
533     assert_raises MogileFS::ReadOnlyError do
534       @client.store_file 'new_key', 'test', nil
535     end
536   end
538   def test_rename_existing
539     @backend.rename = {}
541     assert_nil @client.rename('from_key', 'to_key')
542   end
544   def test_rename_nonexisting
545     @backend.rename = 'unknown_key', ''
547     assert_raises MogileFS::Backend::UnknownKeyError do
548       @client.rename('from_key', 'to_key')
549     end
550   end
552   def test_rename_no_key
553     @backend.rename = 'no_key', 'no_key'
555     e = assert_raises MogileFS::Backend::NoKeyError do
556       @client.rename 'new_key', 'test'
557     end
559     assert_equal 'no_key', e.message
560   end
562   def test_rename_readonly
563     @client.readonly = true
565     e = assert_raises MogileFS::ReadOnlyError do
566       @client.rename 'new_key', 'test'
567     end
569     assert_equal 'readonly mogilefs', e.message
570   end
572   def assert_get_paths_args(expect, *args)
573     sock = TCPServer.new("127.0.0.1", 0)
574     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
575     c = MogileFS::MogileFS.new(nargs)
576     received = []
577     th = Thread.new do
578       a = sock.accept
579       line = a.gets
580       received << line
581       a.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
582       a.close
583     end
584     paths_expect = %w(http://0/a http://0/b)
585     assert_equal paths_expect, c.get_paths("f", *args)
586     th.join
587     assert_equal 1, received.size
588     tmp = c.backend.url_decode(received[0].split(/\s+/)[1])
589     assert_equal "f", tmp.delete("key")
590     assert_equal "foo", tmp.delete("domain")
591     assert_equal expect, tmp
592     c.backend.shutdown
593   ensure
594     sock.close
595   end
597   def test_get_paths_args
598     assert_get_paths_args({"noverify"=>"1", "zone"=>""})
599     assert_get_paths_args({"noverify"=>"0", "zone"=>""}, false)
600     assert_get_paths_args({"noverify"=>"0", "zone"=>""}, :noverify=>false)
601     assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"}, true, "alt")
602     assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"},
603                           {:noverify => true, :zone => "alt"})
604     assert_get_paths_args({"noverify"=>"1", "zone"=>"alt","pathcount"=>"666"},
605                           {:noverify => true, :zone => "alt", :pathcount=>666})
606   end
608   def test_idempotent_command_eof
609     ip = "127.0.0.1"
610     a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
611     hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
612     args = { :hosts => hosts, :domain => "foo" }
613     c = MogileFS::MogileFS.new(args)
614     received = []
615     th = Thread.new do
616       r = IO.select([a, b])
617       x = r[0][0].accept
618       received << x.gets
619       x.close
621       r = IO.select([a, b])
622       x = r[0][0].accept
623       received << x.gets
624       x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
625       x.close
626     end
627     expect = %w(http://0/a http://0/b)
628     assert_equal expect, c.get_paths("f")
629     th.join
630     assert_equal 2, received.size
631     assert_equal received[0], received[1]
632   end
634   def test_idempotent_command_slow
635     ip = "127.0.0.1"
636     a = TCPServer.new(ip, 0)
637     hosts = [ "#{ip}:#{a.addr[1]}" ]
638     q = Queue.new
639     timeout = 1
640     args = { :hosts => hosts, :domain => "foo", :timeout => timeout }
641     c = MogileFS::MogileFS.new(args)
642     secs = timeout + 1
643     th = Thread.new do
644       close_later = []
645       x = a.accept
646       close_later << x
647       line = x.gets
648       %r{key=(\w+)} =~ line
650       sleep(secs) # cause the client to timeout:
652       begin
653         x.write("OK paths=1&path1=http://0/#{$1}\r\n")
654       rescue Errno::EPIPE
655         # EPIPE may or may not get raised due to timing issue,
656         # we don't care either way
657       rescue => e
658         flunk("#{e.message} (#{e.class})")
659       end
660       q << :continue_test
662       # client should start a new connection here
663       y = a.accept
664       close_later << y
665       line = y.gets
666       %r{key=(\w+)} =~ line
667       begin
668         y.write("OK paths=1&path1=http://0/#{$1}\r\n")
669       rescue => e
670         flunk("#{e.message} (#{e.class})")
671       end
673       # the client should've killed the old connection:
674       assert_raises(Errno::EPIPE) do
675         loop { x.write("OK paths=1&path1=http://0/#{$1}\r\n") }
676       end
678       close_later # main thread closes
679     end
680     assert_raises(MogileFS::UnreadableSocketError) do
681       c.get_paths("a")
682     end
683     assert_equal :continue_test, q.pop, "avoid race during test"
684     expect2 = %w(http://0/b)
685     assert_equal expect2, c.get_paths("b")
686     a.close
687     close_later = th.value
688     close_later.each { |io| assert_nil io.close }
689   end
691   def test_idempotent_command_response_truncated
692     ip = "127.0.0.1"
693     a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
694     hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
695     args = { :hosts => hosts, :domain => "foo" }
696     c = MogileFS::MogileFS.new(args)
697     received = []
698     th = Thread.new do
699       r = IO.select([a, b])
700       x = r[0][0].accept
701       received << x.gets
702       x.write("OK paths=2&path1=http://0/a&path2=http://0/")
703       x.close
705       r = IO.select([a, b])
706       x = r[0][0].accept
707       received << x.gets
708       x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
709       x.close
710     end
711     expect = %w(http://0/a http://0/b)
712     assert_equal expect, c.get_paths("f")
713     th.join
714     assert_equal 2, received.size
715     assert_equal received[0], received[1]
716   end
718   def test_non_idempotent_command_eof
719     ip = "127.0.0.1"
720     a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
721     hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
722     args = { :hosts => hosts, :domain => "foo" }
723     c = MogileFS::MogileFS.new(args)
724     received = []
725     th = Thread.new do
726       r = IO.select([a, b])
727       x = r[0][0].accept
728       received << x.gets
729       x.close
730     end
731     assert_raises(EOFError) { c.rename("a", "b") }
732     th.join
733     assert_equal 1, received.size
734   end
736   def test_list_keys_verbose_ordering # implementation detail
737     received = []
738     sock = TCPServer.new("127.0.0.1", 0)
739     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
740     c = MogileFS::MogileFS.new(nargs)
741     th = Thread.new do
742       a = sock.accept
743       %w(a b c d e).each do |key|
744         line = a.gets
745         cmd, args = line.split(/\s+/, 2)
746         args = c.backend.url_decode(args.strip)
747         assert_equal "file_info", cmd
748         assert_equal key, args["key"]
749       end
750       out = { "length" => 3, "devcount" => 6 }
751       %w(a b c d e).shuffle.each do |key|
752         out["key"] = key
753         a.write "OK #{c.backend.url_encode(out)}\r\n"
754       end
755       a.close
756     end
758     blk = lambda do |key, length, devcount|
759       received << [ key, length, devcount ]
760     end
761     c.list_keys_verbose(%w(a b c d e), blk)
762     th.join
763     received.map! { |(key,_,_)| key }
764     assert_equal %w(a b c d e), received
765   ensure
766     sock.close
767   end
769   def test_list_keys_verbose_retry_eof # implementation detail
770     received = []
771     sock = TCPServer.new("127.0.0.1", 0)
772     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
773     c = MogileFS::MogileFS.new(nargs)
774     th = Thread.new do
775       a = sock.accept
776       %w(a b c d e).each do |key|
777         line = a.gets
778         cmd, args = line.split(/\s+/, 2)
779         args = c.backend.url_decode(args.strip)
780         assert_equal "file_info", cmd
781         assert_equal key, args["key"]
782       end
783       out = { "length" => 3, "devcount" => 6 }
784       %w(d e).each do |key|
785         out["key"] = key
786         a.write "OK #{c.backend.url_encode(out)}\r\n"
787       end
788       a.close # trigger EOF
789       a = sock.accept # client will retry
790       %w(a b c).each do |key|
791         line = a.gets
792         cmd, args = line.split(/\s+/, 2)
793         args = c.backend.url_decode(args.strip)
794         assert_equal "file_info", cmd
795         assert_equal key, args["key"]
796         out["key"] = key
797         a.write "OK #{c.backend.url_encode(out)}\r\n"
798       end
799       a.close
800     end
802     blk = lambda do |key, length, devcount|
803       received << [ key, length, devcount ]
804     end
805     c.list_keys_verbose(%w(a b c d e), blk)
806     th.join
807     received.map! { |(key,_,_)| key }
808     assert_equal %w(a b c d e), received
809   ensure
810     sock.close
811   end
813   def test_list_keys_verbose_retry_truncated # implementation detail
814     received = []
815     sock = TCPServer.new("127.0.0.1", 0)
816     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
817     c = MogileFS::MogileFS.new(nargs)
818     th = Thread.new do
819       a = sock.accept
820       %w(a b c d e).each do |key|
821         line = a.gets
822         cmd, args = line.split(/\s+/, 2)
823         args = c.backend.url_decode(args.strip)
824         assert_equal "file_info", cmd
825         assert_equal key, args["key"]
826       end
827       out = { "length" => 3, "devcount" => 6 }
828       out["key"] = "a"
829       a.write "OK #{c.backend.url_encode(out)}\r\n"
830       out["key"] = "b"
831       a.write "OK #{c.backend.url_encode(out)}"
832       a.close # trigger EOF
834       a = sock.accept # client will retry
835       %w(b c d e).each do |key|
836         line = a.gets
837         cmd, args = line.split(/\s+/, 2)
838         args = c.backend.url_decode(args.strip)
839         assert_equal "file_info", cmd
840         assert_equal key, args["key"]
841         out["key"] = key
842         a.write "OK #{c.backend.url_encode(out)}\r\n"
843       end
844       a.close
845     end
847     blk = lambda do |key, length, devcount|
848       received << [ key, length, devcount ]
849     end
850     c.list_keys_verbose(%w(a b c d e), blk)
851     th.join
852     received.map! { |(key,_,_)| key }
853     assert_equal %w(a b c d e), received
854   ensure
855     sock.close
856   end
858   def test_sleep
859     @backend.sleep = {}
860     assert_equal({}, @client.sleep(2))
861   end
863   def teardown
864     @tmpsrv.each { |t| t.destroy! }
865   end
867   private
869     # tested with 1000, though it takes a while
870     def nr_chunks
871       ENV['NR_CHUNKS'] ? ENV['NR_CHUNKS'].to_i : 10
872     end