1 # -*- encoding: binary -*-
7 class TestMogileFS__MogileFS < TestMogileFS
9 @klass = MogileFS::MogileFS
13 def read_headers(client)
15 while line = client.gets
17 return headers if line == "\r\n"
22 assert_equal 'test', @client.domain
24 assert_raises ArgumentError do
25 MogileFS::MogileFS.new :hosts => ['kaa:6001']
29 def test_get_file_data_http
30 tmp = Tempfile.new('accept')
31 accept = File.open(tmp.path, "ab")
32 svr = Proc.new do |serv, port|
33 client, _ = serv.accept
35 readed = read_headers(client)
37 %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n},
40 client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
43 t1 = TempServer.new(svr)
44 t2 = TempServer.new(svr)
45 path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
46 path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
48 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
50 assert_equal 'data!', @client.get_file_data('key')
51 assert_equal 1, accept.stat.size
53 TempServer.destroy_all!
56 def test_get_file_data_http_not_found_failover
57 tmp = Tempfile.new('accept')
58 accept = File.open(tmp.path, 'ab')
59 svr1 = Proc.new do |serv, port|
60 client, _ = serv.accept
62 readed = read_headers(client)
64 %r{\AGET /dev1/0/000/000/0000000062\.fid HTTP/1.[01]\r\n},
67 client.send("HTTP/1.0 404 Not Found\r\n\r\ndata!", 0)
71 svr2 = Proc.new do |serv, port|
72 client, _ = serv.accept
74 readed = read_headers(client)
76 %r{\AGET /dev2/0/000/000/0000000062\.fid HTTP/1.[01].*\r\n},
79 client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
83 t1 = TempServer.new(svr1)
84 t2 = TempServer.new(svr2)
85 path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
86 path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
87 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
89 assert_equal 'data!', @client.get_file_data('key')
90 assert_equal 2, accept.stat.size
92 TempServer.destroy_all!
95 def test_get_file_data_http_block
96 tmpfp = Tempfile.new('test_mogilefs.open_data')
98 chunk_size = 1024 * 1024
99 expect_size = nr * chunk_size
100 header = "HTTP/1.0 200 OK\r\n" \
101 "Content-Length: #{expect_size}\r\n\r\n"
102 assert_equal header.size, tmpfp.syswrite(header)
103 nr.times { assert_equal chunk_size, tmpfp.syswrite(' ' * chunk_size) }
104 assert_equal expect_size + header.size, File.size(tmpfp.path)
107 accept = Tempfile.new('accept')
108 svr = Proc.new do |serv, port|
109 client, _ = serv.accept
112 readed = read_headers(client)
114 %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n},
116 MogileFS.io.copy_stream(tmpfp, client)
120 t1 = TempServer.new(svr)
121 t2 = TempServer.new(svr)
122 path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
123 path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
125 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
127 data = Tempfile.new('test_mogilefs.dest_data')
129 @client.get_file_data('key') do |fp|
133 fp.sysread(16384, buf)
136 assert_equal read_nr, data.syswrite(buf), "partial write"
144 assert_equal expect_size, nr, "size mismatch"
145 assert_equal 1, accept.stat.size
149 path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
150 path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
152 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
154 expected = [ path1, path2 ]
156 assert_equal expected, @client.get_paths('key').sort
160 path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
161 path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
163 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
165 expected = [ URI.parse(path1), URI.parse(path2) ]
167 assert_equal expected, @client.get_uris('key')
171 def test_get_paths_unknown_key
172 @backend.get_paths = ['unknown_key', '']
174 assert_raises MogileFS::Backend::UnknownKeyError do
175 assert_equal nil, @client.get_paths('key')
179 def test_delete_existing
180 @backend.delete = { }
181 @client.delete 'no_such_key'
184 def test_delete_nonexisting
185 @backend.delete = 'unknown_key', ''
186 assert_raises MogileFS::Backend::UnknownKeyError do
187 @client.delete('no_such_key')
191 def test_delete_readonly
192 @client.readonly = true
193 assert_raises MogileFS::ReadOnlyError do
194 @client.delete 'no_such_key'
199 @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_2',
200 'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
201 @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_4',
202 'key_1' => 'new_key_3', 'key_2' => 'new_key_4' }
203 @backend.list_keys = { 'key_count' => 0, 'next_after' => 'new_key_4' }
205 @client.each_key 'new' do |key|
209 assert_equal %w[new_key_1 new_key_2 new_key_3 new_key_4], keys
213 @backend.list_keys = { 'key_count' => '2', 'next_after' => 'new_key_2',
214 'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
216 keys, next_after = @client.list_keys 'new'
217 assert_equal ['new_key_1', 'new_key_2'], keys.sort
218 assert_equal 'new_key_2', next_after
221 def test_new_file_http
222 @client.readonly = true
223 assert_raises MogileFS::ReadOnlyError do
224 @client.new_file 'new_key', 'test'
228 def test_new_file_readonly
229 @client.readonly = true
230 assert_raises MogileFS::ReadOnlyError do
231 @client.new_file 'new_key', 'test'
235 def test_store_file_small_http
236 received = Tempfile.new('received')
237 to_store = Tempfile.new('small')
238 to_store.syswrite('data')
240 t = TempServer.new(Proc.new do |serv, accept|
241 client, _ = serv.accept
242 while buf = client.readpartial(666)
243 received.syswrite(buf)
244 break if buf =~ /data/
246 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
250 @backend.create_open = {
252 'path' => "http://127.0.0.1:#{t.port}/path",
254 nr = @client.store_file 'new_key', 'test', to_store.path
258 a = received.sysread(999999).split(/\r\n/)
259 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
260 assert_equal("data", a[-1])
261 assert_equal("", a[-2])
262 assert a.grep(%r{\AContent-Length: 4\z})[0]
264 TempServer.destroy_all!
267 def test_store_content_http
268 received = Tempfile.new('received')
270 t = TempServer.new(Proc.new do |serv, accept|
271 client, _ = serv.accept
274 while seen !~ /\r\n\r\ndata/
275 buf = client.readpartial(4096)
277 received.syswrite(buf)
279 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
283 @backend.create_open = {
285 'path' => "http://127.0.0.1:#{t.port}/path",
288 nr = @client.store_content 'new_key', 'test', 'data'
293 a = received.sysread(999999).split(/\r\n/)
294 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
295 assert_equal("data", a[-1])
296 assert_equal("", a[-2])
297 assert a.grep(%r{\AContent-Length: 4\z})[0]
299 TempServer.destroy_all!
303 def test_store_content_with_writer_callback
304 received = Tempfile.new('received')
305 expected = "PUT /path HTTP/1.0\r\nContent-Length: 40\r\n\r\n"
309 t = TempServer.new(Proc.new do |serv, accept|
310 client, _ = serv.accept
315 buf = client.readpartial(8192) or break
316 break if buf.length == 0
317 assert_equal buf.length, received.syswrite(buf)
320 break if seen =~ /\r\n\r\n(?:data){10}/
322 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
326 @backend.create_open = {
328 'path' => "http://127.0.0.1:#{t.port}/path",
331 cbk = MogileFS::Util::StoreContent.new(40) do |write_callback|
333 write_callback.call("data")
336 assert_equal 40, cbk.length
337 nr = @client.store_content('new_key', 'test', cbk)
341 a = received.sysread(999999).split(/\r\n/)
342 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
343 assert_equal("data" * 10, a[-1])
344 assert_equal("", a[-2])
345 assert a.grep(%r{\AContent-Length: 40\z})[0]
347 TempServer.destroy_all!
350 def test_store_content_multi_dest_failover_path
351 test_store_content_multi_dest_failover(true)
354 def test_store_content_multi_dest_failover(big_io = false)
355 received1 = Tempfile.new('received')
356 received2 = Tempfile.new('received')
358 t1 = TempServer.new(Proc.new do |serv, accept|
359 client, _ = serv.accept
361 while seen !~ /\r\n\r\ndata/
362 buf = client.readpartial(4096)
364 received1.syswrite(buf)
366 client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
370 t2 = TempServer.new(Proc.new do |serv, accept|
371 client, _ = serv.accept
373 while seen !~ /\r\n\r\ndata/
374 buf = client.readpartial(4096)
376 received2.syswrite(buf)
378 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
382 @backend.create_open = {
385 'path_1' => "http://127.0.0.1:#{t1.port}/path",
387 'path_2' => "http://127.0.0.1:#{t2.port}/path",
391 tmp = Tempfile.new('data')
394 nr = @client.store_file('new_key', 'test', tmp.path)
397 nr = @client.store_content 'new_key', 'test', 'data'
402 a = received1.sysread(4096).split(/\r\n/)
403 b = received2.sysread(4096).split(/\r\n/)
404 assert_equal a[0], b[0]
405 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
406 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, b[0])
407 assert_equal("data", a[-1])
408 assert_equal("data", b[-1])
409 assert_equal("", a[-2])
410 assert_equal("", b[-2])
411 assert a.grep(%r{\AContent-Length: 4\z})[0]
412 assert b.grep(%r{\AContent-Length: 4\z})[0]
414 TempServer.destroy_all!
417 def test_store_content_http_fail
418 t = TempServer.new(Proc.new do |serv, accept|
419 client, _ = serv.accept
422 client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
426 @backend.create_open = {
428 'path' => "http://127.0.0.1:#{t.port}/path",
431 assert_raises MogileFS::HTTPFile::NoStorageNodesError do
432 @client.store_content 'new_key', 'test', 'data'
436 def test_store_content_http_empty
437 received = Tempfile.new('received')
438 t = TempServer.new(Proc.new do |serv, accept|
439 client, _ = serv.accept
441 received.syswrite(client.recv(4096, 0))
442 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
446 @backend.create_open = {
448 'path' => "http://127.0.0.1:#{t.port}/path",
451 nr = @client.store_content 'new_key', 'test', ''
454 a = received.sysread(4096).split(/\r\n/)
455 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
456 assert a.grep(%r{\AContent-Length: 0\z})[0]
459 def test_store_content_nfs
460 @backend.create_open = {
465 assert_raises MogileFS::UnsupportedPathError do
466 @client.store_content 'new_key', 'test', 'data'
470 def test_new_file_http_large
471 expect = Tempfile.new('test_mogilefs.expect')
472 to_put = Tempfile.new('test_mogilefs.to_put')
473 received = Tempfile.new('test_mogilefs.received')
476 chunk_size = 1024 * 1024
477 expect_size = nr * chunk_size
479 header = "PUT /path HTTP/1.0\r\n" \
480 "Content-Length: #{expect_size}\r\n\r\n"
481 assert_equal header.size, expect.syswrite(header)
483 assert_equal chunk_size, expect.syswrite(' ' * chunk_size)
484 assert_equal chunk_size, to_put.syswrite(' ' * chunk_size)
486 assert_equal expect_size + header.size, expect.stat.size
487 assert_equal expect_size, to_put.stat.size
489 readed = Tempfile.new('readed')
490 t = TempServer.new(Proc.new do |serv, accept|
491 client, _ = serv.accept
495 buf = client.readpartial(8192) or break
496 break if buf.length == 0
497 assert_equal buf.length, received.syswrite(buf)
499 break if nr >= expect.stat.size
501 readed.syswrite("#{nr}")
502 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
506 @backend.create_open = {
508 'path' => "http://127.0.0.1:#{t.port}/path",
511 orig_size = to_put.size
512 nr = @client.store_file('new_key', 'test', to_put.path)
513 assert nr, nr.inspect
514 assert_equal orig_size, nr
515 assert_equal orig_size, to_put.size
517 assert_equal expect.stat.size, readed.sysread(4096).to_i
519 ENV['PATH'].split(/:/).each do |path|
520 cmp_bin = "#{path}/cmp"
521 File.executable?(cmp_bin) or next
522 # puts "running #{cmp_bin} #{expect.path} #{received.path}"
523 assert( system(cmp_bin, expect.path, received.path) )
528 TempServer.destroy_all!
531 def test_store_content_readonly
532 @client.readonly = true
534 assert_raises MogileFS::ReadOnlyError do
535 @client.store_content 'new_key', 'test', nil
539 def test_store_file_readonly
540 @client.readonly = true
541 assert_raises MogileFS::ReadOnlyError do
542 @client.store_file 'new_key', 'test', nil
546 def test_rename_existing
549 assert_nil @client.rename('from_key', 'to_key')
552 def test_rename_nonexisting
553 @backend.rename = 'unknown_key', ''
555 assert_raises MogileFS::Backend::UnknownKeyError do
556 @client.rename('from_key', 'to_key')
560 def test_rename_no_key
561 @backend.rename = 'no_key', 'no_key'
563 e = assert_raises MogileFS::Backend::NoKeyError do
564 @client.rename 'new_key', 'test'
567 assert_equal 'no_key', e.message
570 def test_rename_readonly
571 @client.readonly = true
573 e = assert_raises MogileFS::ReadOnlyError do
574 @client.rename 'new_key', 'test'
577 assert_equal 'readonly mogilefs', e.message
580 def assert_get_paths_args(expect, *args)
581 sock = TCPServer.new("127.0.0.1", 0)
582 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
583 c = MogileFS::MogileFS.new(nargs)
589 a.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
592 paths_expect = %w(http://0/a http://0/b)
593 assert_equal paths_expect, c.get_paths("f", *args)
595 assert_equal 1, received.size
596 tmp = c.backend.url_decode(received[0].split(/\s+/)[1])
597 assert_equal "f", tmp.delete("key")
598 assert_equal "foo", tmp.delete("domain")
599 assert_equal expect, tmp
605 def test_get_paths_args
606 assert_get_paths_args({"noverify"=>"1", "zone"=>""})
607 assert_get_paths_args({"noverify"=>"0", "zone"=>""}, false)
608 assert_get_paths_args({"noverify"=>"0", "zone"=>""}, :noverify=>false)
609 assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"}, true, "alt")
610 assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"},
611 {:noverify => true, :zone => "alt"})
612 assert_get_paths_args({"noverify"=>"1", "zone"=>"alt","pathcount"=>"666"},
613 {:noverify => true, :zone => "alt", :pathcount=>666})
616 def test_idempotent_command_eof
618 a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
619 hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
620 args = { :hosts => hosts, :domain => "foo" }
621 c = MogileFS::MogileFS.new(args)
624 r = IO.select([a, b])
629 r = IO.select([a, b])
632 x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
635 expect = %w(http://0/a http://0/b)
636 assert_equal expect, c.get_paths("f")
638 assert_equal 2, received.size
639 assert_equal received[0], received[1]
642 def test_idempotent_command_slow
644 a = TCPServer.new(ip, 0)
645 hosts = [ "#{ip}:#{a.addr[1]}" ]
648 args = { :hosts => hosts, :domain => "foo", :timeout => timeout }
649 c = MogileFS::MogileFS.new(args)
656 %r{key=(\w+)} =~ line
658 sleep(secs) # cause the client to timeout:
661 x.write("OK paths=1&path1=http://0/#{$1}\r\n")
663 # EPIPE may or may not get raised due to timing issue,
664 # we don't care either way
666 flunk("#{e.message} (#{e.class})")
670 # client should start a new connection here
674 %r{key=(\w+)} =~ line
676 y.write("OK paths=1&path1=http://0/#{$1}\r\n")
678 flunk("#{e.message} (#{e.class})")
681 # the client should've killed the old connection:
682 assert_raises(Errno::EPIPE) do
683 loop { x.write("OK paths=1&path1=http://0/#{$1}\r\n") }
686 close_later # main thread closes
688 assert_raises(MogileFS::UnreadableSocketError) do
691 assert_equal :continue_test, q.pop, "avoid race during test"
692 expect2 = %w(http://0/b)
693 assert_equal expect2, c.get_paths("b")
695 close_later = th.value
696 close_later.each { |io| assert_nil io.close }
699 def test_idempotent_command_response_truncated
701 a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
702 hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
703 args = { :hosts => hosts, :domain => "foo" }
704 c = MogileFS::MogileFS.new(args)
707 r = IO.select([a, b])
710 x.write("OK paths=2&path1=http://0/a&path2=http://0/")
713 r = IO.select([a, b])
716 x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
719 expect = %w(http://0/a http://0/b)
720 assert_equal expect, c.get_paths("f")
722 assert_equal 2, received.size
723 assert_equal received[0], received[1]
726 def test_non_idempotent_command_eof
728 a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
729 hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
730 args = { :hosts => hosts, :domain => "foo" }
731 c = MogileFS::MogileFS.new(args)
734 r = IO.select([a, b])
739 assert_raises(EOFError) { c.rename("a", "b") }
741 assert_equal 1, received.size
744 def test_list_keys_verbose_ordering # implementation detail
746 sock = TCPServer.new("127.0.0.1", 0)
747 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
748 c = MogileFS::MogileFS.new(nargs)
751 %w(a b c d e).each do |key|
753 cmd, args = line.split(/\s+/, 2)
754 args = c.backend.url_decode(args.strip)
755 assert_equal "file_info", cmd
756 assert_equal key, args["key"]
758 out = { "length" => 3, "devcount" => 6 }
759 %w(a b c d e).shuffle.each do |key|
761 a.write "OK #{c.backend.url_encode(out)}\r\n"
766 blk = lambda do |key, length, devcount|
767 received << [ key, length, devcount ]
769 c.list_keys_verbose(%w(a b c d e), blk)
771 received.map! { |(key,_,_)| key }
772 assert_equal %w(a b c d e), received
777 def test_list_keys_verbose_retry_eof # implementation detail
779 sock = TCPServer.new("127.0.0.1", 0)
780 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
781 c = MogileFS::MogileFS.new(nargs)
784 %w(a b c d e).each do |key|
786 cmd, args = line.split(/\s+/, 2)
787 args = c.backend.url_decode(args.strip)
788 assert_equal "file_info", cmd
789 assert_equal key, args["key"]
791 out = { "length" => 3, "devcount" => 6 }
792 %w(d e).each do |key|
794 a.write "OK #{c.backend.url_encode(out)}\r\n"
796 a.close # trigger EOF
797 a = sock.accept # client will retry
798 %w(a b c).each do |key|
800 cmd, args = line.split(/\s+/, 2)
801 args = c.backend.url_decode(args.strip)
802 assert_equal "file_info", cmd
803 assert_equal key, args["key"]
805 a.write "OK #{c.backend.url_encode(out)}\r\n"
810 blk = lambda do |key, length, devcount|
811 received << [ key, length, devcount ]
813 c.list_keys_verbose(%w(a b c d e), blk)
815 received.map! { |(key,_,_)| key }
816 assert_equal %w(a b c d e), received
821 def test_list_keys_verbose_retry_truncated # implementation detail
823 sock = TCPServer.new("127.0.0.1", 0)
824 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
825 c = MogileFS::MogileFS.new(nargs)
828 %w(a b c d e).each do |key|
830 cmd, args = line.split(/\s+/, 2)
831 args = c.backend.url_decode(args.strip)
832 assert_equal "file_info", cmd
833 assert_equal key, args["key"]
835 out = { "length" => 3, "devcount" => 6 }
837 a.write "OK #{c.backend.url_encode(out)}\r\n"
839 a.write "OK #{c.backend.url_encode(out)}"
840 a.close # trigger EOF
842 a = sock.accept # client will retry
843 %w(b c d e).each do |key|
845 cmd, args = line.split(/\s+/, 2)
846 args = c.backend.url_decode(args.strip)
847 assert_equal "file_info", cmd
848 assert_equal key, args["key"]
850 a.write "OK #{c.backend.url_encode(out)}\r\n"
855 blk = lambda do |key, length, devcount|
856 received << [ key, length, devcount ]
858 c.list_keys_verbose(%w(a b c d e), blk)
860 received.map! { |(key,_,_)| key }
861 assert_equal %w(a b c d e), received
868 assert_equal({}, @client.sleep(2))
873 # tested with 1000, though it takes a while
875 ENV['NR_CHUNKS'] ? ENV['NR_CHUNKS'].to_i : 10