1 # -*- encoding: binary -*-
7 class TestMogileFS__MogileFS < TestMogileFS
9 @klass = MogileFS::MogileFS
14 assert_equal 'test', @client.domain
16 assert_raises ArgumentError do
17 MogileFS::MogileFS.new :hosts => ['kaa:6001']
# Verifies that get_file_data returns the body served over HTTP for a key.
# Both TempServers run the same handler: it accepts a connection, reads the
# request and replies with a 200 response carrying the 5-byte body "data!".
21 def test_get_file_data_http
22 tmp = Tempfile.new('accept')
23 accept = File.open(tmp.path, "ab")
24 svr = Proc.new do |serv, port|
25 client, _ = serv.accept
27 readed = client.recv(4096, 0)
29 %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
31 client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
34 t1 = TempServer.new(svr)
35 t2 = TempServer.new(svr)
36 path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
37 path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
# Give @backend a get_paths reply listing both server URLs.
39 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
41 assert_equal 'data!', @client.get_file_data('key')
# NOTE(review): presumably the handler appends one byte to `accept` per
# connection, so a size of 1 means only one server was contacted -- confirm.
42 assert_equal 1, accept.stat.size
44 TempServer.destroy_all!
# Verifies that get_file_data still returns the data when the first path
# answers 404: svr1 (dev1) replies "404 Not Found", svr2 (dev2) replies
# "200 OK" with the body "data!".
47 def test_get_file_data_http_not_found_failover
48 tmp = Tempfile.new('accept')
49 accept = File.open(tmp.path, 'ab')
50 svr1 = Proc.new do |serv, port|
51 client, _ = serv.accept
53 readed = client.recv(4096, 0)
55 %r{\AGET /dev1/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
57 client.send("HTTP/1.0 404 Not Found\r\n\r\ndata!", 0)
61 svr2 = Proc.new do |serv, port|
62 client, _ = serv.accept
64 readed = client.recv(4096, 0)
66 %r{\AGET /dev2/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
68 client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
72 t1 = TempServer.new(svr1)
73 t2 = TempServer.new(svr2)
74 path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
75 path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
76 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
78 assert_equal 'data!', @client.get_file_data('key')
# NOTE(review): presumably each handler appends one byte to `accept` per
# connection, so a size of 2 means both servers were contacted -- confirm.
79 assert_equal 2, accept.stat.size
81 TempServer.destroy_all!
# Exercises the block form of get_file_data with a large body.
# A temp file is filled with an HTTP response header followed by `nr` chunks
# of 1 MiB of spaces; the server handler streams that file to the client and
# the test reads it back through the yielded IO, copying it into `data`.
# NOTE(review): `nr`, `buf` and `read_nr` are assigned on lines not visible
# here -- confirm where they come from.
84 def test_get_file_data_http_block
85 tmpfp = Tempfile.new('test_mogilefs.open_data')
87 chunk_size = 1024 * 1024
88 expect_size = nr * chunk_size
89 header = "HTTP/1.0 200 OK\r\n" \
90 "Content-Length: #{expect_size}\r\n\r\n"
91 assert_equal header.size, tmpfp.syswrite(header)
92 nr.times { assert_equal chunk_size, tmpfp.syswrite(' ' * chunk_size) }
# Sanity check: the canned response on disk is header + body bytes long.
93 assert_equal expect_size + header.size, File.size(tmpfp.path)
96 accept = Tempfile.new('accept')
97 svr = Proc.new do |serv, port|
98 client, _ = serv.accept
101 readed = client.recv(4096, 0)
103 %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
104 MogileFS.io.copy_stream(tmpfp, client)
108 t1 = TempServer.new(svr)
109 t2 = TempServer.new(svr)
110 path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
111 path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
113 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
115 data = Tempfile.new('test_mogilefs.dest_data')
117 @client.get_file_data('key') do |fp|
121 fp.sysread(16384, buf)
124 assert_equal read_nr, data.syswrite(buf), "partial write"
132 assert_equal expect_size, nr, "size mismatch"
133 assert_equal 1, accept.stat.size
137 path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
138 path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
140 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
142 expected = [ path1, path2 ]
144 assert_equal expected, @client.get_paths('key').sort
148 path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
149 path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
151 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
153 expected = [ URI.parse(path1), URI.parse(path2) ]
155 assert_equal expected, @client.get_uris('key')
# Expects get_paths to raise MogileFS::Backend::UnknownKeyError when @backend
# is given ['unknown_key', ''] as its get_paths reply.
159 def test_get_paths_unknown_key
160 @backend.get_paths = ['unknown_key', '']
162 assert_raises MogileFS::Backend::UnknownKeyError do
# NOTE(review): if get_paths raises, this assert_equal never gets to compare
# anything; if it does not raise, assert_raises fails anyway. The inner
# assertion therefore looks redundant.
163 assert_equal nil, @client.get_paths('key')
# Calls delete with an empty-hash reply assigned to @backend.delete; no
# assertion is visible, so the test passes as long as nothing is raised.
# NOTE(review): the key is named 'no_such_key' although the test name says the
# key exists -- probably harmless, since the reply is canned.
167 def test_delete_existing
168 @backend.delete = { }
169 @client.delete 'no_such_key'
# Expects delete to raise MogileFS::Backend::UnknownKeyError when @backend is
# given the pair 'unknown_key', '' as its delete reply.
172 def test_delete_nonexisting
173 @backend.delete = 'unknown_key', ''
174 assert_raises MogileFS::Backend::UnknownKeyError do
175 @client.delete('no_such_key')
# Expects delete to raise MogileFS::ReadOnlyError once the client has been
# switched to readonly.
179 def test_delete_readonly
180 @client.readonly = true
181 assert_raises MogileFS::ReadOnlyError do
182 @client.delete 'no_such_key'
187 @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_2',
188 'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
189 @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_4',
190 'key_1' => 'new_key_3', 'key_2' => 'new_key_4' }
191 @backend.list_keys = { 'key_count' => 0, 'next_after' => 'new_key_4' }
193 @client.each_key 'new' do |key|
197 assert_equal %w[new_key_1 new_key_2 new_key_3 new_key_4], keys
201 @backend.list_keys = { 'key_count' => '2', 'next_after' => 'new_key_2',
202 'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
204 keys, next_after = @client.list_keys 'new'
205 assert_equal ['new_key_1', 'new_key_2'], keys.sort
206 assert_equal 'new_key_2', next_after
# Expects new_file to raise MogileFS::ReadOnlyError on a readonly client.
# NOTE(review): despite the name, nothing HTTP-specific is exercised here; the
# visible body is identical to test_new_file_readonly. Likely a copy/paste
# leftover -- confirm what this test was meant to cover.
209 def test_new_file_http
210 @client.readonly = true
211 assert_raises MogileFS::ReadOnlyError do
212 @client.new_file 'new_key', 'test'
# Expects new_file to raise MogileFS::ReadOnlyError once the client has been
# switched to readonly.
216 def test_new_file_readonly
217 @client.readonly = true
218 assert_raises MogileFS::ReadOnlyError do
219 @client.new_file 'new_key', 'test'
# Stores a 4-byte file ("data") with store_file and inspects the raw HTTP
# request captured by a TempServer: a PUT to /path whose last line is the
# payload, preceded by an empty line, with a "Content-Length: 4" header.
223 def test_store_file_small_http
224 received = Tempfile.new('received')
225 to_store = Tempfile.new('small')
226 to_store.syswrite('data')
# Server side: record everything read until the payload shows up, then
# answer 200.
228 t = TempServer.new(Proc.new do |serv, accept|
229 client, _ = serv.accept
230 while buf = client.readpartial(666)
231 received.syswrite(buf)
232 break if buf =~ /data/
234 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
238 @backend.create_open = {
240 'path' => "http://127.0.0.1:#{t.port}/path",
242 nr = @client.store_file 'new_key', 'test', to_store.path
# Split the captured request on CRLF and check request line, body and header.
246 a = received.sysread(999999).split(/\r\n/)
247 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
248 assert_equal("data", a[-1])
249 assert_equal("", a[-2])
250 assert a.grep(%r{\AContent-Length: 4\z})[0]
252 TempServer.destroy_all!
# Stores the string 'data' with store_content and inspects the raw HTTP
# request captured by a TempServer: a PUT to /path whose last line is the
# payload, preceded by an empty line, with a "Content-Length: 4" header.
# NOTE(review): `seen` is initialised and updated on lines not visible here.
255 def test_store_content_http
256 received = Tempfile.new('received')
258 t = TempServer.new(Proc.new do |serv, accept|
259 client, _ = serv.accept
# Keep reading until the header terminator followed by the payload was seen.
262 while seen !~ /\r\n\r\ndata/
263 buf = client.readpartial(4096)
265 received.syswrite(buf)
267 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
271 @backend.create_open = {
273 'path' => "http://127.0.0.1:#{t.port}/path",
276 nr = @client.store_content 'new_key', 'test', 'data'
281 a = received.sysread(999999).split(/\r\n/)
282 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
283 assert_equal("data", a[-1])
284 assert_equal("", a[-2])
285 assert a.grep(%r{\AContent-Length: 4\z})[0]
287 TempServer.destroy_all!
# Stores content produced by a MogileFS::Util::StoreContent object declared
# with length 40, whose block calls the write callback with "data".
# The captured request must be a PUT to /path with "Content-Length: 40" and a
# body of "data" repeated ten times.
# NOTE(review): the loop that invokes write_callback and the `seen`
# bookkeeping are on lines not visible here; `expected` is assigned but no
# visible line uses it.
291 def test_store_content_with_writer_callback
292 received = Tempfile.new('received')
293 expected = "PUT /path HTTP/1.0\r\nContent-Length: 40\r\n\r\n"
297 t = TempServer.new(Proc.new do |serv, accept|
298 client, _ = serv.accept
303 buf = client.readpartial(8192) or break
304 break if buf.length == 0
305 assert_equal buf.length, received.syswrite(buf)
# Stop reading once the header terminator and all ten "data" pieces arrived.
308 break if seen =~ /\r\n\r\n(?:data){10}/
310 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
314 @backend.create_open = {
316 'path' => "http://127.0.0.1:#{t.port}/path",
319 cbk = MogileFS::Util::StoreContent.new(40) do |write_callback|
321 write_callback.call("data")
324 assert_equal 40, cbk.length
325 nr = @client.store_content('new_key', 'test', cbk)
329 a = received.sysread(999999).split(/\r\n/)
330 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
331 assert_equal("data" * 10, a[-1])
332 assert_equal("", a[-2])
333 assert a.grep(%r{\AContent-Length: 40\z})[0]
335 TempServer.destroy_all!
# Re-runs test_store_content_multi_dest_failover with big_io set to true.
338 def test_store_content_multi_dest_failover_path
339 test_store_content_multi_dest_failover(true)
# Checks failover between two upload destinations: the server behind path_1
# answers "500 Internal Server Error", the one behind path_2 answers "200 OK".
# Both servers must have received the same PUT request for the 4-byte payload.
#
# big_io - defaults to false. NOTE(review): presumably selects between the
#          store_file call and the store_content call below; the branching
#          lines are not visible here -- confirm.
342 def test_store_content_multi_dest_failover(big_io = false)
343 received1 = Tempfile.new('received')
344 received2 = Tempfile.new('received')
# First destination: capture the request, then fail it with a 500.
346 t1 = TempServer.new(Proc.new do |serv, accept|
347 client, _ = serv.accept
349 while seen !~ /\r\n\r\ndata/
350 buf = client.readpartial(4096)
352 received1.syswrite(buf)
354 client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
# Second destination: capture the request and accept it with a 200.
358 t2 = TempServer.new(Proc.new do |serv, accept|
359 client, _ = serv.accept
361 while seen !~ /\r\n\r\ndata/
362 buf = client.readpartial(4096)
364 received2.syswrite(buf)
366 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
370 @backend.create_open = {
373 'path_1' => "http://127.0.0.1:#{t1.port}/path",
375 'path_2' => "http://127.0.0.1:#{t2.port}/path",
379 tmp = Tempfile.new('data')
382 nr = @client.store_file('new_key', 'test', tmp.path)
385 nr = @client.store_content 'new_key', 'test', 'data'
# Compare what each server captured, line by line.
390 a = received1.sysread(4096).split(/\r\n/)
391 b = received2.sysread(4096).split(/\r\n/)
392 assert_equal a[0], b[0]
393 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
394 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, b[0])
395 assert_equal("data", a[-1])
396 assert_equal("data", b[-1])
397 assert_equal("", a[-2])
398 assert_equal("", b[-2])
399 assert a.grep(%r{\AContent-Length: 4\z})[0]
400 assert b.grep(%r{\AContent-Length: 4\z})[0]
402 TempServer.destroy_all!
# Expects store_content to raise MogileFS::HTTPFile::NoStorageNodesError when
# the only destination answers "500 Internal Server Error".
405 def test_store_content_http_fail
406 t = TempServer.new(Proc.new do |serv, accept|
407 client, _ = serv.accept
410 client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
414 @backend.create_open = {
416 'path' => "http://127.0.0.1:#{t.port}/path",
419 assert_raises MogileFS::HTTPFile::NoStorageNodesError do
420 @client.store_content 'new_key', 'test', 'data'
# Stores an empty string with store_content and checks that the captured
# request is a PUT to /path carrying a "Content-Length: 0" header.
424 def test_store_content_http_empty
425 received = Tempfile.new('received')
426 t = TempServer.new(Proc.new do |serv, accept|
427 client, _ = serv.accept
# A single recv is used to capture the request before answering 200.
429 received.syswrite(client.recv(4096, 0))
430 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
434 @backend.create_open = {
436 'path' => "http://127.0.0.1:#{t.port}/path",
439 nr = @client.store_content 'new_key', 'test', ''
442 a = received.sysread(4096).split(/\r\n/)
443 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
444 assert a.grep(%r{\AContent-Length: 0\z})[0]
# Expects store_content to raise MogileFS::UnsupportedPathError for the
# create_open reply configured below.
# NOTE(review): the reply's contents are not visible here; going by the test
# name it presumably holds a non-HTTP (NFS-style) path -- confirm.
447 def test_store_content_nfs
448 @backend.create_open = {
453 assert_raises MogileFS::UnsupportedPathError do
454 @client.store_content 'new_key', 'test', 'data'
# Uploads a large file with store_file and compares the bytes the server
# received against the expected request.
# `expect` holds the expected PUT request (header plus body), `to_put` holds
# only the body, `received` collects what the server reads, and `readed`
# carries the server's byte count back to the test.
# NOTE(review): `nr` is assigned, and the chunk-writing loop is opened, on
# lines not visible here.
458 def test_new_file_http_large
459 expect = Tempfile.new('test_mogilefs.expect')
460 to_put = Tempfile.new('test_mogilefs.to_put')
461 received = Tempfile.new('test_mogilefs.received')
464 chunk_size = 1024 * 1024
465 expect_size = nr * chunk_size
467 header = "PUT /path HTTP/1.0\r\n" \
468 "Content-Length: #{expect_size}\r\n\r\n"
469 assert_equal header.size, expect.syswrite(header)
471 assert_equal chunk_size, expect.syswrite(' ' * chunk_size)
472 assert_equal chunk_size, to_put.syswrite(' ' * chunk_size)
474 assert_equal expect_size + header.size, expect.stat.size
475 assert_equal expect_size, to_put.stat.size
477 readed = Tempfile.new('readed')
478 t = TempServer.new(Proc.new do |serv, accept|
479 client, _ = serv.accept
483 buf = client.readpartial(8192) or break
484 break if buf.length == 0
485 assert_equal buf.length, received.syswrite(buf)
# Stop once as many bytes as the expected request have been counted.
487 break if nr >= expect.stat.size
489 readed.syswrite("#{nr}")
490 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
494 @backend.create_open = {
496 'path' => "http://127.0.0.1:#{t.port}/path",
# store_file must return the file size and leave the source file unchanged.
499 orig_size = to_put.size
500 nr = @client.store_file('new_key', 'test', to_put.path)
501 assert nr, nr.inspect
502 assert_equal orig_size, nr
503 assert_equal orig_size, to_put.size
505 assert_equal expect.stat.size, readed.sysread(4096).to_i
# Byte-compare expected and received requests with a `cmp` executable found
# on PATH, if there is one.
507 ENV['PATH'].split(/:/).each do |path|
508 cmp_bin = "#{path}/cmp"
509 File.executable?(cmp_bin) or next
510 # puts "running #{cmp_bin} #{expect.path} #{received.path}"
511 assert( system(cmp_bin, expect.path, received.path) )
516 TempServer.destroy_all!
# Expects store_content to raise MogileFS::ReadOnlyError once the client has
# been switched to readonly.
519 def test_store_content_readonly
520 @client.readonly = true
522 assert_raises MogileFS::ReadOnlyError do
523 @client.store_content 'new_key', 'test', nil
# Expects store_file to raise MogileFS::ReadOnlyError once the client has
# been switched to readonly.
527 def test_store_file_readonly
528 @client.readonly = true
529 assert_raises MogileFS::ReadOnlyError do
530 @client.store_file 'new_key', 'test', nil
# Expects rename to return nil.
# NOTE(review): the backend reply for rename is presumably set up on lines not
# visible here -- confirm.
534 def test_rename_existing
537 assert_nil @client.rename('from_key', 'to_key')
# Expects rename to raise MogileFS::Backend::UnknownKeyError when @backend is
# given the pair 'unknown_key', '' as its rename reply.
540 def test_rename_nonexisting
541 @backend.rename = 'unknown_key', ''
543 assert_raises MogileFS::Backend::UnknownKeyError do
544 @client.rename('from_key', 'to_key')
# Expects rename to raise MogileFS::Backend::NoKeyError, with the message
# 'no_key', when @backend is given the pair 'no_key', 'no_key' as its reply.
548 def test_rename_no_key
549 @backend.rename = 'no_key', 'no_key'
551 e = assert_raises MogileFS::Backend::NoKeyError do
552 @client.rename 'new_key', 'test'
555 assert_equal 'no_key', e.message
# Expects rename to raise MogileFS::ReadOnlyError, with the message
# 'readonly mogilefs', once the client has been switched to readonly.
558 def test_rename_readonly
559 @client.readonly = true
561 e = assert_raises MogileFS::ReadOnlyError do
562 @client.rename 'new_key', 'test'
565 assert_equal 'readonly mogilefs', e.message
# Helper: runs get_paths("f", *args) against a local TCPServer and checks
# which request arguments the client sent.
#
# expect - Hash of decoded request arguments expected to remain after "key"
#          and "domain" have been removed.
# args   - extra arguments forwarded unchanged to get_paths.
#
# NOTE(review): `a` (presumably the accepted connection) and `received`
# (presumably the request lines read from it) are set up on lines not visible
# here -- confirm.
568 def assert_get_paths_args(expect, *args)
569 sock = TCPServer.new("127.0.0.1", 0)
570 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
571 c = MogileFS::MogileFS.new(nargs)
577 a.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
580 paths_expect = %w(http://0/a http://0/b)
581 assert_equal paths_expect, c.get_paths("f", *args)
# Exactly one request; its second whitespace-separated field holds the
# url-encoded arguments.
583 assert_equal 1, received.size
584 tmp = c.backend.url_decode(received[0].split(/\s+/)[1])
585 assert_equal "f", tmp.delete("key")
586 assert_equal "foo", tmp.delete("domain")
587 assert_equal expect, tmp
# Checks how get_paths arguments end up in the request, using
# assert_get_paths_args. Covered call styles: no extra arguments, a positional
# noverify flag, positional noverify plus zone, and an options hash with
# :noverify, :zone and :pathcount.
593 def test_get_paths_args
# No extra arguments: noverify is sent as "1" and zone as "".
594 assert_get_paths_args({"noverify"=>"1", "zone"=>""})
595 assert_get_paths_args({"noverify"=>"0", "zone"=>""}, false)
596 assert_get_paths_args({"noverify"=>"0", "zone"=>""}, :noverify=>false)
597 assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"}, true, "alt")
598 assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"},
599 {:noverify => true, :zone => "alt"})
600 assert_get_paths_args({"noverify"=>"1", "zone"=>"alt","pathcount"=>"666"},
601 {:noverify => true, :zone => "alt", :pathcount=>666})
# Expects get_paths to succeed even though the request had to be sent twice:
# the test ends by asserting two received requests that are identical.
# NOTE(review): going by the test name, the first accepted connection is
# presumably closed without a reply; `ip`, `x`, `received` and the server-side
# thread are set up on lines not visible here -- confirm.
604 def test_idempotent_command_eof
606 a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
607 hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
608 args = { :hosts => hosts, :domain => "foo" }
609 c = MogileFS::MogileFS.new(args)
612 r = IO.select([a, b])
617 r = IO.select([a, b])
620 x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
623 expect = %w(http://0/a http://0/b)
624 assert_equal expect, c.get_paths("f")
626 assert_equal 2, received.size
627 assert_equal received[0], received[1]
# Exercises a tracker that answers too slowly: a get_paths call is expected to
# raise MogileFS::UnreadableSocketError, and a following get_paths("b") is
# expected to succeed with http://0/b.
# The server side also asserts that writing to the first connection `x`
# eventually raises Errno::EPIPE.
# NOTE(review): `ip`, `timeout`, `secs`, `q`, `th`, `x`, `y` and `line` are
# set up on lines not visible here, so the exact sequencing cannot be
# confirmed from this view.
630 def test_idempotent_command_slow
632 a = TCPServer.new(ip, 0)
633 hosts = [ "#{ip}:#{a.addr[1]}" ]
636 args = { :hosts => hosts, :domain => "foo", :timeout => timeout }
637 c = MogileFS::MogileFS.new(args)
644 %r{key=(\w+)} =~ line
646 sleep(secs) # cause the client to timeout:
649 x.write("OK paths=1&path1=http://0/#{$1}\r\n")
651 # EPIPE may or may not get raised due to timing issue,
652 # we don't care either way
654 flunk("#{e.message} (#{e.class})")
658 # client should start a new connection here
662 %r{key=(\w+)} =~ line
664 y.write("OK paths=1&path1=http://0/#{$1}\r\n")
666 flunk("#{e.message} (#{e.class})")
669 # the client should've killed the old connection:
670 assert_raises(Errno::EPIPE) do
671 loop { x.write("OK paths=1&path1=http://0/#{$1}\r\n") }
674 close_later # main thread closes
676 assert_raises(MogileFS::UnreadableSocketError) do
679 assert_equal :continue_test, q.pop, "avoid race during test"
680 expect2 = %w(http://0/b)
681 assert_equal expect2, c.get_paths("b")
# The thread's value is the list of IO objects left for this thread to close.
683 close_later = th.value
684 close_later.each { |io| assert_nil io.close }
# Expects get_paths to succeed after a truncated reply: the first reply
# written has no trailing "\r\n" and stops in the middle of path2, while the
# second reply is complete. The request must have been sent twice, unchanged.
# NOTE(review): `ip`, `x`, `received` and the server-side thread are set up on
# lines not visible here; presumably the first connection is closed after the
# partial write -- confirm.
687 def test_idempotent_command_response_truncated
689 a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
690 hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
691 args = { :hosts => hosts, :domain => "foo" }
692 c = MogileFS::MogileFS.new(args)
695 r = IO.select([a, b])
698 x.write("OK paths=2&path1=http://0/a&path2=http://0/")
701 r = IO.select([a, b])
704 x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
707 expect = %w(http://0/a http://0/b)
708 assert_equal expect, c.get_paths("f")
710 assert_equal 2, received.size
711 assert_equal received[0], received[1]
# Expects rename to raise EOFError and the request to have been received only
# once, i.e. the client does not resend it.
# NOTE(review): `ip`, `received` and the server-side handling that produces
# the EOF are on lines not visible here -- confirm.
714 def test_non_idempotent_command_eof
716 a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
717 hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
718 args = { :hosts => hosts, :domain => "foo" }
719 c = MogileFS::MogileFS.new(args)
722 r = IO.select([a, b])
727 assert_raises(EOFError) { c.rename("a", "b") }
729 assert_equal 1, received.size
# Checks that list_keys_verbose hands keys to the callback in the order
# a, b, c, d, e. The server side expects one "file_info" command per key in
# that order and then writes one reply per key while iterating over the keys
# in shuffled order.
# NOTE(review): `a`, `line`, `received` and the line that presumably puts the
# key into `out` are not visible here -- confirm.
732 def test_list_keys_verbose_ordering # implementation detail
734 sock = TCPServer.new("127.0.0.1", 0)
735 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
736 c = MogileFS::MogileFS.new(nargs)
739 %w(a b c d e).each do |key|
741 cmd, args = line.split(/\s+/, 2)
742 args = c.backend.url_decode(args.strip)
743 assert_equal "file_info", cmd
744 assert_equal key, args["key"]
746 out = { "length" => 3, "devcount" => 6 }
747 %w(a b c d e).shuffle.each do |key|
749 a.write "OK #{c.backend.url_encode(out)}\r\n"
# Callback records each (key, length, devcount) triple as it is reported.
754 blk = lambda do |key, length, devcount|
755 received << [ key, length, devcount ]
757 c.list_keys_verbose(%w(a b c d e), blk)
759 received.map! { |(key,_,_)| key }
760 assert_equal %w(a b c d e), received
# Checks that list_keys_verbose recovers from a connection closed part-way
# through the replies. The server side reads "file_info" requests for a..e,
# writes replies while iterating over d and e, closes the connection, accepts
# a new one and expects a, b and c to be requested again before replying to
# them. The callback must end up with a, b, c, d, e.
# NOTE(review): `a`, `line`, `received` and the line that presumably puts the
# key into `out` are not visible here -- confirm.
765 def test_list_keys_verbose_retry_eof # implementation detail
767 sock = TCPServer.new("127.0.0.1", 0)
768 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
769 c = MogileFS::MogileFS.new(nargs)
772 %w(a b c d e).each do |key|
774 cmd, args = line.split(/\s+/, 2)
775 args = c.backend.url_decode(args.strip)
776 assert_equal "file_info", cmd
777 assert_equal key, args["key"]
779 out = { "length" => 3, "devcount" => 6 }
780 %w(d e).each do |key|
782 a.write "OK #{c.backend.url_encode(out)}\r\n"
784 a.close # trigger EOF
785 a = sock.accept # client will retry
786 %w(a b c).each do |key|
788 cmd, args = line.split(/\s+/, 2)
789 args = c.backend.url_decode(args.strip)
790 assert_equal "file_info", cmd
791 assert_equal key, args["key"]
793 a.write "OK #{c.backend.url_encode(out)}\r\n"
798 blk = lambda do |key, length, devcount|
799 received << [ key, length, devcount ]
801 c.list_keys_verbose(%w(a b c d e), blk)
803 received.map! { |(key,_,_)| key }
804 assert_equal %w(a b c d e), received
# Checks that list_keys_verbose recovers from a truncated reply. The server
# side reads "file_info" requests for a..e, writes one complete reply, then a
# reply without the trailing "\r\n", and closes the connection. On the new
# connection it expects b, c, d and e to be requested again and replies to
# them. The callback must end up with a, b, c, d, e.
# NOTE(review): `a`, `line`, `received` and the lines that presumably put a
# key into `out` are not visible here -- confirm.
809 def test_list_keys_verbose_retry_truncated # implementation detail
811 sock = TCPServer.new("127.0.0.1", 0)
812 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
813 c = MogileFS::MogileFS.new(nargs)
816 %w(a b c d e).each do |key|
818 cmd, args = line.split(/\s+/, 2)
819 args = c.backend.url_decode(args.strip)
820 assert_equal "file_info", cmd
821 assert_equal key, args["key"]
823 out = { "length" => 3, "devcount" => 6 }
825 a.write "OK #{c.backend.url_encode(out)}\r\n"
827 a.write "OK #{c.backend.url_encode(out)}"
828 a.close # trigger EOF
830 a = sock.accept # client will retry
831 %w(b c d e).each do |key|
833 cmd, args = line.split(/\s+/, 2)
834 args = c.backend.url_decode(args.strip)
835 assert_equal "file_info", cmd
836 assert_equal key, args["key"]
838 a.write "OK #{c.backend.url_encode(out)}\r\n"
843 blk = lambda do |key, length, devcount|
844 received << [ key, length, devcount ]
846 c.list_keys_verbose(%w(a b c d e), blk)
848 received.map! { |(key,_,_)| key }
849 assert_equal %w(a b c d e), received
856 assert_equal({}, @client.sleep(2))
861 # tested with 1000, though it takes a while
863 ENV['NR_CHUNKS'] ? ENV['NR_CHUNKS'].to_i : 10