1 # -*- encoding: binary -*-
# Test case for the MogileFS::MogileFS client class; subclass of TestMogileFS (defined elsewhere).
7 class TestMogileFS__MogileFS < TestMogileFS
# Records the class under test.
# NOTE(review): the enclosing method header is not visible in this excerpt; presumably this
# runs in setup and the parent class uses @klass to build @client -- confirm.
9 @klass = MogileFS::MogileFS
# Checks that the fixture client reports domain 'test' and that building a client
# from :hosts alone raises ArgumentError.
# NOTE(review): the `def` line of this test is not visible in this excerpt.
14 assert_equal 'test', @client.domain
# No :domain is passed below; presumably that omission is what triggers ArgumentError -- confirm.
16 assert_raises ArgumentError do
17 MogileFS::MogileFS.new :hosts => ['kaa:6001']
# Fetches 'key' over HTTP from local TempServers and expects the body 'data!'.
21 def test_get_file_data_http
22 tmp = Tempfile.new('accept')
# Append-mode handle on the same temp file; its size is asserted at the end of the test.
23 accept = File.open(tmp.path, "ab")
# One handler shared by both TempServers below: accept, read the request, reply 200 with 'data!'.
24 svr = Proc.new do |serv, port|
25 client, _ = serv.accept
# The request is matched against a bare GET for /dev1 or /dev2 with no headers.
# NOTE(review): the '.' in "HTTP/1.[01]" is unescaped and therefore matches any character;
# "HTTP/1\.[01]" was probably intended.
27 readed = client.recv(4096, 0)
29 %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
31 client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
34 t1 = TempServer.new(svr)
35 t2 = TempServer.new(svr)
36 path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
37 path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
# The backend's get_paths answer lists both server URLs.
39 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
41 assert_equal 'data!', @client.get_file_data('key')
# NOTE(review): presumably the handler appends one byte per served request (that line is not
# visible in this excerpt), so a size of 1 would mean only one server was contacted -- confirm.
42 assert_equal 1, accept.stat.size
44 TempServer.destroy_all!
# Verifies failover on read: the first server answers 404, the second answers 200,
# and get_file_data is still expected to return 'data!'.
47 def test_get_file_data_http_not_found_failover
48 tmp = Tempfile.new('accept')
# Append-mode handle on the same temp file; its size is asserted at the end of the test.
49 accept = File.open(tmp.path, 'ab')
# First server: expects a GET for the /dev1 path and replies 404 Not Found.
50 svr1 = Proc.new do |serv, port|
51 client, _ = serv.accept
53 readed = client.recv(4096, 0)
55 %r{\AGET /dev1/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
57 client.send("HTTP/1.0 404 Not Found\r\n\r\ndata!", 0)
# Second server: expects a GET for the /dev2 path and replies 200 with the 5-byte body.
61 svr2 = Proc.new do |serv, port|
62 client, _ = serv.accept
64 readed = client.recv(4096, 0)
66 %r{\AGET /dev2/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
68 client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
72 t1 = TempServer.new(svr1)
73 t2 = TempServer.new(svr2)
74 path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
75 path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
76 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
78 assert_equal 'data!', @client.get_file_data('key')
# NOTE(review): presumably each handler appends one byte per request (lines not visible in this
# excerpt), so a size of 2 would mean both servers were contacted -- confirm.
79 assert_equal 2, accept.stat.size
81 TempServer.destroy_all!
# Exercises the block form of get_file_data against a large response that is
# prepared in a temp file and streamed by the test server.
84 def test_get_file_data_http_block
85 tmpfp = Tempfile.new('test_mogilefs.open_data')
# NOTE(review): `nr` is assigned on a line not visible in this excerpt; it is used below as
# the number of 1 MiB chunks -- confirm where it comes from.
87 chunk_size = 1024 * 1024
88 expect_size = nr * chunk_size
# Build the complete HTTP response (header followed by expect_size bytes of spaces) in tmpfp.
89 header = "HTTP/1.0 200 OK\r\n" \
90 "Content-Length: #{expect_size}\r\n\r\n"
91 assert_equal header.size, tmpfp.syswrite(header)
92 nr.times { assert_equal chunk_size, tmpfp.syswrite(' ' * chunk_size) }
93 assert_equal expect_size + header.size, File.size(tmpfp.path)
96 accept = Tempfile.new('accept')
# Handler: accept, read the GET request, then copy the prepared response file to the socket.
97 svr = Proc.new do |serv, port|
98 client, _ = serv.accept
101 readed = client.recv(4096, 0)
103 %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
104 MogileFS.io.copy_stream(tmpfp, client)
108 t1 = TempServer.new(svr)
109 t2 = TempServer.new(svr)
110 path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
111 path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
113 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
115 data = Tempfile.new('test_mogilefs.dest_data')
# The block receives an IO-like object; the test reads it in 16 KiB pieces and writes each to `data`.
117 @client.get_file_data('key') do |fp|
121 fp.sysread(16384, buf)
124 assert_equal read_nr, data.syswrite(buf), "partial write"
# NOTE(review): `nr` is compared against the byte total here, so it is presumably reused as a
# byte counter on lines not visible in this excerpt -- confirm.
132 assert_equal expect_size, nr, "size mismatch"
133 assert_equal 1, accept.stat.size
# Checks that get_paths('key') returns both URLs supplied in the backend's get_paths answer.
# NOTE(review): the `def` line of this test is not visible in this excerpt.
137 path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
138 path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
140 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
142 expected = [ path1, path2 ]
# The result is sorted before comparison, so the order returned by the client is not asserted.
144 assert_equal expected, @client.get_paths('key').sort
# Checks that get_uris('key') returns the backend's paths as parsed URI objects, in order.
# NOTE(review): the `def` line of this test is not visible in this excerpt.
148 path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
149 path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
151 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
153 expected = [ URI.parse(path1), URI.parse(path2) ]
155 assert_equal expected, @client.get_uris('key')
# Expects get_paths to raise UnknownKeyError when the backend answers 'unknown_key'.
159 def test_get_paths_unknown_key
160 @backend.get_paths = ['unknown_key', '']
162 assert_raises MogileFS::Backend::UnknownKeyError do
# NOTE(review): the assert_equal wrapper never runs when get_paths raises as expected, and
# `assert_equal nil, ...` is deprecated in recent Minitest in favour of assert_nil.
# A bare `@client.get_paths('key')` would express the intent.
163 assert_equal nil, @client.get_paths('key')
# Expects delete to complete without raising when the backend's delete answer is an empty hash.
167 def test_delete_existing
168 @backend.delete = { }
# NOTE(review): assert_nothing_raised is a Test::Unit assertion that Minitest does not provide;
# confirm which framework the suite targets.
169 assert_nothing_raised do
# NOTE(review): the key is named 'no_such_key' although this test covers the "existing" case;
# the name looks copied from the non-existing test.
170 @client.delete 'no_such_key'
# Expects delete to raise UnknownKeyError when the backend answers 'unknown_key'.
174 def test_delete_nonexisting
# The comma-separated right-hand side assigns the two-element array ['unknown_key', ''].
175 @backend.delete = 'unknown_key', ''
176 assert_raises MogileFS::Backend::UnknownKeyError do
177 @client.delete('no_such_key')
# Expects delete to raise ReadOnlyError once the client is switched to readonly.
181 def test_delete_readonly
182 @client.readonly = true
183 assert_raises MogileFS::ReadOnlyError do
184 @client.delete 'no_such_key'
# Checks that each_key('new') yields the four keys spread over two list_keys answers.
# NOTE(review): the `def` line of this test is not visible in this excerpt.
# Three list_keys answers are assigned in a row: two pages of two keys, then an empty page.
# NOTE(review): presumably the test backend queues successive assignments -- confirm.
189 @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_2',
190 'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
191 @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_4',
192 'key_1' => 'new_key_3', 'key_2' => 'new_key_4' }
193 @backend.list_keys = { 'key_count' => 0, 'next_after' => 'new_key_4' }
# NOTE(review): `keys` is initialised and filled on lines not visible in this excerpt.
195 @client.each_key 'new' do |key|
199 assert_equal %w[new_key_1 new_key_2 new_key_3 new_key_4], keys
# Checks that list_keys('new') returns the key list together with the next_after marker.
# NOTE(review): the `def` line of this test is not visible in this excerpt.
# Here key_count is the string '2' rather than an Integer.
203 @backend.list_keys = { 'key_count' => '2', 'next_after' => 'new_key_2',
204 'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
206 keys, next_after = @client.list_keys 'new'
# Keys are sorted before comparison, so the returned order is not asserted.
207 assert_equal ['new_key_1', 'new_key_2'], keys.sort
208 assert_equal 'new_key_2', next_after
# Expects new_file to raise ReadOnlyError on a readonly client.
# NOTE(review): the visible body is identical to test_new_file_readonly and exercises no HTTP
# path despite the name; either this test is redundant or the HTTP case was never written.
211 def test_new_file_http
212 @client.readonly = true
213 assert_raises MogileFS::ReadOnlyError do
214 @client.new_file 'new_key', 'test'
# Expects new_file to raise ReadOnlyError once the client is switched to readonly.
218 def test_new_file_readonly
219 @client.readonly = true
220 assert_raises MogileFS::ReadOnlyError do
221 @client.new_file 'new_key', 'test'
# Stores a 4-byte file over HTTP and checks the exact PUT request the server received.
225 def test_store_file_small_http
226 received = Tempfile.new('received')
227 to_store = Tempfile.new('small')
228 to_store.syswrite('data')
# Full request expected on the wire: request line, Content-Length header, then the body.
230 expected = "PUT /path HTTP/1.0\r\nContent-Length: 4\r\n\r\ndata"
# Handler: read exactly as many bytes as the expected request, record them, reply 200.
231 t = TempServer.new(Proc.new do |serv, accept|
232 client, _ = serv.accept
234 received.syswrite(client.read(expected.bytesize))
235 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
# The backend's create_open answer points the client at the local server (other entries of
# this hash are not visible in this excerpt).
239 @backend.create_open = {
241 'path' => "http://127.0.0.1:#{t.port}/path",
243 nr = @client.store_file 'new_key', 'test', to_store.path
246 assert_equal expected, received.sysread(4096)
248 TempServer.destroy_all!
# Stores the string 'data' over HTTP and checks the exact PUT request the server received.
251 def test_store_content_http
252 received = Tempfile.new('received')
253 expected = "PUT /path HTTP/1.0\r\nContent-Length: 4\r\n\r\ndata"
# Handler: keep reading until the header terminator followed by 'data' has been seen, then reply 200.
# NOTE(review): `seen` is initialised and appended to on lines not visible in this excerpt.
255 t = TempServer.new(Proc.new do |serv, accept|
256 client, _ = serv.accept
259 while seen !~ /\r\n\r\ndata/
260 buf = client.readpartial(4096)
262 received.syswrite(buf)
264 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
268 @backend.create_open = {
270 'path' => "http://127.0.0.1:#{t.port}/path",
273 nr = @client.store_content 'new_key', 'test', 'data'
278 assert_equal expected, received.sysread(4096)
280 TempServer.destroy_all!
# Stores content produced by a MogileFS::Util::StoreContent writer callback with a
# declared length of 40 bytes.
284 def test_store_content_with_writer_callback
285 received = Tempfile.new('received')
# Header only. NOTE(review): presumably the body is appended to `expected` on lines not visible
# in this excerpt -- confirm.
286 expected = "PUT /path HTTP/1.0\r\nContent-Length: 40\r\n\r\n"
# Handler: record incoming data until at least expected.size bytes arrived, then reply 200.
# NOTE(review): the loop opener and the `nr` byte counter are on lines not visible here.
290 t = TempServer.new(Proc.new do |serv, accept|
291 client, _ = serv.accept
295 buf = client.readpartial(8192) or break
296 break if buf.length == 0
297 assert_equal buf.length, received.syswrite(buf)
299 break if nr >= expected.size
301 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
305 @backend.create_open = {
307 'path' => "http://127.0.0.1:#{t.port}/path",
# The block receives a write callback and feeds it "data".
# NOTE(review): the surrounding repetition (presumably 10 times, to reach 40 bytes) is not
# visible in this excerpt.
310 cbk = MogileFS::Util::StoreContent.new(40) do |write_callback|
312 write_callback.call("data")
315 assert_equal 40, cbk.length
316 nr = @client.store_content('new_key', 'test', cbk)
320 assert_equal expected, received.sysread(4096)
322 TempServer.destroy_all!
# Re-runs the multi-destination failover scenario with big_io = true.
325 def test_store_content_multi_dest_failover_path
326 test_store_content_multi_dest_failover(true)
# Verifies failover on write: the first destination answers 500, the second answers 200,
# and both are expected to have received the identical PUT request.
# big_io:: optional flag, false by default. NOTE(review): presumably selects store_file (true)
#          versus store_content (false); the branching lines are not visible in this excerpt.
# NOTE(review): the name starts with test_ and the parameter is optional, so the test runner
# also invokes this method directly with big_io = false.
329 def test_store_content_multi_dest_failover(big_io = false)
330 received1 = Tempfile.new('received')
331 received2 = Tempfile.new('received')
332 expected = "PUT /path HTTP/1.0\r\nContent-Length: 4\r\n\r\ndata"
# First destination: records the request, then fails with 500.
334 t1 = TempServer.new(Proc.new do |serv, accept|
335 client, _ = serv.accept
337 while seen !~ /\r\n\r\ndata/
338 buf = client.readpartial(4096)
340 received1.syswrite(buf)
342 client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
# Second destination: records the request, then succeeds with 200.
346 t2 = TempServer.new(Proc.new do |serv, accept|
347 client, _ = serv.accept
349 while seen !~ /\r\n\r\ndata/
350 buf = client.readpartial(4096)
352 received2.syswrite(buf)
354 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
# The create_open answer offers both destinations as path_1 and path_2.
358 @backend.create_open = {
361 'path_1' => "http://127.0.0.1:#{t1.port}/path",
363 'path_2' => "http://127.0.0.1:#{t2.port}/path",
367 tmp = Tempfile.new('data')
370 nr = @client.store_file('new_key', 'test', tmp.path)
373 nr = @client.store_content 'new_key', 'test', 'data'
378 assert_equal expected, received1.sysread(4096)
379 assert_equal expected, received2.sysread(4096)
381 TempServer.destroy_all!
# Expects store_content to raise NoStorageNodesError when the only destination answers 500.
384 def test_store_content_http_fail
385 t = TempServer.new(Proc.new do |serv, accept|
386 client, _ = serv.accept
389 client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
393 @backend.create_open = {
395 'path' => "http://127.0.0.1:#{t.port}/path",
398 assert_raises MogileFS::HTTPFile::NoStorageNodesError do
399 @client.store_content 'new_key', 'test', 'data'
# Stores an empty string and checks that the PUT request carries Content-Length: 0 and no body.
403 def test_store_content_http_empty
404 received = Tempfile.new('received')
405 expected = "PUT /path HTTP/1.0\r\nContent-Length: 0\r\n\r\n"
# Handler: a single recv of up to 4096 bytes is recorded, then 200 is returned.
406 t = TempServer.new(Proc.new do |serv, accept|
407 client, _ = serv.accept
409 received.syswrite(client.recv(4096, 0))
410 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
414 @backend.create_open = {
416 'path' => "http://127.0.0.1:#{t.port}/path",
419 nr = @client.store_content 'new_key', 'test', ''
422 assert_equal expected, received.sysread(4096)
# Expects store_content to raise UnsupportedPathError for the create_open answer set up below.
# NOTE(review): the entries of that hash are not visible in this excerpt; going by the test
# name it presumably holds a non-HTTP (NFS-style) path -- confirm.
425 def test_store_content_nfs
426 @backend.create_open = {
431 assert_raises MogileFS::UnsupportedPathError do
432 @client.store_content 'new_key', 'test', 'data'
# Uploads a large file with store_file and compares what the server received, byte for byte,
# with a pre-built copy of the expected request.
436 def test_new_file_http_large
# expect: the full expected request; to_put: the payload to upload; received: what the server saw.
437 expect = Tempfile.new('test_mogilefs.expect')
438 to_put = Tempfile.new('test_mogilefs.to_put')
439 received = Tempfile.new('test_mogilefs.received')
# NOTE(review): `nr` is assigned on a line not visible in this excerpt; it is used as the
# number of 1 MiB chunks.
442 chunk_size = 1024 * 1024
443 expect_size = nr * chunk_size
445 header = "PUT /path HTTP/1.0\r\n" \
446 "Content-Length: #{expect_size}\r\n\r\n"
447 assert_equal header.size, expect.syswrite(header)
# Each chunk of spaces goes into both files (the enclosing loop line is not visible here).
449 assert_equal chunk_size, expect.syswrite(' ' * chunk_size)
450 assert_equal chunk_size, to_put.syswrite(' ' * chunk_size)
452 assert_equal expect_size + header.size, expect.stat.size
453 assert_equal expect_size, to_put.stat.size
# `readed` carries the server-side byte count back to the test as decimal text.
455 readed = Tempfile.new('readed')
456 t = TempServer.new(Proc.new do |serv, accept|
457 client, _ = serv.accept
461 buf = client.readpartial(8192) or break
462 break if buf.length == 0
463 assert_equal buf.length, received.syswrite(buf)
465 break if nr >= expect.stat.size
467 readed.syswrite("#{nr}")
468 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
472 @backend.create_open = {
474 'path' => "http://127.0.0.1:#{t.port}/path",
# store_file must return the payload size and leave the source file's size unchanged.
477 orig_size = to_put.size
478 nr = @client.store_file('new_key', 'test', to_put.path)
479 assert nr, nr.inspect
480 assert_equal orig_size, nr
481 assert_equal orig_size, to_put.size
483 assert_equal expect.stat.size, readed.sysread(4096).to_i
# Looks for a `cmp` executable on PATH and uses it to compare expected and received bytes.
# NOTE(review): splitting on ':' is Unix-specific; File::PATH_SEPARATOR would be portable.
485 ENV['PATH'].split(/:/).each do |path|
486 cmp_bin = "#{path}/cmp"
487 File.executable?(cmp_bin) or next
488 # puts "running #{cmp_bin} #{expect.path} #{received.path}"
489 assert( system(cmp_bin, expect.path, received.path) )
494 TempServer.destroy_all!
# Expects store_content to raise ReadOnlyError once the client is switched to readonly.
497 def test_store_content_readonly
498 @client.readonly = true
500 assert_raises MogileFS::ReadOnlyError do
501 @client.store_content 'new_key', 'test', nil
# Expects store_file to raise ReadOnlyError once the client is switched to readonly.
505 def test_store_file_readonly
506 @client.readonly = true
507 assert_raises MogileFS::ReadOnlyError do
508 @client.store_file 'new_key', 'test', nil
# Expects rename to return nil on success.
# NOTE(review): the backend setup for this test is on lines not visible in this excerpt.
512 def test_rename_existing
515 assert_nil @client.rename('from_key', 'to_key')
# Expects rename to raise UnknownKeyError when the backend answers 'unknown_key'.
518 def test_rename_nonexisting
# The comma-separated right-hand side assigns the two-element array ['unknown_key', ''].
519 @backend.rename = 'unknown_key', ''
521 assert_raises MogileFS::Backend::UnknownKeyError do
522 @client.rename('from_key', 'to_key')
# Expects rename to raise NoKeyError with message 'no_key' when the backend answers 'no_key'.
526 def test_rename_no_key
# The comma-separated right-hand side assigns the two-element array ['no_key', 'no_key'].
527 @backend.rename = 'no_key', 'no_key'
# assert_raises returns the rescued exception so its message can be checked afterwards.
529 e = assert_raises MogileFS::Backend::NoKeyError do
530 @client.rename 'new_key', 'test'
533 assert_equal 'no_key', e.message
# Expects rename on a readonly client to raise ReadOnlyError with message 'readonly mogilefs'.
536 def test_rename_readonly
537 @client.readonly = true
# assert_raises returns the rescued exception so its message can be checked afterwards.
539 e = assert_raises MogileFS::ReadOnlyError do
540 @client.rename 'new_key', 'test'
543 assert_equal 'readonly mogilefs', e.message
# Helper: calls get_paths("f", *args) on a fresh client connected to a throwaway local TCP
# server, then asserts that the request parameters sent, apart from key and domain, equal `expect`.
# expect:: Hash of parameter name => string value expected in the request
# args::   extra arguments forwarded to get_paths after the key
546 def assert_get_paths_args(expect, *args)
# Port 0 lets the OS choose a free port; the chosen port is read back from sock.addr[1].
547 sock = TCPServer.new("127.0.0.1", 0)
548 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
549 c = MogileFS::MogileFS.new(nargs)
# NOTE(review): `a` (presumably the accepted connection) and `received` (presumably the request
# lines read from it) are set up on lines not visible in this excerpt.
555 a.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
558 paths_expect = %w(http://0/a http://0/b)
559 assert_equal paths_expect, c.get_paths("f", *args)
561 assert_equal 1, received.size
# The second whitespace-separated field of the request line is decoded into a Hash.
562 tmp = c.backend.url_decode(received[0].split(/\s+/)[1])
# key and domain are checked and removed, so only the optional parameters remain for comparison.
563 assert_equal "f", tmp.delete("key")
564 assert_equal "foo", tmp.delete("domain")
565 assert_equal expect, tmp
# Runs get_paths through its accepted argument styles (no extras, positional noverify and zone,
# and an options Hash with :noverify, :zone, :pathcount) and checks the parameters sent each time.
571 def test_get_paths_args
# With no extra arguments the request is expected to carry noverify=1 and an empty zone.
572 assert_get_paths_args({"noverify"=>"1", "zone"=>""})
573 assert_get_paths_args({"noverify"=>"0", "zone"=>""}, false)
574 assert_get_paths_args({"noverify"=>"0", "zone"=>""}, :noverify=>false)
575 assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"}, true, "alt")
576 assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"},
577 {:noverify => true, :zone => "alt"})
578 assert_get_paths_args({"noverify"=>"1", "zone"=>"alt","pathcount"=>"666"},
579 {:noverify => true, :zone => "alt", :pathcount=>666})
# Checks that get_paths is sent a second time, unchanged, and still returns both paths:
# the test asserts two identical requests were received.
# NOTE(review): going by the test name, the first connection is closed without a reply; the
# lines doing that are not visible in this excerpt.
582 def test_idempotent_command_eof
# Two listening sockets on OS-chosen ports act as the client's two tracker hosts.
# NOTE(review): `ip` is assigned on a line not visible in this excerpt.
584 a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
585 hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
586 args = { :hosts => hosts, :domain => "foo" }
587 c = MogileFS::MogileFS.new(args)
# IO.select waits for whichever listener the client connects to.
590 r = IO.select([a, b])
595 r = IO.select([a, b])
598 x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
601 expect = %w(http://0/a http://0/b)
602 assert_equal expect, c.get_paths("f")
604 assert_equal 2, received.size
605 assert_equal received[0], received[1]
# Checks client behaviour when the tracker answers too slowly: a call is expected to raise
# UnreadableSocketError, after which get_paths("b") must succeed on a new connection.
608 def test_idempotent_command_slow
# NOTE(review): `ip`, `timeout`, `secs`, the queue `q` and the server thread `th` are set up on
# lines not visible in this excerpt.
610 a = TCPServer.new(ip, 0)
611 hosts = [ "#{ip}:#{a.addr[1]}" ]
614 args = { :hosts => hosts, :domain => "foo", :timeout => timeout }
615 c = MogileFS::MogileFS.new(args)
# Server side: the requested key is captured into $1 and echoed back in the path of the reply.
623 %r{key=(\w+)} =~ line
625 sleep(secs) # cause the client to timeout:
628 x.write("OK paths=1&path1=http://0/#{$1}\r\n")
630 # EPIPE may or may not get raised due to timing issue,
631 # we don't care either way
633 flunk("#{e.message} (#{e.class})")
637 # client should start a new connection here
641 %r{key=(\w+)} =~ line
643 y.write("OK paths=1&path1=http://0/#{$1}\r\n")
645 flunk("#{e.message} (#{e.class})")
648 # the client should've killed the old connection:
649 assert_raises(Errno::EPIPE) do
650 loop { x.write("OK paths=1&path1=http://0/#{$1}\r\n") }
653 close_later # main thread closes
# Client side: the call made inside this assert_raises is on a line not visible in this excerpt.
655 assert_raises(MogileFS::UnreadableSocketError) do
658 assert_equal :continue_test, q.pop, "avoid race during test"
659 expect2 = %w(http://0/b)
660 assert_equal expect2, c.get_paths("b")
# th.value returns the server block's result: the IO objects to close here.
662 close_later = th.value
663 close_later.each { |io| assert_nil io.close }
# Checks that get_paths is sent again after a response that lacks the terminating "\r\n":
# the test asserts two identical requests were received and the full path list is returned.
666 def test_idempotent_command_response_truncated
# Two listening sockets on OS-chosen ports act as the client's two tracker hosts.
# NOTE(review): `ip` is assigned on a line not visible in this excerpt.
668 a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
669 hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
670 args = { :hosts => hosts, :domain => "foo" }
671 c = MogileFS::MogileFS.new(args)
674 r = IO.select([a, b])
# First reply is cut short: no trailing "b" and no line terminator.
677 x.write("OK paths=2&path1=http://0/a&path2=http://0/")
680 r = IO.select([a, b])
# Second reply is complete.
683 x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
686 expect = %w(http://0/a http://0/b)
687 assert_equal expect, c.get_paths("f")
689 assert_equal 2, received.size
690 assert_equal received[0], received[1]
# Checks that rename surfaces EOFError and is not sent again: exactly one request is
# expected to have been received.
693 def test_non_idempotent_command_eof
# Two listening sockets on OS-chosen ports act as the client's two tracker hosts.
# NOTE(review): `ip` is assigned on a line not visible in this excerpt.
695 a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
696 hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
697 args = { :hosts => hosts, :domain => "foo" }
698 c = MogileFS::MogileFS.new(args)
701 r = IO.select([a, b])
706 assert_raises(EOFError) { c.rename("a", "b") }
708 assert_equal 1, received.size
# Checks that list_keys_verbose hands keys to the callback in request order (a..e) although
# the server loop that writes the replies iterates the keys in shuffled order.
711 def test_list_keys_verbose_ordering # implementation detail
713 sock = TCPServer.new("127.0.0.1", 0)
714 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
715 c = MogileFS::MogileFS.new(nargs)
# Server side: one file_info request per key is expected, in order.
# NOTE(review): `a` and `line` are set on lines not visible in this excerpt.
718 %w(a b c d e).each do |key|
720 cmd, args = line.split(/\s+/, 2)
721 args = c.backend.url_decode(args.strip)
722 assert_equal "file_info", cmd
723 assert_equal key, args["key"]
725 out = { "length" => 3, "devcount" => 6 }
# NOTE(review): presumably each reply is tagged with its key on a line not visible here -- confirm.
726 %w(a b c d e).shuffle.each do |key|
728 a.write "OK #{c.backend.url_encode(out)}\r\n"
# Callback records each (key, length, devcount) triple in arrival order.
733 blk = lambda do |key, length, devcount|
734 received << [ key, length, devcount ]
736 c.list_keys_verbose(%w(a b c d e), blk)
# Only the key column is kept for the ordering assertion.
738 received.map! { |(key,_,_)| key }
739 assert_equal %w(a b c d e), received
# Checks that list_keys_verbose recovers when the connection is closed after replies for only
# d and e: the client is expected to reconnect, re-request a, b and c, and still call back in a..e order.
744 def test_list_keys_verbose_retry_eof # implementation detail
746 sock = TCPServer.new("127.0.0.1", 0)
747 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
748 c = MogileFS::MogileFS.new(nargs)
# First connection: all five file_info requests arrive in order.
# NOTE(review): `a` and `line` are set on lines not visible in this excerpt.
751 %w(a b c d e).each do |key|
753 cmd, args = line.split(/\s+/, 2)
754 args = c.backend.url_decode(args.strip)
755 assert_equal "file_info", cmd
756 assert_equal key, args["key"]
758 out = { "length" => 3, "devcount" => 6 }
# Replies go out for d and e only before the socket is closed.
759 %w(d e).each do |key|
761 a.write "OK #{c.backend.url_encode(out)}\r\n"
763 a.close # trigger EOF
764 a = sock.accept # client will retry
# Second connection: only the unanswered keys are expected to be requested again.
765 %w(a b c).each do |key|
767 cmd, args = line.split(/\s+/, 2)
768 args = c.backend.url_decode(args.strip)
769 assert_equal "file_info", cmd
770 assert_equal key, args["key"]
772 a.write "OK #{c.backend.url_encode(out)}\r\n"
# Callback records each (key, length, devcount) triple in arrival order.
777 blk = lambda do |key, length, devcount|
778 received << [ key, length, devcount ]
780 c.list_keys_verbose(%w(a b c d e), blk)
782 received.map! { |(key,_,_)| key }
783 assert_equal %w(a b c d e), received
# Checks that list_keys_verbose recovers when one reply is cut off before its "\r\n": the
# client is expected to reconnect, re-request b..e, and still call back in a..e order.
788 def test_list_keys_verbose_retry_truncated # implementation detail
790 sock = TCPServer.new("127.0.0.1", 0)
791 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
792 c = MogileFS::MogileFS.new(nargs)
# First connection: all five file_info requests arrive in order.
# NOTE(review): `a` and `line` are set on lines not visible in this excerpt.
795 %w(a b c d e).each do |key|
797 cmd, args = line.split(/\s+/, 2)
798 args = c.backend.url_decode(args.strip)
799 assert_equal "file_info", cmd
800 assert_equal key, args["key"]
802 out = { "length" => 3, "devcount" => 6 }
# One complete reply, then one reply without the line terminator, then the socket is closed.
804 a.write "OK #{c.backend.url_encode(out)}\r\n"
806 a.write "OK #{c.backend.url_encode(out)}"
807 a.close # trigger EOF
809 a = sock.accept # client will retry
# Second connection: the four keys without a complete reply are expected to be requested again.
810 %w(b c d e).each do |key|
812 cmd, args = line.split(/\s+/, 2)
813 args = c.backend.url_decode(args.strip)
814 assert_equal "file_info", cmd
815 assert_equal key, args["key"]
817 a.write "OK #{c.backend.url_encode(out)}\r\n"
# Callback records each (key, length, devcount) triple in arrival order.
822 blk = lambda do |key, length, devcount|
823 received << [ key, length, devcount ]
825 c.list_keys_verbose(%w(a b c d e), blk)
827 received.map! { |(key,_,_)| key }
828 assert_equal %w(a b c d e), received
# Checks that the client's sleep command returns an empty Hash without raising.
# NOTE(review): the `def` line and any backend setup for this test are not visible in this excerpt.
# NOTE(review): assert_nothing_raised is a Test::Unit assertion that Minitest does not provide;
# confirm which framework the suite targets.
835 assert_nothing_raised do
836 assert_equal({}, @client.sleep(2))
# Evaluates to the NR_CHUNKS environment variable as an Integer, or 10 when it is unset.
# NOTE(review): the `def` line is not visible in this excerpt; presumably this is the helper
# that supplies the chunk count (`nr`) for the large-transfer tests -- confirm.
# Note that String#to_i turns a non-numeric NR_CHUNKS value into 0 rather than raising.
842 # tested with 1000, though it takes a while
844 ENV['NR_CHUNKS'] ? ENV['NR_CHUNKS'].to_i : 10