http_file: wait for the destination server to respond
[ruby-mogilefs-client.git] / examples / mogstored_rack.rb
blobcaa779ae2dc782aba365861e27c0ca368acf8026
1 # -*- encoding: binary -*-
2 require 'tempfile'
3 require 'digest/md5'
4 require 'rack'
6 # Rack application for handling HTTP PUT/DELETE/MKCOL operations needed
7 # for a MogileFS storage server.  GET requests are handled by
8 # Rack::File and Rack::Head _must_ be in the middleware stack for
9 # mogilefsd fsck to work properly with keepalive.
11 # Usage in rackup config file (config.ru):
13 #    require "./mogstored_rack"
14 #    use Rack::Head
15 #    run MogstoredRack.new("/var/mogdata")
16 class MogstoredRack
17   class ContentMD5 < Digest::MD5
18     def content_md5
19       [ digest ].pack("m").strip!
20     end
21   end
23   def initialize(root, opts = {})
24     @root = File.expand_path(root)
25     @rack_file = (opts[:app] || Rack::File.new(@root))
26     @fsync = !! opts[:fsync]
27     @creat_perms = opts[:creat_perms] || (~File.umask & 0666)
28     @mkdir_perms = opts[:mkdir_perms] || (~File.umask & 0777)
29     @reread_verify = !! opts[:reread_verify]
30   end
32   def call(env)
33     case env["REQUEST_METHOD"]
34     when "GET", "HEAD"
35       case env["PATH_INFO"]
36       when "/"
37         r(200, "") # MogileFS seems to need this...
38       else
39         @rack_file.call(env)
40       end
41     when "PUT"
42       put(env)
43     when "DELETE"
44       delete(env)
45     when "MKCOL"
46       mkcol(env)
47     else
48       r(405, "unsupported method", env)
49     end
50     rescue Errno::EPERM, Errno::EACCES => err
51       r(403, "#{err.message} (#{err.class})", env)
52     rescue => err
53       r(500, "#{err.message} (#{err.class})", env)
54   end
56   def mkcol(env)
57     path = server_path(env) or return r(400)
58     Dir.mkdir(path, @mkdir_perms)
59     r(204)
60     rescue Errno::EEXIST # succeed (204) on race condition
61       File.directory?(path) ? r(204) : r(409)
62   end
64   def delete(env)
65     path = server_path(env) or return r(400)
66     File.exist?(path) or return r(404)
67     File.directory?(path) ? Dir.rmdir(path) : File.unlink(path)
68     r(204)
69     rescue Errno::ENOENT # return 404 on race condition
70       File.exist?(path) ? r(500) : r(404)
71   end
73   def put(env)
74     path = server_path(env) or return r(400)
75     dir = File.dirname(path)
76     File.directory?(dir) or return r(403)
78     Tempfile.open([File.basename(path), ".tmp"], dir) do |tmp|
79       tmp = tmp.to_io # delegated method calls are slower
80       tmp.sync = true
81       tmp.binmode
82       buf = ""
83       received = put_loop(env["rack.input"], tmp, buf)
84       err = content_md5_fail?(env, received) and return err
85       if @reread_verify && err = reread_md5_fail?(env, tmp, received, buf)
86         return err
87       end
88       tmp.chmod(@creat_perms)
89       begin
90         File.link(tmp.path, path)
91       rescue Errno::EEXIST
92         err = rename_overwrite_fail?(tmp.path, path) and return err
93       end
94       fsync(dir, tmp) if @fsync
95       resp = r(201)
96       resp[1]["X-Received-Content-MD5"] = received
97       return resp
98     end
99   end
101   def put_loop(src, dst, buf)
102     md5 = ContentMD5.new
103     while src.read(0x4000, buf)
104       md5.update(buf)
105       dst.write(buf)
106     end
107     md5.content_md5
108   end
110   def server_path(env)
111     path = env['PATH_INFO'].squeeze('/')
112     path.split(%r{/}).include?("..") and return false
113     "#@root#{path}"
114   end
116   # returns a plain-text HTTP response
117   def r(code, msg = nil, env = nil)
118     if env && logger = env["rack.logger"]
119       logger.warn("#{env['REQUEST_METHOD']} #{env['PATH_INFO']} " \
120                   "#{code} #{msg.inspect}")
121     end
122     if Rack::Utils::STATUS_WITH_NO_ENTITY_BODY.include?(code)
123       [ code, {}, [] ]
124     else
125       msg ||= Rack::Utils::HTTP_STATUS_CODES[code] || ""
126       msg += "\n" if msg.size > 0
127       [ code,
128         { 'Content-Type' => 'text/plain', 'Content-Length' => msg.size.to_s },
129         [ msg ] ]
130     end
131   end
133   # Tries to detect filesystem/disk corruption.
134   # Unfortunately, posix_fadvise(2)/IO#advise is only advisory and
135   # can't guarantee we're not just reading the data in the kernel
136   # page cache.
137   def reread_md5_fail?(env, tmp, received, buf)
138     # try to force a reread from the storage device, not cache
139     tmp.fsync
140     tmp.rewind
141     tmp.advise(:dontneed) rescue nil # only in Ruby 1.9.3 and only advisory
143     md5 = ContentMD5.new
144     while tmp.read(0x4000, buf)
145       md5.update(buf)
146     end
147     reread = md5.content_md5
148     reread == received and return false # success
149     r(500, "reread MD5 mismatch\n" \
150            "received: #{received}\n" \
151            "  reread: #{reread}", env)
152   end
154   # Tries to detect network corruption by verifying the client-supplied
155   # Content-MD5 is correct.  It's highly unlikely the MD5 can be corrupted
156   # in a way that also allows corrupt data to pass through.
157   #
158   # The Rainbows!/Unicorn HTTP servers will populate the HTTP_CONTENT_MD5
159   # field in +env+ after env["rack.input"] is fully-consumed.  Clients
160   # may also send Content-MD5 as a header and this will still work.
161   def content_md5_fail?(env, received)
162     expected = env["HTTP_CONTENT_MD5"] or return false
163     expected = expected.strip
164     expected == received and return false # success
165     r(400, "Content-MD5 mismatch\n" \
166            "expected: #{expected}\n" \
167            "received: #{received}", env)
168   end
170   def rename_overwrite_fail?(src, dst)
171     10.times do
172       begin
173         tmp_dst = "#{dst}.#{rand}"
174         File.link(src, tmp_dst)
175       rescue Errno::EEXIST
176         next
177       end
178       File.rename(tmp_dst, dst)
179       return false # success!
180     end
181     r(409)
182   end
184   # fsync each and every directory component above us on the same device
185   def fsync(dir, tmp)
186     tmp.fsync
187     File.open(dir) { |io| io.fsync }
188   end