1 # -*- encoding: binary -*-
7 class TestMogileFS__MogileFS < TestMogileFS
9 @klass = MogileFS::MogileFS
14 assert_equal 'test', @client.domain
16 assert_raises ArgumentError do
17 MogileFS::MogileFS.new :hosts => ['kaa:6001']
# Fetches 'key' over HTTP from local stub servers and expects the body 'data!'.
# Two TempServer instances share one handler; the backend stub advertises one
# path on each server (dev1 and dev2).
21 def test_get_file_data_http
22 tmp = Tempfile.new('accept')
23 accept = File.open(tmp.path, "ab")
24 svr = Proc.new do |serv, port|
25 client, _ = serv.accept
# The request is read in one recv and matched against a bare GET for the
# fid under /dev1 or /dev2 (the pattern on the following line).
27 readed = client.recv(4096, 0)
29 %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
31 client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
34 t1 = TempServer.new(svr)
35 t2 = TempServer.new(svr)
36 path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
37 path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
39 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
41 assert_equal 'data!', @client.get_file_data('key')
# The 'accept' file must be exactly 1 byte; presumably one byte is appended
# per accepted connection on a line not visible here -- TODO confirm.
42 assert_equal 1, accept.stat.size
44 TempServer.destroy_all!
# Checks failover between storage paths: the dev1 stub answers 404 Not Found,
# the dev2 stub answers 200 with 'data!', and get_file_data must still
# return 'data!'.
47 def test_get_file_data_http_not_found_failover
48 tmp = Tempfile.new('accept')
49 accept = File.open(tmp.path, 'ab')
# First stub: only accepts a GET for the dev1 path and replies 404.
50 svr1 = Proc.new do |serv, port|
51 client, _ = serv.accept
53 readed = client.recv(4096, 0)
55 %r{\AGET /dev1/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
57 client.send("HTTP/1.0 404 Not Found\r\n\r\ndata!", 0)
# Second stub: only accepts a GET for the dev2 path and replies 200.
61 svr2 = Proc.new do |serv, port|
62 client, _ = serv.accept
64 readed = client.recv(4096, 0)
66 %r{\AGET /dev2/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
68 client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
72 t1 = TempServer.new(svr1)
73 t2 = TempServer.new(svr2)
74 path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
75 path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
76 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
78 assert_equal 'data!', @client.get_file_data('key')
# The 'accept' file must be 2 bytes; presumably each stub appends one byte
# when it is contacted (those writes are not visible here) -- TODO confirm.
79 assert_equal 2, accept.stat.size
81 TempServer.destroy_all!
# Streams a large canned HTTP response through a stub server and consumes it
# with the block form of get_file_data.
# The response file holds the header followed by nr chunks of 1 MiB of spaces
# (nr is assigned on a line not visible in this excerpt).
84 def test_get_file_data_http_block
85 tmpfp = Tempfile.new('test_mogilefs.open_data')
87 chunk_size = 1024 * 1024
88 expect_size = nr * chunk_size
89 header = "HTTP/1.0 200 OK\r\n" \
90 "Content-Length: #{expect_size}\r\n\r\n"
91 assert_equal header.size, tmpfp.syswrite(header)
92 nr.times { assert_equal chunk_size, tmpfp.syswrite(' ' * chunk_size) }
# Sanity check: the file on disk is exactly header + payload bytes long.
93 assert_equal expect_size + header.size, File.size(tmpfp.path)
96 accept = Tempfile.new('accept')
97 svr = Proc.new do |serv, port|
98 client, _ = serv.accept
101 readed = client.recv(4096, 0)
103 %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
# Replay the prepared header + payload file to the connected client.
104 MogileFS.io.copy_stream(tmpfp, client)
108 t1 = TempServer.new(svr)
109 t2 = TempServer.new(svr)
110 path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
111 path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
113 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
115 data = Tempfile.new('test_mogilefs.dest_data')
# The yielded fp is read in 16384-byte requests and each buffer is copied
# into the destination Tempfile.
117 @client.get_file_data('key') do |fp|
121 fp.sysread(16384, buf)
124 assert_equal read_nr, data.syswrite(buf), "partial write"
# NOTE(review): nr is compared with the byte total here, so it is presumably
# reused as a running byte counter on lines not shown -- confirm.
132 assert_equal expect_size, nr, "size mismatch"
133 assert_equal 1, accept.stat.size
137 path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
138 path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
140 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
142 expected = [ path1, path2 ]
144 assert_equal expected, @client.get_paths('key').sort
148 path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
149 path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
151 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
153 expected = [ URI.parse(path1), URI.parse(path2) ]
155 assert_equal expected, @client.get_uris('key')
# get_paths must raise MogileFS::Backend::UnknownKeyError when the backend
# stub is primed with ['unknown_key', ''] (presumably an error code/message
# pair -- confirm against the backend test double).
159 def test_get_paths_unknown_key
160 @backend.get_paths = ['unknown_key', '']
162 assert_raises MogileFS::Backend::UnknownKeyError do
# The call sits inside assert_raises, so the test only passes if get_paths
# raises before the equality check can complete.
163 assert_equal nil, @client.get_paths('key')
# delete must not raise when the backend stub answers with an empty hash.
# NOTE(review): the key literal is 'no_such_key' although the test name says
# "existing"; the outcome is driven by the stubbed backend reply.
167 def test_delete_existing
168 @backend.delete = { }
169 assert_nothing_raised do
170 @client.delete 'no_such_key'
# delete must raise MogileFS::Backend::UnknownKeyError when the backend stub
# is primed with the pair 'unknown_key', ''.
174 def test_delete_nonexisting
175 @backend.delete = 'unknown_key', ''
176 assert_raises MogileFS::Backend::UnknownKeyError do
177 @client.delete('no_such_key')
# With the client switched to readonly, delete must raise
# MogileFS::ReadOnlyError.
181 def test_delete_readonly
182 @client.readonly = true
183 assert_raises MogileFS::ReadOnlyError do
184 @client.delete 'no_such_key'
189 @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_2',
190 'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
191 @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_4',
192 'key_1' => 'new_key_3', 'key_2' => 'new_key_4' }
193 @backend.list_keys = { 'key_count' => 0, 'next_after' => 'new_key_4' }
195 @client.each_key 'new' do |key|
199 assert_equal %w[new_key_1 new_key_2 new_key_3 new_key_4], keys
203 @backend.list_keys = { 'key_count' => '2', 'next_after' => 'new_key_2',
204 'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
206 keys, next_after = @client.list_keys 'new'
207 assert_equal ['new_key_1', 'new_key_2'], keys.sort
208 assert_equal 'new_key_2', next_after
# With the client switched to readonly, new_file must raise
# MogileFS::ReadOnlyError.
# NOTE(review): the visible body matches test_new_file_readonly line for
# line; nothing HTTP-specific is exercised here -- confirm this is intended.
211 def test_new_file_http
212 @client.readonly = true
213 assert_raises MogileFS::ReadOnlyError do
214 @client.new_file 'new_key', 'test'
# With the client switched to readonly, new_file must raise
# MogileFS::ReadOnlyError.
218 def test_new_file_readonly
219 @client.readonly = true
220 assert_raises MogileFS::ReadOnlyError do
221 @client.new_file 'new_key', 'test'
# Stores a 4-byte file ('data') via store_file and checks the exact HTTP PUT
# request captured by a stub server.
225 def test_store_file_small_http
226 received = Tempfile.new('received')
227 to_store = Tempfile.new('small')
228 to_store.syswrite('data')
230 expected = "PUT /path HTTP/1.0\r\nContent-Length: 4\r\n\r\ndata"
# Stub server: reads exactly as many bytes as the expected request, records
# them in 'received', then answers 200 OK.
231 t = TempServer.new(Proc.new do |serv, accept|
232 client, _ = serv.accept
234 received.syswrite(client.read(expected.bytesize))
235 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
239 @backend.create_open = {
241 'path' => "http://127.0.0.1:#{t.port}/path",
243 nr = @client.store_file 'new_key', 'test', to_store.path
# The captured bytes must equal the expected PUT request.
246 assert_equal expected, received.sysread(4096)
248 TempServer.destroy_all!
# Stores the string 'data' via store_content and checks the exact HTTP PUT
# request captured by a stub server.
251 def test_store_content_http
252 received = Tempfile.new('received')
253 expected = "PUT /path HTTP/1.0\r\nContent-Length: 4\r\n\r\ndata"
255 t = TempServer.new(Proc.new do |serv, accept|
256 client, _ = serv.accept
# Keep reading until the accumulated input contains the header terminator
# followed by 'data' (how 'seen' is built is not visible in this excerpt).
259 while seen !~ /\r\n\r\ndata/
260 buf = client.readpartial(4096)
262 received.syswrite(buf)
264 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
268 @backend.create_open = {
270 'path' => "http://127.0.0.1:#{t.port}/path",
273 nr = @client.store_content 'new_key', 'test', 'data'
278 assert_equal expected, received.sysread(4096)
280 TempServer.destroy_all!
# Stores content produced by a MogileFS::Util::StoreContent writer callback
# declared with a length of 40 and checks the request captured by a stub
# server.
284 def test_store_content_with_writer_callback
285 received = Tempfile.new('received')
# Only the request header is set here; any body appended to 'expected' would
# be on lines not visible in this excerpt.
286 expected = "PUT /path HTTP/1.0\r\nContent-Length: 40\r\n\r\n"
290 t = TempServer.new(Proc.new do |serv, accept|
291 client, _ = serv.accept
# Read in pieces of up to 8192 bytes, recording each one, and stop once nr
# reaches the expected request size (nr is maintained on a hidden line).
295 buf = client.readpartial(8192) or break
296 break if buf.length == 0
297 assert_equal buf.length, received.syswrite(buf)
299 break if nr >= expected.size
301 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
305 @backend.create_open = {
307 'path' => "http://127.0.0.1:#{t.port}/path",
# The block receives a write callback and feeds "data" through it.
310 cbk = MogileFS::Util::StoreContent.new(40) do |write_callback|
312 write_callback.call("data")
315 assert_equal 40, cbk.length
316 nr = @client.store_content('new_key', 'test', cbk)
320 assert_equal expected, received.sysread(4096)
322 TempServer.destroy_all!
# Re-runs test_store_content_multi_dest_failover with big_io set to true.
325 def test_store_content_multi_dest_failover_path
326 test_store_content_multi_dest_failover(true)
# Checks upload failover across two destinations: the first stub answers 500,
# the second answers 200, and both must have received the same PUT request.
#
# big_io - defaults to false. Both a store_file call (from a Tempfile path)
#          and a store_content call are visible below; presumably this flag
#          selects between them on lines not shown -- TODO confirm.
329 def test_store_content_multi_dest_failover(big_io = false)
330 received1 = Tempfile.new('received')
331 received2 = Tempfile.new('received')
332 expected = "PUT /path HTTP/1.0\r\nContent-Length: 4\r\n\r\ndata"
# First destination: records the request, then fails with a 500.
334 t1 = TempServer.new(Proc.new do |serv, accept|
335 client, _ = serv.accept
337 while seen !~ /\r\n\r\ndata/
338 buf = client.readpartial(4096)
340 received1.syswrite(buf)
342 client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
# Second destination: records the request, then succeeds with a 200.
346 t2 = TempServer.new(Proc.new do |serv, accept|
347 client, _ = serv.accept
349 while seen !~ /\r\n\r\ndata/
350 buf = client.readpartial(4096)
352 received2.syswrite(buf)
354 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
358 @backend.create_open = {
361 'path_1' => "http://127.0.0.1:#{t1.port}/path",
363 'path_2' => "http://127.0.0.1:#{t2.port}/path",
367 tmp = Tempfile.new('data')
370 nr = @client.store_file('new_key', 'test', tmp.path)
373 nr = @client.store_content 'new_key', 'test', 'data'
# Both servers must have captured the identical, complete request.
378 assert_equal expected, received1.sysread(4096)
379 assert_equal expected, received2.sysread(4096)
381 TempServer.destroy_all!
# When the only advertised destination answers 500, store_content must raise
# MogileFS::HTTPFile::NoStorageNodesError.
384 def test_store_content_http_fail
385 t = TempServer.new(Proc.new do |serv, accept|
386 client, _ = serv.accept
389 client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
393 @backend.create_open = {
395 'path' => "http://127.0.0.1:#{t.port}/path",
398 assert_raises MogileFS::HTTPFile::NoStorageNodesError do
399 @client.store_content 'new_key', 'test', 'data'
# Storing an empty string must produce a PUT request with Content-Length: 0
# and no body.
403 def test_store_content_http_empty
404 received = Tempfile.new('received')
405 expected = "PUT /path HTTP/1.0\r\nContent-Length: 0\r\n\r\n"
# Stub server: records whatever a single recv returns, then answers 200 OK.
406 t = TempServer.new(Proc.new do |serv, accept|
407 client, _ = serv.accept
409 received.syswrite(client.recv(4096, 0))
410 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
414 @backend.create_open = {
416 'path' => "http://127.0.0.1:#{t.port}/path",
419 nr = @client.store_content 'new_key', 'test', ''
422 assert_equal expected, received.sysread(4096)
# store_content must raise MogileFS::UnsupportedPathError for the destination
# configured below. The create_open hash contents are not visible in this
# excerpt; judging by the test name it is presumably a non-HTTP (NFS-style)
# path -- TODO confirm.
425 def test_store_content_nfs
426 @backend.create_open = {
431 assert_raises MogileFS::UnsupportedPathError do
432 @client.store_content 'new_key', 'test', 'data'
# Uploads a large file with store_file and verifies both the returned size
# and the bytes captured by a stub server.
# 'expect' holds the full expected request (PUT header + payload), 'to_put'
# holds the payload only, and 'received' collects what the server reads.
436 def test_new_file_http_large
437 expect = Tempfile.new('test_mogilefs.expect')
438 to_put = Tempfile.new('test_mogilefs.to_put')
439 received = Tempfile.new('test_mogilefs.received')
# nr is assigned on a line not visible here; the payload is nr chunks of
# 1 MiB each.
442 chunk_size = 1024 * 1024
443 expect_size = nr * chunk_size
445 header = "PUT /path HTTP/1.0\r\n" \
446 "Content-Length: #{expect_size}\r\n\r\n"
447 assert_equal header.size, expect.syswrite(header)
449 assert_equal chunk_size, expect.syswrite(' ' * chunk_size)
450 assert_equal chunk_size, to_put.syswrite(' ' * chunk_size)
452 assert_equal expect_size + header.size, expect.stat.size
453 assert_equal expect_size, to_put.stat.size
# 'readed' carries the server-side count back to the test as text.
455 readed = Tempfile.new('readed')
456 t = TempServer.new(Proc.new do |serv, accept|
457 client, _ = serv.accept
# Read in pieces of up to 8192 bytes until the count reaches the size of the
# expected request (the counter update is on a hidden line).
461 buf = client.readpartial(8192) or break
462 break if buf.length == 0
463 assert_equal buf.length, received.syswrite(buf)
465 break if nr >= expect.stat.size
467 readed.syswrite("#{nr}")
468 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
472 @backend.create_open = {
474 'path' => "http://127.0.0.1:#{t.port}/path",
# store_file must return the source size and leave the source file's size
# unchanged.
477 orig_size = to_put.size
478 nr = @client.store_file('new_key', 'test', to_put.path)
479 assert nr, nr.inspect
480 assert_equal orig_size, nr
481 assert_equal orig_size, to_put.size
# The count reported by the server must equal the expected request size.
483 assert_equal expect.stat.size, readed.sysread(4096).to_i
# Look for an executable `cmp` in each PATH entry and use it to compare the
# expected and received files.
485 ENV['PATH'].split(/:/).each do |path|
486 cmp_bin = "#{path}/cmp"
487 File.executable?(cmp_bin) or next
488 # puts "running #{cmp_bin} #{expect.path} #{received.path}"
489 assert( system(cmp_bin, expect.path, received.path) )
494 TempServer.destroy_all!
# With the client switched to readonly, store_content must raise
# MogileFS::ReadOnlyError (nil is passed as the content).
497 def test_store_content_readonly
498 @client.readonly = true
500 assert_raises MogileFS::ReadOnlyError do
501 @client.store_content 'new_key', 'test', nil
# With the client switched to readonly, store_file must raise
# MogileFS::ReadOnlyError (nil is passed as the file).
505 def test_store_file_readonly
506 @client.readonly = true
507 assert_raises MogileFS::ReadOnlyError do
508 @client.store_file 'new_key', 'test', nil
# rename is expected to return nil here. The backend stub setup for this test
# is on lines not visible in this excerpt.
512 def test_rename_existing
515 assert_nil @client.rename('from_key', 'to_key')
# rename must raise MogileFS::Backend::UnknownKeyError when the backend stub
# is primed with the pair 'unknown_key', ''.
518 def test_rename_nonexisting
519 @backend.rename = 'unknown_key', ''
521 assert_raises MogileFS::Backend::UnknownKeyError do
522 @client.rename('from_key', 'to_key')
# rename must raise MogileFS::Backend::NoKeyError when the backend stub is
# primed with the pair 'no_key', 'no_key'; the exception message must be
# 'no_key'.
526 def test_rename_no_key
527 @backend.rename = 'no_key', 'no_key'
529 e = assert_raises MogileFS::Backend::NoKeyError do
530 @client.rename 'new_key', 'test'
533 assert_equal 'no_key', e.message
# With the client switched to readonly, rename must raise
# MogileFS::ReadOnlyError with the message 'readonly mogilefs'.
536 def test_rename_readonly
537 @client.readonly = true
539 e = assert_raises MogileFS::ReadOnlyError do
540 @client.rename 'new_key', 'test'
543 assert_equal 'readonly mogilefs', e.message
# Helper assertion: calls get_paths("f", *args) on a fresh client that talks
# to a local TCP listener, then checks the parameters of the request sent.
#
# expect - Hash of url-decoded request parameters that must remain once
#          "key" and "domain" have been removed.
# args   - extra arguments forwarded to get_paths after the key.
546 def assert_get_paths_args(expect, *args)
# Port 0 is passed to TCPServer.new; the actual port is read back from
# sock.addr[1] for the client's host list.
547 sock = TCPServer.new("127.0.0.1", 0)
548 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
549 c = MogileFS::MogileFS.new(nargs)
# Canned tracker reply listing two paths (the accept/read code around it is
# not visible in this excerpt).
555 a.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
558 paths_expect = %w(http://0/a http://0/b)
559 assert_equal paths_expect, c.get_paths("f", *args)
# Exactly one request line must have been captured; its second
# whitespace-separated field is decoded into a parameter hash.
561 assert_equal 1, received.size
562 tmp = c.backend.url_decode(received[0].split(/\s+/)[1])
563 assert_equal "f", tmp.delete("key")
564 assert_equal "foo", tmp.delete("domain")
565 assert_equal expect, tmp
# Exercises the argument forms accepted by get_paths through
# assert_get_paths_args: no extra arguments, positional noverify/zone values,
# and an options hash with :noverify, :zone and :pathcount.
# In each call the first argument is the expected request parameter hash.
571 def test_get_paths_args
572 assert_get_paths_args({"noverify"=>"1", "zone"=>""})
573 assert_get_paths_args({"noverify"=>"0", "zone"=>""}, false)
574 assert_get_paths_args({"noverify"=>"0", "zone"=>""}, :noverify=>false)
575 assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"}, true, "alt")
576 assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"},
577 {:noverify => true, :zone => "alt"})
578 assert_get_paths_args({"noverify"=>"1", "zone"=>"alt","pathcount"=>"666"},
579 {:noverify => true, :zone => "alt", :pathcount=>666})
# get_paths against two local listeners: the test expects the request to be
# captured twice with identical content and the call to return both paths.
# Judging by the test name, the first connection presumably hits EOF and the
# command is retried; the code that closes it is not visible -- TODO confirm.
582 def test_idempotent_command_eof
584 a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
585 hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
586 args = { :hosts => hosts, :domain => "foo" }
587 c = MogileFS::MogileFS.new(args)
# Wait for whichever listener becomes readable first.
590 r = IO.select([a, b])
595 r = IO.select([a, b])
598 x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
601 expect = %w(http://0/a http://0/b)
602 assert_equal expect, c.get_paths("f")
# Two captured requests, byte-for-byte equal.
604 assert_equal 2, received.size
605 assert_equal received[0], received[1]
# Uses a single local listener and a client built with a :timeout option.
# A call is expected to raise MogileFS::UnreadableSocketError, after which
# get_paths("b") must still return the path for "b".
# The server-side code (threads, delays) is mostly not visible in this
# excerpt; the test name suggests a slow response is simulated -- TODO confirm.
608 def test_idempotent_command_slow
610 a = TCPServer.new(ip, 0)
611 hosts = [ "#{ip}:#{a.addr[1]}" ]
614 args = { :hosts => hosts, :domain => "foo", :timeout => timeout }
615 c = MogileFS::MogileFS.new(args)
# The key is extracted from the request line and echoed back in the path.
621 %r{key=(\w+)} =~ line
624 x.write("OK paths=1&path1=http://0/#{$1}\r\n")
626 # EPIPE may or may not get raised due to timing issue,
627 # we don't care either way
629 flunk("#{e.message} (#{e.class})")
637 %r{key=(\w+)} =~ line
639 x.write("OK paths=1&path1=http://0/#{$1}\r\n")
641 flunk("#{e.message} (#{e.class})")
646 assert_raises(MogileFS::UnreadableSocketError) do
# q.pop is expected to yield :continue_test before the follow-up request;
# the assertion message states this avoids a race during the test.
649 assert_equal :continue_test, q.pop, "avoid race during test"
650 expect2 = %w(http://0/b)
651 assert_equal expect2, c.get_paths("b")
# get_paths against two local listeners where the first reply is written
# without its trailing "\r\n" and the second reply is complete. The call must
# return both paths, and the request must have been captured twice with
# identical content.
656 def test_idempotent_command_response_truncated
658 a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
659 hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
660 args = { :hosts => hosts, :domain => "foo" }
661 c = MogileFS::MogileFS.new(args)
664 r = IO.select([a, b])
# Truncated reply: cut off mid-value, no line terminator.
667 x.write("OK paths=2&path1=http://0/a&path2=http://0/")
670 r = IO.select([a, b])
# Complete reply.
673 x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
676 expect = %w(http://0/a http://0/b)
677 assert_equal expect, c.get_paths("f")
679 assert_equal 2, received.size
680 assert_equal received[0], received[1]
# rename against two local listeners: the call must raise EOFError and only
# one request may have been captured. Judging by the test name, this presumably
# means a non-idempotent command is not retried after EOF -- TODO confirm.
683 def test_non_idempotent_command_eof
685 a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
686 hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
687 args = { :hosts => hosts, :domain => "foo" }
688 c = MogileFS::MogileFS.new(args)
691 r = IO.select([a, b])
696 assert_raises(EOFError) { c.rename("a", "b") }
698 assert_equal 1, received.size
# list_keys_verbose for keys a..e must invoke the callback in the order
# a, b, c, d, e. The server side checks that a "file_info" request arrives
# for each key in order; the replies are written inside a loop over a
# shuffled copy of the key list.
701 def test_list_keys_verbose_ordering # implementation detail
703 sock = TCPServer.new("127.0.0.1", 0)
704 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
705 c = MogileFS::MogileFS.new(nargs)
# Each request line is split into the command and its url-encoded arguments.
708 %w(a b c d e).each do |key|
710 cmd, args = line.split(/\s+/, 2)
711 args = c.backend.url_decode(args.strip)
712 assert_equal "file_info", cmd
713 assert_equal key, args["key"]
715 out = { "length" => 3, "devcount" => 6 }
716 %w(a b c d e).shuffle.each do |key|
718 a.write "OK #{c.backend.url_encode(out)}\r\n"
# The callback records every (key, length, devcount) triple it is given.
723 blk = lambda do |key, length, devcount|
724 received << [ key, length, devcount ]
726 c.list_keys_verbose(%w(a b c d e), blk)
# Only the key order is compared.
728 received.map! { |(key,_,_)| key }
729 assert_equal %w(a b c d e), received
# list_keys_verbose must still deliver keys a..e in order when the server
# closes the connection part-way through the replies: the server answers
# inside a loop over d and e, closes the socket, accepts a new connection and
# then expects file_info requests for a, b and c again.
734 def test_list_keys_verbose_retry_eof # implementation detail
736 sock = TCPServer.new("127.0.0.1", 0)
737 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
738 c = MogileFS::MogileFS.new(nargs)
# First connection: a file_info request is expected for every key, in order.
741 %w(a b c d e).each do |key|
743 cmd, args = line.split(/\s+/, 2)
744 args = c.backend.url_decode(args.strip)
745 assert_equal "file_info", cmd
746 assert_equal key, args["key"]
748 out = { "length" => 3, "devcount" => 6 }
749 %w(d e).each do |key|
751 a.write "OK #{c.backend.url_encode(out)}\r\n"
753 a.close # trigger EOF
754 a = sock.accept # client will retry
# Second connection: requests for a, b and c are checked and answered.
755 %w(a b c).each do |key|
757 cmd, args = line.split(/\s+/, 2)
758 args = c.backend.url_decode(args.strip)
759 assert_equal "file_info", cmd
760 assert_equal key, args["key"]
762 a.write "OK #{c.backend.url_encode(out)}\r\n"
767 blk = lambda do |key, length, devcount|
768 received << [ key, length, devcount ]
770 c.list_keys_verbose(%w(a b c d e), blk)
# Only the key order is compared.
772 received.map! { |(key,_,_)| key }
773 assert_equal %w(a b c d e), received
# list_keys_verbose must still deliver keys a..e in order when a reply is
# truncated: the server writes one complete reply, then one without the
# trailing "\r\n", closes the socket, accepts a new connection and expects
# file_info requests for b, c, d and e again.
778 def test_list_keys_verbose_retry_truncated # implementation detail
780 sock = TCPServer.new("127.0.0.1", 0)
781 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
782 c = MogileFS::MogileFS.new(nargs)
# First connection: a file_info request is expected for every key, in order.
785 %w(a b c d e).each do |key|
787 cmd, args = line.split(/\s+/, 2)
788 args = c.backend.url_decode(args.strip)
789 assert_equal "file_info", cmd
790 assert_equal key, args["key"]
792 out = { "length" => 3, "devcount" => 6 }
# One well-formed reply, then one that lacks its line terminator.
794 a.write "OK #{c.backend.url_encode(out)}\r\n"
796 a.write "OK #{c.backend.url_encode(out)}"
797 a.close # trigger EOF
799 a = sock.accept # client will retry
# Second connection: requests for b, c, d and e are checked and answered.
800 %w(b c d e).each do |key|
802 cmd, args = line.split(/\s+/, 2)
803 args = c.backend.url_decode(args.strip)
804 assert_equal "file_info", cmd
805 assert_equal key, args["key"]
807 a.write "OK #{c.backend.url_encode(out)}\r\n"
812 blk = lambda do |key, length, devcount|
813 received << [ key, length, devcount ]
815 c.list_keys_verbose(%w(a b c d e), blk)
# Only the key order is compared.
817 received.map! { |(key,_,_)| key }
818 assert_equal %w(a b c d e), received
825 assert_nothing_raised do
826 assert_equal({}, @client.sleep(2))
832 # tested with 1000, though it takes a while
834 ENV['NR_CHUNKS'] ? ENV['NR_CHUNKS'].to_i : 10