optionally use net-http-persistent for StringIO
[ruby-mogilefs-client.git] / test / test_mogilefs.rb
blob6000781355d9eacfd4c54266b02cbc3f0088a7be
1 # -*- encoding: binary -*-
2 require './test/setup'
3 require 'stringio'
4 require 'tempfile'
5 require 'fileutils'
7 class TestMogileFS__MogileFS < TestMogileFS
8   def setup
9     @klass = MogileFS::MogileFS
10     super
11   end
13   def test_initialize
14     assert_equal 'test', @client.domain
16     assert_raises ArgumentError do
17       MogileFS::MogileFS.new :hosts => ['kaa:6001']
18     end
19   end
21   def test_get_file_data_http
22     tmp = Tempfile.new('accept')
23     accept = File.open(tmp.path, "ab")
24     svr = Proc.new do |serv, port|
25       client, _ = serv.accept
26       client.sync = true
27       readed = client.recv(4096, 0)
28       assert(readed =~ \
29             %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
30       accept.syswrite('.')
31       client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
32       client.close
33     end
34     t1 = TempServer.new(svr)
35     t2 = TempServer.new(svr)
36     path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
37     path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
39     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
41     assert_equal 'data!', @client.get_file_data('key')
42     assert_equal 1, accept.stat.size
43     ensure
44       TempServer.destroy_all!
45   end
47   def test_get_file_data_http_not_found_failover
48     tmp = Tempfile.new('accept')
49     accept = File.open(tmp.path, 'ab')
50     svr1 = Proc.new do |serv, port|
51       client, _ = serv.accept
52       client.sync = true
53       readed = client.recv(4096, 0)
54       assert(readed =~ \
55             %r{\AGET /dev1/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
56       accept.syswrite('.')
57       client.send("HTTP/1.0 404 Not Found\r\n\r\ndata!", 0)
58       client.close
59     end
61     svr2 = Proc.new do |serv, port|
62       client, _ = serv.accept
63       client.sync = true
64       readed = client.recv(4096, 0)
65       assert(readed =~ \
66             %r{\AGET /dev2/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
67       accept.syswrite('.')
68       client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
69       client.close
70     end
72     t1 = TempServer.new(svr1)
73     t2 = TempServer.new(svr2)
74     path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
75     path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
76     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
78     assert_equal 'data!', @client.get_file_data('key')
79     assert_equal 2, accept.stat.size
80     ensure
81       TempServer.destroy_all!
82   end
84   def test_get_file_data_http_block
85     tmpfp = Tempfile.new('test_mogilefs.open_data')
86     nr = nr_chunks
87     chunk_size = 1024 * 1024
88     expect_size = nr * chunk_size
89     header = "HTTP/1.0 200 OK\r\n" \
90              "Content-Length: #{expect_size}\r\n\r\n"
91     assert_equal header.size, tmpfp.syswrite(header)
92     nr.times { assert_equal chunk_size, tmpfp.syswrite(' ' * chunk_size) }
93     assert_equal expect_size + header.size, File.size(tmpfp.path)
94     tmpfp.sysseek(0)
96     accept = Tempfile.new('accept')
97     svr = Proc.new do |serv, port|
98       client, _ = serv.accept
99       client.sync = true
100       accept.syswrite('.')
101       readed = client.recv(4096, 0)
102       assert(readed =~ \
103             %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
104       MogileFS.io.copy_stream(tmpfp, client)
105       client.close
106       exit 0
107     end
108     t1 = TempServer.new(svr)
109     t2 = TempServer.new(svr)
110     path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
111     path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
113     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
115     data = Tempfile.new('test_mogilefs.dest_data')
116     read_nr = nr = 0
117     @client.get_file_data('key') do |fp|
118       buf = ''
119       loop do
120         begin
121           fp.sysread(16384, buf)
122           read_nr = buf.size
123           nr += read_nr
124           assert_equal read_nr, data.syswrite(buf), "partial write"
125         rescue Errno::EAGAIN
126           retry
127         rescue EOFError
128           break
129         end
130       end
131     end
132     assert_equal expect_size, nr, "size mismatch"
133     assert_equal 1, accept.stat.size
134   end
136   def test_get_paths
137     path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
138     path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
140     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
142     expected = [ path1, path2 ]
144     assert_equal expected, @client.get_paths('key').sort
145   end
147   def test_get_uris
148     path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
149     path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
151     @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
153     expected = [ URI.parse(path1), URI.parse(path2) ]
155     assert_equal expected, @client.get_uris('key')
156   end
159   def test_get_paths_unknown_key
160     @backend.get_paths = ['unknown_key', '']
162     assert_raises MogileFS::Backend::UnknownKeyError do
163       assert_equal nil, @client.get_paths('key')
164     end
165   end
167   def test_delete_existing
168     @backend.delete = { }
169     assert_nothing_raised do
170       @client.delete 'no_such_key'
171     end
172   end
174   def test_delete_nonexisting
175     @backend.delete = 'unknown_key', ''
176     assert_raises MogileFS::Backend::UnknownKeyError do
177       @client.delete('no_such_key')
178     end
179   end
181   def test_delete_readonly
182     @client.readonly = true
183     assert_raises MogileFS::ReadOnlyError do
184       @client.delete 'no_such_key'
185     end
186   end
188   def test_each_key
189     @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_2',
190                            'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
191     @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_4',
192                            'key_1' => 'new_key_3', 'key_2' => 'new_key_4' }
193     @backend.list_keys = { 'key_count' => 0, 'next_after' => 'new_key_4' }
194     keys = []
195     @client.each_key 'new' do |key|
196       keys << key
197     end
199     assert_equal %w[new_key_1 new_key_2 new_key_3 new_key_4], keys
200   end
202   def test_list_keys
203     @backend.list_keys = { 'key_count' => '2', 'next_after' => 'new_key_2',
204                            'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
206     keys, next_after = @client.list_keys 'new'
207     assert_equal ['new_key_1', 'new_key_2'], keys.sort
208     assert_equal 'new_key_2', next_after
209   end
211   def test_new_file_http
212     @client.readonly = true
213     assert_raises MogileFS::ReadOnlyError do
214       @client.new_file 'new_key', 'test'
215     end
216   end
218   def test_new_file_readonly
219     @client.readonly = true
220     assert_raises MogileFS::ReadOnlyError do
221       @client.new_file 'new_key', 'test'
222     end
223   end
225   def test_store_file_small_http
226     received = Tempfile.new('received')
227     to_store = Tempfile.new('small')
228     to_store.syswrite('data')
230     t = TempServer.new(Proc.new do |serv, accept|
231       client, _ = serv.accept
232       while buf = client.readpartial(666)
233         received.syswrite(buf)
234         break if buf =~ /data/
235       end
236       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
237       client.close
238     end)
240     @backend.create_open = {
241       'devid' => '1',
242       'path' => "http://127.0.0.1:#{t.port}/path",
243     }
244     nr = @client.store_file 'new_key', 'test', to_store.path
245     assert_equal 4, nr
246     received.sysseek(0)
248     a = received.sysread(999999).split(/\r\n/)
249     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
250     assert_equal("data", a[-1])
251     assert_equal("", a[-2])
252     assert a.grep(%r{\AContent-Length: 4\z})[0]
253     ensure
254       TempServer.destroy_all!
255   end
257   def test_store_content_http
258     received = Tempfile.new('received')
260     t = TempServer.new(Proc.new do |serv, accept|
261       client, _ = serv.accept
262       client.sync = true
263       seen = ""
264       while seen !~ /\r\n\r\ndata/
265         buf = client.readpartial(4096)
266         seen << buf
267         received.syswrite(buf)
268       end
269       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
270       client.close
271     end)
273     @backend.create_open = {
274       'devid' => '1',
275       'path' => "http://127.0.0.1:#{t.port}/path",
276     }
278     nr = @client.store_content 'new_key', 'test', 'data'
279     assert nr
280     assert_equal 4, nr
282     received.sysseek(0)
283     a = received.sysread(999999).split(/\r\n/)
284     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
285     assert_equal("data", a[-1])
286     assert_equal("", a[-2])
287     assert a.grep(%r{\AContent-Length: 4\z})[0]
288     ensure
289       TempServer.destroy_all!
290   end
293   def test_store_content_with_writer_callback
294     received = Tempfile.new('received')
295     expected = "PUT /path HTTP/1.0\r\nContent-Length: 40\r\n\r\n"
296     10.times do
297       expected += "data"
298     end
299     t = TempServer.new(Proc.new do |serv, accept|
300       client, _ = serv.accept
301       client.sync = true
302       nr = 0
303       seen = ''
304       loop do
305         buf = client.readpartial(8192) or break
306         break if buf.length == 0
307         assert_equal buf.length, received.syswrite(buf)
308         nr += buf.length
309         seen << buf
310         break if seen =~ /\r\n\r\n(?:data){10}/
311       end
312       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
313       client.close
314     end)
316     @backend.create_open = {
317       'devid' => '1',
318       'path' => "http://127.0.0.1:#{t.port}/path",
319     }
321     cbk = MogileFS::Util::StoreContent.new(40) do |write_callback|
322       10.times do
323         write_callback.call("data")
324       end
325     end
326     assert_equal 40, cbk.length
327     nr = @client.store_content('new_key', 'test', cbk)
328     assert_equal 40, nr
330     received.sysseek(0)
331     a = received.sysread(999999).split(/\r\n/)
332     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
333     assert_equal("data" * 10, a[-1])
334     assert_equal("", a[-2])
335     assert a.grep(%r{\AContent-Length: 40\z})[0]
336     ensure
337       TempServer.destroy_all!
338   end
340   def test_store_content_multi_dest_failover_path
341     test_store_content_multi_dest_failover(true)
342   end
344   def test_store_content_multi_dest_failover(big_io = false)
345     received1 = Tempfile.new('received')
346     received2 = Tempfile.new('received')
348     t1 = TempServer.new(Proc.new do |serv, accept|
349       client, _ = serv.accept
350       seen = ""
351       while seen !~ /\r\n\r\ndata/
352         buf = client.readpartial(4096)
353         seen << buf
354         received1.syswrite(buf)
355       end
356       client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
357       client.close
358     end)
360     t2 = TempServer.new(Proc.new do |serv, accept|
361       client, _ = serv.accept
362       seen = ""
363       while seen !~ /\r\n\r\ndata/
364         buf = client.readpartial(4096)
365         seen << buf
366         received2.syswrite(buf)
367       end
368       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
369       client.close
370     end)
372     @backend.create_open = {
373       'dev_count' => '2',
374       'devid_1' => '1',
375       'path_1' => "http://127.0.0.1:#{t1.port}/path",
376       'devid_2' => '2',
377       'path_2' => "http://127.0.0.1:#{t2.port}/path",
378     }
380     if big_io
381       tmp = Tempfile.new('data')
382       tmp.sync = true
383       tmp.write 'data'
384       nr = @client.store_file('new_key', 'test', tmp.path)
385       tmp.close!
386     else
387       nr = @client.store_content 'new_key', 'test', 'data'
388     end
389     assert_equal 4, nr
390     received1.sysseek(0)
391     received2.sysseek(0)
392     a = received1.sysread(4096).split(/\r\n/)
393     b = received2.sysread(4096).split(/\r\n/)
394     assert_equal a[0], b[0]
395     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
396     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, b[0])
397     assert_equal("data", a[-1])
398     assert_equal("data", b[-1])
399     assert_equal("", a[-2])
400     assert_equal("", b[-2])
401     assert a.grep(%r{\AContent-Length: 4\z})[0]
402     assert b.grep(%r{\AContent-Length: 4\z})[0]
403     ensure
404       TempServer.destroy_all!
405   end
407   def test_store_content_http_fail
408     t = TempServer.new(Proc.new do |serv, accept|
409       client, _ = serv.accept
410       client.sync = true
411       client.recv(4096, 0)
412       client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
413       client.close
414     end)
416     @backend.create_open = {
417       'devid' => '1',
418       'path' => "http://127.0.0.1:#{t.port}/path",
419     }
421     assert_raises MogileFS::HTTPFile::NoStorageNodesError do
422       @client.store_content 'new_key', 'test', 'data'
423     end
424   end
426   def test_store_content_http_empty
427     received = Tempfile.new('received')
428     t = TempServer.new(Proc.new do |serv, accept|
429       client, _ = serv.accept
430       client.sync = true
431       received.syswrite(client.recv(4096, 0))
432       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
433       client.close
434     end)
436     @backend.create_open = {
437       'devid' => '1',
438       'path' => "http://127.0.0.1:#{t.port}/path",
439     }
441     nr = @client.store_content 'new_key', 'test', ''
442     assert_equal 0, nr
443     received.sysseek(0)
444     a = received.sysread(4096).split(/\r\n/)
445     assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
446     assert a.grep(%r{\AContent-Length: 0\z})[0]
447   end
449   def test_store_content_nfs
450     @backend.create_open = {
451       'dev_count' => '1',
452       'devid_1' => '1',
453       'path_1' => '/path',
454     }
455     assert_raises MogileFS::UnsupportedPathError do
456       @client.store_content 'new_key', 'test', 'data'
457     end
458   end
460   def test_new_file_http_large
461     expect = Tempfile.new('test_mogilefs.expect')
462     to_put = Tempfile.new('test_mogilefs.to_put')
463     received = Tempfile.new('test_mogilefs.received')
465     nr = nr_chunks
466     chunk_size = 1024 * 1024
467     expect_size = nr * chunk_size
469     header = "PUT /path HTTP/1.0\r\n" \
470              "Content-Length: #{expect_size}\r\n\r\n"
471     assert_equal header.size, expect.syswrite(header)
472     nr.times do
473       assert_equal chunk_size, expect.syswrite(' ' * chunk_size)
474       assert_equal chunk_size, to_put.syswrite(' ' * chunk_size)
475     end
476     assert_equal expect_size + header.size, expect.stat.size
477     assert_equal expect_size, to_put.stat.size
479     readed = Tempfile.new('readed')
480     t = TempServer.new(Proc.new do |serv, accept|
481       client, _ = serv.accept
482       client.sync = true
483       nr = 0
484       loop do
485         buf = client.readpartial(8192) or break
486         break if buf.length == 0
487         assert_equal buf.length, received.syswrite(buf)
488         nr += buf.length
489         break if nr >= expect.stat.size
490       end
491       readed.syswrite("#{nr}")
492       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
493       client.close
494     end)
496     @backend.create_open = {
497       'devid' => '1',
498       'path' => "http://127.0.0.1:#{t.port}/path",
499     }
501     orig_size = to_put.size
502     nr = @client.store_file('new_key', 'test', to_put.path)
503     assert nr, nr.inspect
504     assert_equal orig_size, nr
505     assert_equal orig_size, to_put.size
506     readed.sysseek(0)
507     assert_equal expect.stat.size, readed.sysread(4096).to_i
509     ENV['PATH'].split(/:/).each do |path|
510       cmp_bin = "#{path}/cmp"
511       File.executable?(cmp_bin) or next
512       # puts "running #{cmp_bin} #{expect.path} #{received.path}"
513       assert( system(cmp_bin, expect.path, received.path) )
514       break
515     end
517     ensure
518       TempServer.destroy_all!
519   end
521   def test_store_content_readonly
522     @client.readonly = true
524     assert_raises MogileFS::ReadOnlyError do
525       @client.store_content 'new_key', 'test', nil
526     end
527   end
529   def test_store_file_readonly
530     @client.readonly = true
531     assert_raises MogileFS::ReadOnlyError do
532       @client.store_file 'new_key', 'test', nil
533     end
534   end
536   def test_rename_existing
537     @backend.rename = {}
539     assert_nil @client.rename('from_key', 'to_key')
540   end
542   def test_rename_nonexisting
543     @backend.rename = 'unknown_key', ''
545     assert_raises MogileFS::Backend::UnknownKeyError do
546       @client.rename('from_key', 'to_key')
547     end
548   end
550   def test_rename_no_key
551     @backend.rename = 'no_key', 'no_key'
553     e = assert_raises MogileFS::Backend::NoKeyError do
554       @client.rename 'new_key', 'test'
555     end
557     assert_equal 'no_key', e.message
558   end
560   def test_rename_readonly
561     @client.readonly = true
563     e = assert_raises MogileFS::ReadOnlyError do
564       @client.rename 'new_key', 'test'
565     end
567     assert_equal 'readonly mogilefs', e.message
568   end
570   def assert_get_paths_args(expect, *args)
571     sock = TCPServer.new("127.0.0.1", 0)
572     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
573     c = MogileFS::MogileFS.new(nargs)
574     received = []
575     th = Thread.new do
576       a = sock.accept
577       line = a.gets
578       received << line
579       a.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
580       a.close
581     end
582     paths_expect = %w(http://0/a http://0/b)
583     assert_equal paths_expect, c.get_paths("f", *args)
584     th.join
585     assert_equal 1, received.size
586     tmp = c.backend.url_decode(received[0].split(/\s+/)[1])
587     assert_equal "f", tmp.delete("key")
588     assert_equal "foo", tmp.delete("domain")
589     assert_equal expect, tmp
590     c.backend.shutdown
591     ensure
592       sock.close
593   end
595   def test_get_paths_args
596     assert_get_paths_args({"noverify"=>"1", "zone"=>""})
597     assert_get_paths_args({"noverify"=>"0", "zone"=>""}, false)
598     assert_get_paths_args({"noverify"=>"0", "zone"=>""}, :noverify=>false)
599     assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"}, true, "alt")
600     assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"},
601                           {:noverify => true, :zone => "alt"})
602     assert_get_paths_args({"noverify"=>"1", "zone"=>"alt","pathcount"=>"666"},
603                           {:noverify => true, :zone => "alt", :pathcount=>666})
604   end
606   def test_idempotent_command_eof
607     ip = "127.0.0.1"
608     a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
609     hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
610     args = { :hosts => hosts, :domain => "foo" }
611     c = MogileFS::MogileFS.new(args)
612     received = []
613     th = Thread.new do
614       r = IO.select([a, b])
615       x = r[0][0].accept
616       received << x.gets
617       x.close
619       r = IO.select([a, b])
620       x = r[0][0].accept
621       received << x.gets
622       x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
623       x.close
624     end
625     expect = %w(http://0/a http://0/b)
626     assert_equal expect, c.get_paths("f")
627     th.join
628     assert_equal 2, received.size
629     assert_equal received[0], received[1]
630   end
632   def test_idempotent_command_slow
633     ip = "127.0.0.1"
634     a = TCPServer.new(ip, 0)
635     hosts = [ "#{ip}:#{a.addr[1]}" ]
636     q = Queue.new
637     timeout = 1
638     args = { :hosts => hosts, :domain => "foo", :timeout => timeout }
639     c = MogileFS::MogileFS.new(args)
640     secs = timeout + 1
641     th = Thread.new do
642       close_later = []
643       x = a.accept
644       close_later << x
645       line = x.gets
646       %r{key=(\w+)} =~ line
648       sleep(secs) # cause the client to timeout:
650       begin
651         x.write("OK paths=1&path1=http://0/#{$1}\r\n")
652       rescue Errno::EPIPE
653         # EPIPE may or may not get raised due to timing issue,
654         # we don't care either way
655       rescue => e
656         flunk("#{e.message} (#{e.class})")
657       end
658       q << :continue_test
660       # client should start a new connection here
661       y = a.accept
662       close_later << y
663       line = y.gets
664       %r{key=(\w+)} =~ line
665       begin
666         y.write("OK paths=1&path1=http://0/#{$1}\r\n")
667       rescue => e
668         flunk("#{e.message} (#{e.class})")
669       end
671       # the client should've killed the old connection:
672       assert_raises(Errno::EPIPE) do
673         loop { x.write("OK paths=1&path1=http://0/#{$1}\r\n") }
674       end
676       close_later # main thread closes
677     end
678     assert_raises(MogileFS::UnreadableSocketError) do
679       c.get_paths("a")
680     end
681     assert_equal :continue_test, q.pop, "avoid race during test"
682     expect2 = %w(http://0/b)
683     assert_equal expect2, c.get_paths("b")
684     a.close
685     close_later = th.value
686     close_later.each { |io| assert_nil io.close }
687   end
689   def test_idempotent_command_response_truncated
690     ip = "127.0.0.1"
691     a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
692     hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
693     args = { :hosts => hosts, :domain => "foo" }
694     c = MogileFS::MogileFS.new(args)
695     received = []
696     th = Thread.new do
697       r = IO.select([a, b])
698       x = r[0][0].accept
699       received << x.gets
700       x.write("OK paths=2&path1=http://0/a&path2=http://0/")
701       x.close
703       r = IO.select([a, b])
704       x = r[0][0].accept
705       received << x.gets
706       x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
707       x.close
708     end
709     expect = %w(http://0/a http://0/b)
710     assert_equal expect, c.get_paths("f")
711     th.join
712     assert_equal 2, received.size
713     assert_equal received[0], received[1]
714   end
716   def test_non_idempotent_command_eof
717     ip = "127.0.0.1"
718     a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
719     hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
720     args = { :hosts => hosts, :domain => "foo" }
721     c = MogileFS::MogileFS.new(args)
722     received = []
723     th = Thread.new do
724       r = IO.select([a, b])
725       x = r[0][0].accept
726       received << x.gets
727       x.close
728     end
729     assert_raises(EOFError) { c.rename("a", "b") }
730     th.join
731     assert_equal 1, received.size
732   end
734   def test_list_keys_verbose_ordering # implementation detail
735     received = []
736     sock = TCPServer.new("127.0.0.1", 0)
737     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
738     c = MogileFS::MogileFS.new(nargs)
739     th = Thread.new do
740       a = sock.accept
741       %w(a b c d e).each do |key|
742         line = a.gets
743         cmd, args = line.split(/\s+/, 2)
744         args = c.backend.url_decode(args.strip)
745         assert_equal "file_info", cmd
746         assert_equal key, args["key"]
747       end
748       out = { "length" => 3, "devcount" => 6 }
749       %w(a b c d e).shuffle.each do |key|
750         out["key"] = key
751         a.write "OK #{c.backend.url_encode(out)}\r\n"
752       end
753       a.close
754     end
756     blk = lambda do |key, length, devcount|
757       received << [ key, length, devcount ]
758     end
759     c.list_keys_verbose(%w(a b c d e), blk)
760     th.join
761     received.map! { |(key,_,_)| key }
762     assert_equal %w(a b c d e), received
763     ensure
764       sock.close
765   end
767   def test_list_keys_verbose_retry_eof # implementation detail
768     received = []
769     sock = TCPServer.new("127.0.0.1", 0)
770     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
771     c = MogileFS::MogileFS.new(nargs)
772     th = Thread.new do
773       a = sock.accept
774       %w(a b c d e).each do |key|
775         line = a.gets
776         cmd, args = line.split(/\s+/, 2)
777         args = c.backend.url_decode(args.strip)
778         assert_equal "file_info", cmd
779         assert_equal key, args["key"]
780       end
781       out = { "length" => 3, "devcount" => 6 }
782       %w(d e).each do |key|
783         out["key"] = key
784         a.write "OK #{c.backend.url_encode(out)}\r\n"
785       end
786       a.close # trigger EOF
787       a = sock.accept # client will retry
788       %w(a b c).each do |key|
789         line = a.gets
790         cmd, args = line.split(/\s+/, 2)
791         args = c.backend.url_decode(args.strip)
792         assert_equal "file_info", cmd
793         assert_equal key, args["key"]
794         out["key"] = key
795         a.write "OK #{c.backend.url_encode(out)}\r\n"
796       end
797       a.close
798     end
800     blk = lambda do |key, length, devcount|
801       received << [ key, length, devcount ]
802     end
803     c.list_keys_verbose(%w(a b c d e), blk)
804     th.join
805     received.map! { |(key,_,_)| key }
806     assert_equal %w(a b c d e), received
807     ensure
808       sock.close
809   end
811   def test_list_keys_verbose_retry_truncated # implementation detail
812     received = []
813     sock = TCPServer.new("127.0.0.1", 0)
814     nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
815     c = MogileFS::MogileFS.new(nargs)
816     th = Thread.new do
817       a = sock.accept
818       %w(a b c d e).each do |key|
819         line = a.gets
820         cmd, args = line.split(/\s+/, 2)
821         args = c.backend.url_decode(args.strip)
822         assert_equal "file_info", cmd
823         assert_equal key, args["key"]
824       end
825       out = { "length" => 3, "devcount" => 6 }
826       out["key"] = "a"
827       a.write "OK #{c.backend.url_encode(out)}\r\n"
828       out["key"] = "b"
829       a.write "OK #{c.backend.url_encode(out)}"
830       a.close # trigger EOF
832       a = sock.accept # client will retry
833       %w(b c d e).each do |key|
834         line = a.gets
835         cmd, args = line.split(/\s+/, 2)
836         args = c.backend.url_decode(args.strip)
837         assert_equal "file_info", cmd
838         assert_equal key, args["key"]
839         out["key"] = key
840         a.write "OK #{c.backend.url_encode(out)}\r\n"
841       end
842       a.close
843     end
845     blk = lambda do |key, length, devcount|
846       received << [ key, length, devcount ]
847     end
848     c.list_keys_verbose(%w(a b c d e), blk)
849     th.join
850     received.map! { |(key,_,_)| key }
851     assert_equal %w(a b c d e), received
852     ensure
853       sock.close
854   end
856   def test_sleep
857     @backend.sleep = {}
858     assert_nothing_raised do
859       assert_equal({}, @client.sleep(2))
860     end
861   end
863   private
865     # tested with 1000, though it takes a while
866     def nr_chunks
867       ENV['NR_CHUNKS'] ? ENV['NR_CHUNKS'].to_i : 10
868     end