1 # -*- encoding: binary -*-
7 class TestMogileFS__MogileFS < TestMogileFS
9 @klass = MogileFS::MogileFS
14 assert_equal 'test', @client.domain
16 assert_raises ArgumentError do
17 MogileFS::MogileFS.new :hosts => ['kaa:6001']
21 def test_get_file_data_http
22 tmp = Tempfile.new('accept')
23 accept = File.open(tmp.path, "ab")
24 svr = Proc.new do |serv, port|
25 client, _ = serv.accept
27 readed = client.recv(4096, 0)
29 %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
31 client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
34 t1 = TempServer.new(svr)
35 t2 = TempServer.new(svr)
36 path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
37 path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
39 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
41 assert_equal 'data!', @client.get_file_data('key')
42 assert_equal 1, accept.stat.size
44 TempServer.destroy_all!
47 def test_get_file_data_http_not_found_failover
48 tmp = Tempfile.new('accept')
49 accept = File.open(tmp.path, 'ab')
50 svr1 = Proc.new do |serv, port|
51 client, _ = serv.accept
53 readed = client.recv(4096, 0)
55 %r{\AGET /dev1/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
57 client.send("HTTP/1.0 404 Not Found\r\n\r\ndata!", 0)
61 svr2 = Proc.new do |serv, port|
62 client, _ = serv.accept
64 readed = client.recv(4096, 0)
66 %r{\AGET /dev2/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
68 client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
72 t1 = TempServer.new(svr1)
73 t2 = TempServer.new(svr2)
74 path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
75 path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
76 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
78 assert_equal 'data!', @client.get_file_data('key')
79 assert_equal 2, accept.stat.size
81 TempServer.destroy_all!
84 def test_get_file_data_http_block
85 tmpfp = Tempfile.new('test_mogilefs.open_data')
87 chunk_size = 1024 * 1024
88 expect_size = nr * chunk_size
89 header = "HTTP/1.0 200 OK\r\n" \
90 "Content-Length: #{expect_size}\r\n\r\n"
91 assert_equal header.size, tmpfp.syswrite(header)
92 nr.times { assert_equal chunk_size, tmpfp.syswrite(' ' * chunk_size) }
93 assert_equal expect_size + header.size, File.size(tmpfp.path)
96 accept = Tempfile.new('accept')
97 svr = Proc.new do |serv, port|
98 client, _ = serv.accept
101 readed = client.recv(4096, 0)
103 %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
104 MogileFS.io.copy_stream(tmpfp, client)
108 t1 = TempServer.new(svr)
109 t2 = TempServer.new(svr)
110 path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
111 path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
113 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
115 data = Tempfile.new('test_mogilefs.dest_data')
117 @client.get_file_data('key') do |fp|
121 fp.sysread(16384, buf)
124 assert_equal read_nr, data.syswrite(buf), "partial write"
132 assert_equal expect_size, nr, "size mismatch"
133 assert_equal 1, accept.stat.size
137 path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
138 path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
140 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
142 expected = [ path1, path2 ]
144 assert_equal expected, @client.get_paths('key').sort
148 path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
149 path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
151 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
153 expected = [ URI.parse(path1), URI.parse(path2) ]
155 assert_equal expected, @client.get_uris('key')
159 def test_get_paths_unknown_key
160 @backend.get_paths = ['unknown_key', '']
162 assert_raises MogileFS::Backend::UnknownKeyError do
163 assert_equal nil, @client.get_paths('key')
167 def test_delete_existing
168 @backend.delete = { }
169 assert_nothing_raised do
170 @client.delete 'no_such_key'
174 def test_delete_nonexisting
175 @backend.delete = 'unknown_key', ''
176 assert_raises MogileFS::Backend::UnknownKeyError do
177 @client.delete('no_such_key')
181 def test_delete_readonly
182 @client.readonly = true
183 assert_raises MogileFS::ReadOnlyError do
184 @client.delete 'no_such_key'
189 @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_2',
190 'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
191 @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_4',
192 'key_1' => 'new_key_3', 'key_2' => 'new_key_4' }
193 @backend.list_keys = { 'key_count' => 0, 'next_after' => 'new_key_4' }
195 @client.each_key 'new' do |key|
199 assert_equal %w[new_key_1 new_key_2 new_key_3 new_key_4], keys
203 @backend.list_keys = { 'key_count' => '2', 'next_after' => 'new_key_2',
204 'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
206 keys, next_after = @client.list_keys 'new'
207 assert_equal ['new_key_1', 'new_key_2'], keys.sort
208 assert_equal 'new_key_2', next_after
211 def test_new_file_http
212 @client.readonly = true
213 assert_raises MogileFS::ReadOnlyError do
214 @client.new_file 'new_key', 'test'
218 def test_new_file_readonly
219 @client.readonly = true
220 assert_raises MogileFS::ReadOnlyError do
221 @client.new_file 'new_key', 'test'
225 def test_store_file_small_http
226 received = Tempfile.new('received')
227 to_store = Tempfile.new('small')
228 to_store.syswrite('data')
230 t = TempServer.new(Proc.new do |serv, accept|
231 client, _ = serv.accept
232 while buf = client.readpartial(666)
233 received.syswrite(buf)
234 break if buf =~ /data/
236 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
240 @backend.create_open = {
242 'path' => "http://127.0.0.1:#{t.port}/path",
244 nr = @client.store_file 'new_key', 'test', to_store.path
248 a = received.sysread(999999).split(/\r\n/)
249 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
250 assert_equal("data", a[-1])
251 assert_equal("", a[-2])
252 assert a.grep(%r{\AContent-Length: 4\z})[0]
254 TempServer.destroy_all!
257 def test_store_content_http
258 received = Tempfile.new('received')
260 t = TempServer.new(Proc.new do |serv, accept|
261 client, _ = serv.accept
264 while seen !~ /\r\n\r\ndata/
265 buf = client.readpartial(4096)
267 received.syswrite(buf)
269 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
273 @backend.create_open = {
275 'path' => "http://127.0.0.1:#{t.port}/path",
278 nr = @client.store_content 'new_key', 'test', 'data'
283 a = received.sysread(999999).split(/\r\n/)
284 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
285 assert_equal("data", a[-1])
286 assert_equal("", a[-2])
287 assert a.grep(%r{\AContent-Length: 4\z})[0]
289 TempServer.destroy_all!
293 def test_store_content_with_writer_callback
294 received = Tempfile.new('received')
295 expected = "PUT /path HTTP/1.0\r\nContent-Length: 40\r\n\r\n"
299 t = TempServer.new(Proc.new do |serv, accept|
300 client, _ = serv.accept
305 buf = client.readpartial(8192) or break
306 break if buf.length == 0
307 assert_equal buf.length, received.syswrite(buf)
310 break if seen =~ /\r\n\r\n(?:data){10}/
312 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
316 @backend.create_open = {
318 'path' => "http://127.0.0.1:#{t.port}/path",
321 cbk = MogileFS::Util::StoreContent.new(40) do |write_callback|
323 write_callback.call("data")
326 assert_equal 40, cbk.length
327 nr = @client.store_content('new_key', 'test', cbk)
331 a = received.sysread(999999).split(/\r\n/)
332 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
333 assert_equal("data" * 10, a[-1])
334 assert_equal("", a[-2])
335 assert a.grep(%r{\AContent-Length: 40\z})[0]
337 TempServer.destroy_all!
340 def test_store_content_multi_dest_failover_path
341 test_store_content_multi_dest_failover(true)
344 def test_store_content_multi_dest_failover(big_io = false)
345 received1 = Tempfile.new('received')
346 received2 = Tempfile.new('received')
348 t1 = TempServer.new(Proc.new do |serv, accept|
349 client, _ = serv.accept
351 while seen !~ /\r\n\r\ndata/
352 buf = client.readpartial(4096)
354 received1.syswrite(buf)
356 client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
360 t2 = TempServer.new(Proc.new do |serv, accept|
361 client, _ = serv.accept
363 while seen !~ /\r\n\r\ndata/
364 buf = client.readpartial(4096)
366 received2.syswrite(buf)
368 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
372 @backend.create_open = {
375 'path_1' => "http://127.0.0.1:#{t1.port}/path",
377 'path_2' => "http://127.0.0.1:#{t2.port}/path",
381 tmp = Tempfile.new('data')
384 nr = @client.store_file('new_key', 'test', tmp.path)
387 nr = @client.store_content 'new_key', 'test', 'data'
392 a = received1.sysread(4096).split(/\r\n/)
393 b = received2.sysread(4096).split(/\r\n/)
394 assert_equal a[0], b[0]
395 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
396 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, b[0])
397 assert_equal("data", a[-1])
398 assert_equal("data", b[-1])
399 assert_equal("", a[-2])
400 assert_equal("", b[-2])
401 assert a.grep(%r{\AContent-Length: 4\z})[0]
402 assert b.grep(%r{\AContent-Length: 4\z})[0]
404 TempServer.destroy_all!
407 def test_store_content_http_fail
408 t = TempServer.new(Proc.new do |serv, accept|
409 client, _ = serv.accept
412 client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
416 @backend.create_open = {
418 'path' => "http://127.0.0.1:#{t.port}/path",
421 assert_raises MogileFS::HTTPFile::NoStorageNodesError do
422 @client.store_content 'new_key', 'test', 'data'
426 def test_store_content_http_empty
427 received = Tempfile.new('received')
428 t = TempServer.new(Proc.new do |serv, accept|
429 client, _ = serv.accept
431 received.syswrite(client.recv(4096, 0))
432 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
436 @backend.create_open = {
438 'path' => "http://127.0.0.1:#{t.port}/path",
441 nr = @client.store_content 'new_key', 'test', ''
444 a = received.sysread(4096).split(/\r\n/)
445 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
446 assert a.grep(%r{\AContent-Length: 0\z})[0]
449 def test_store_content_nfs
450 @backend.create_open = {
455 assert_raises MogileFS::UnsupportedPathError do
456 @client.store_content 'new_key', 'test', 'data'
460 def test_new_file_http_large
461 expect = Tempfile.new('test_mogilefs.expect')
462 to_put = Tempfile.new('test_mogilefs.to_put')
463 received = Tempfile.new('test_mogilefs.received')
466 chunk_size = 1024 * 1024
467 expect_size = nr * chunk_size
469 header = "PUT /path HTTP/1.0\r\n" \
470 "Content-Length: #{expect_size}\r\n\r\n"
471 assert_equal header.size, expect.syswrite(header)
473 assert_equal chunk_size, expect.syswrite(' ' * chunk_size)
474 assert_equal chunk_size, to_put.syswrite(' ' * chunk_size)
476 assert_equal expect_size + header.size, expect.stat.size
477 assert_equal expect_size, to_put.stat.size
479 readed = Tempfile.new('readed')
480 t = TempServer.new(Proc.new do |serv, accept|
481 client, _ = serv.accept
485 buf = client.readpartial(8192) or break
486 break if buf.length == 0
487 assert_equal buf.length, received.syswrite(buf)
489 break if nr >= expect.stat.size
491 readed.syswrite("#{nr}")
492 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
496 @backend.create_open = {
498 'path' => "http://127.0.0.1:#{t.port}/path",
501 orig_size = to_put.size
502 nr = @client.store_file('new_key', 'test', to_put.path)
503 assert nr, nr.inspect
504 assert_equal orig_size, nr
505 assert_equal orig_size, to_put.size
507 assert_equal expect.stat.size, readed.sysread(4096).to_i
509 ENV['PATH'].split(/:/).each do |path|
510 cmp_bin = "#{path}/cmp"
511 File.executable?(cmp_bin) or next
512 # puts "running #{cmp_bin} #{expect.path} #{received.path}"
513 assert( system(cmp_bin, expect.path, received.path) )
518 TempServer.destroy_all!
521 def test_store_content_readonly
522 @client.readonly = true
524 assert_raises MogileFS::ReadOnlyError do
525 @client.store_content 'new_key', 'test', nil
529 def test_store_file_readonly
530 @client.readonly = true
531 assert_raises MogileFS::ReadOnlyError do
532 @client.store_file 'new_key', 'test', nil
536 def test_rename_existing
539 assert_nil @client.rename('from_key', 'to_key')
542 def test_rename_nonexisting
543 @backend.rename = 'unknown_key', ''
545 assert_raises MogileFS::Backend::UnknownKeyError do
546 @client.rename('from_key', 'to_key')
550 def test_rename_no_key
551 @backend.rename = 'no_key', 'no_key'
553 e = assert_raises MogileFS::Backend::NoKeyError do
554 @client.rename 'new_key', 'test'
557 assert_equal 'no_key', e.message
560 def test_rename_readonly
561 @client.readonly = true
563 e = assert_raises MogileFS::ReadOnlyError do
564 @client.rename 'new_key', 'test'
567 assert_equal 'readonly mogilefs', e.message
570 def assert_get_paths_args(expect, *args)
571 sock = TCPServer.new("127.0.0.1", 0)
572 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
573 c = MogileFS::MogileFS.new(nargs)
579 a.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
582 paths_expect = %w(http://0/a http://0/b)
583 assert_equal paths_expect, c.get_paths("f", *args)
585 assert_equal 1, received.size
586 tmp = c.backend.url_decode(received[0].split(/\s+/)[1])
587 assert_equal "f", tmp.delete("key")
588 assert_equal "foo", tmp.delete("domain")
589 assert_equal expect, tmp
595 def test_get_paths_args
596 assert_get_paths_args({"noverify"=>"1", "zone"=>""})
597 assert_get_paths_args({"noverify"=>"0", "zone"=>""}, false)
598 assert_get_paths_args({"noverify"=>"0", "zone"=>""}, :noverify=>false)
599 assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"}, true, "alt")
600 assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"},
601 {:noverify => true, :zone => "alt"})
602 assert_get_paths_args({"noverify"=>"1", "zone"=>"alt","pathcount"=>"666"},
603 {:noverify => true, :zone => "alt", :pathcount=>666})
606 def test_idempotent_command_eof
608 a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
609 hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
610 args = { :hosts => hosts, :domain => "foo" }
611 c = MogileFS::MogileFS.new(args)
614 r = IO.select([a, b])
619 r = IO.select([a, b])
622 x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
625 expect = %w(http://0/a http://0/b)
626 assert_equal expect, c.get_paths("f")
628 assert_equal 2, received.size
629 assert_equal received[0], received[1]
632 def test_idempotent_command_slow
634 a = TCPServer.new(ip, 0)
635 hosts = [ "#{ip}:#{a.addr[1]}" ]
638 args = { :hosts => hosts, :domain => "foo", :timeout => timeout }
639 c = MogileFS::MogileFS.new(args)
646 %r{key=(\w+)} =~ line
648 sleep(secs) # cause the client to timeout:
651 x.write("OK paths=1&path1=http://0/#{$1}\r\n")
653 # EPIPE may or may not get raised due to timing issue,
654 # we don't care either way
656 flunk("#{e.message} (#{e.class})")
660 # client should start a new connection here
664 %r{key=(\w+)} =~ line
666 y.write("OK paths=1&path1=http://0/#{$1}\r\n")
668 flunk("#{e.message} (#{e.class})")
671 # the client should've killed the old connection:
672 assert_raises(Errno::EPIPE) do
673 loop { x.write("OK paths=1&path1=http://0/#{$1}\r\n") }
676 close_later # main thread closes
678 assert_raises(MogileFS::UnreadableSocketError) do
681 assert_equal :continue_test, q.pop, "avoid race during test"
682 expect2 = %w(http://0/b)
683 assert_equal expect2, c.get_paths("b")
685 close_later = th.value
686 close_later.each { |io| assert_nil io.close }
689 def test_idempotent_command_response_truncated
691 a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
692 hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
693 args = { :hosts => hosts, :domain => "foo" }
694 c = MogileFS::MogileFS.new(args)
697 r = IO.select([a, b])
700 x.write("OK paths=2&path1=http://0/a&path2=http://0/")
703 r = IO.select([a, b])
706 x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
709 expect = %w(http://0/a http://0/b)
710 assert_equal expect, c.get_paths("f")
712 assert_equal 2, received.size
713 assert_equal received[0], received[1]
716 def test_non_idempotent_command_eof
718 a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
719 hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
720 args = { :hosts => hosts, :domain => "foo" }
721 c = MogileFS::MogileFS.new(args)
724 r = IO.select([a, b])
729 assert_raises(EOFError) { c.rename("a", "b") }
731 assert_equal 1, received.size
734 def test_list_keys_verbose_ordering # implementation detail
736 sock = TCPServer.new("127.0.0.1", 0)
737 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
738 c = MogileFS::MogileFS.new(nargs)
741 %w(a b c d e).each do |key|
743 cmd, args = line.split(/\s+/, 2)
744 args = c.backend.url_decode(args.strip)
745 assert_equal "file_info", cmd
746 assert_equal key, args["key"]
748 out = { "length" => 3, "devcount" => 6 }
749 %w(a b c d e).shuffle.each do |key|
751 a.write "OK #{c.backend.url_encode(out)}\r\n"
756 blk = lambda do |key, length, devcount|
757 received << [ key, length, devcount ]
759 c.list_keys_verbose(%w(a b c d e), blk)
761 received.map! { |(key,_,_)| key }
762 assert_equal %w(a b c d e), received
767 def test_list_keys_verbose_retry_eof # implementation detail
769 sock = TCPServer.new("127.0.0.1", 0)
770 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
771 c = MogileFS::MogileFS.new(nargs)
774 %w(a b c d e).each do |key|
776 cmd, args = line.split(/\s+/, 2)
777 args = c.backend.url_decode(args.strip)
778 assert_equal "file_info", cmd
779 assert_equal key, args["key"]
781 out = { "length" => 3, "devcount" => 6 }
782 %w(d e).each do |key|
784 a.write "OK #{c.backend.url_encode(out)}\r\n"
786 a.close # trigger EOF
787 a = sock.accept # client will retry
788 %w(a b c).each do |key|
790 cmd, args = line.split(/\s+/, 2)
791 args = c.backend.url_decode(args.strip)
792 assert_equal "file_info", cmd
793 assert_equal key, args["key"]
795 a.write "OK #{c.backend.url_encode(out)}\r\n"
800 blk = lambda do |key, length, devcount|
801 received << [ key, length, devcount ]
803 c.list_keys_verbose(%w(a b c d e), blk)
805 received.map! { |(key,_,_)| key }
806 assert_equal %w(a b c d e), received
811 def test_list_keys_verbose_retry_truncated # implementation detail
813 sock = TCPServer.new("127.0.0.1", 0)
814 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
815 c = MogileFS::MogileFS.new(nargs)
818 %w(a b c d e).each do |key|
820 cmd, args = line.split(/\s+/, 2)
821 args = c.backend.url_decode(args.strip)
822 assert_equal "file_info", cmd
823 assert_equal key, args["key"]
825 out = { "length" => 3, "devcount" => 6 }
827 a.write "OK #{c.backend.url_encode(out)}\r\n"
829 a.write "OK #{c.backend.url_encode(out)}"
830 a.close # trigger EOF
832 a = sock.accept # client will retry
833 %w(b c d e).each do |key|
835 cmd, args = line.split(/\s+/, 2)
836 args = c.backend.url_decode(args.strip)
837 assert_equal "file_info", cmd
838 assert_equal key, args["key"]
840 a.write "OK #{c.backend.url_encode(out)}\r\n"
845 blk = lambda do |key, length, devcount|
846 received << [ key, length, devcount ]
848 c.list_keys_verbose(%w(a b c d e), blk)
850 received.map! { |(key,_,_)| key }
851 assert_equal %w(a b c d e), received
858 assert_nothing_raised do
859 assert_equal({}, @client.sleep(2))
865 # tested with 1000, though it takes a while
867 ENV['NR_CHUNKS'] ? ENV['NR_CHUNKS'].to_i : 10