add mog-sync example script
[ruby-mogilefs-client.git] / examples / mog-sync.rb
blob2444fb10c5c1af3e4238be565b9f0bed40d6e3b1
1 #!/usr/bin/env ruby
2 usage = <<EOF
3 Usage: #$0 SRC_TRACKER_LIST/SRC_DOMAIN DST_TRACKER_LIST/DST_DOMAIN"
4 EOF
5 Thread.abort_on_exception = $stdout.sync = $stderr.sync = true
6 require 'uri'
7 require 'optparse'
8 require 'mogilefs'
9 require 'thread'
10 @verbose = 0
11 copy_jobs = 1
12 jobs = 1
13 keep = nil
14 @dryrun = false
15 opts = {}
16 prefix = ""
17 src_class = dst_class = nil
18 src_maxlen = nil
19 exit_ok = true
20 after = nil
21 clobber_nsum = false
23 ARGV.options do |x|
24   x.banner = usage.strip
25   x.separator ''
26   x.on('-j', '--metadata-jobs JOBS', Integer,
27        'Number of metadata jobs to run in parallel') { |n|
28     jobs = n
29   }
30   x.on('-J', '--copy-jobs JOBS', Integer,
31        'Number of copy jobs to run in parallel') { |n|
32     copy_jobs = n
33   }
35   x.on('-h', '--help', 'Show this help message.') { puts x; exit }
36   %w(get_file_data_timeout new_file_max_time fail_timeout timeout).each do |t|
37     x.on("--#{t.tr('_', '-')} SECONDS", Integer) { |n| opts[t.to_sym] = n }
38   end
39   x.on('-v', '--verbose') { @verbose += 1 }
40   x.on('-d', '--delete') do
41     begin
42       require 'gdbm'
43       require 'tempfile'
44       tmp = Tempfile.new(%w(mog-sync-keep .gdbm))
45       at_exit { tmp.close! }
46       keep = GDBM.new(tmp.path)
47     rescue LoadError
48       warn "gdbm extension recommended for --delete: #{e.message} (#{e.class})"
49       keep = {}
50     end
51   end
52   x.on('-n', '--dry-run') { @dryrun = opts[:readonly] = true }
53   x.on('-p', '--prefix STRING') { |s| prefix = s }
54   x.on('--src-class STRING') { |s| src_class = s }
55   x.on('--dst-class STRING') { |s| dst_class = s }
56   x.on('--after STRING') { |s| after = s }
57   x.on('--max-size STRING') { |s|
58     mult = 1
59     if s.sub!(/-1\z/, "")
60       off = -1
61     elsif s.sub!(/\+1\z/, "")
62       off = 1
63     else
64       off = 0
65     end
66     {
67       /(?:K|KiB)\z/i => 1024,
68       /(?:M|MiB)\z/i => 1024 ** 2,
69       /(?:G|GiB)\z/i => 1024 ** 3,
70       /KB\z/i => 1000,
71       /MB\z/i => 1000 ** 2,
72       /GB/i => 1000 ** 3,
73     }.each do |re, m|
74       if s.sub!(re, "")
75         mult = m
76         break
77       end
78     end
79     src_maxlen = (s.to_i * mult) + off
80   }
81   x.on('-F', '--clobber-missing-checksum') { clobber_nsum = true }
82   x.parse!
83 end
85 @verbose = 1 if @verbose == 0 && @dryrun
86 ARGV.size == 2 or abort "Usage: #{usage}"
87 src_spec, dst_spec = ARGV
88 src_opts = opts.merge(readonly: true)
90 def client_for(str, opts = {})
91   trackers, domain = str.split('/', 2)
92   opts[:hosts] = trackers.split(/,/)
93   opts[:domain] = domain
94   MogileFS::MogileFS.new(opts)
95 end
97 # atomic for pipes/O_APPEND:
98 def warn(m); $stderr.syswrite("#{m}\n"); end
99 def echo(m); $stdout.syswrite("#{m}\n"); end
101 def copy(job_id, reason, src, dst, src_info, dst_info, dst_class)
102   key = src_info["key"]
103   length = src_info["length"]
104   unless @dryrun
105     opts = {
106       largefile: true,
107       class: dst_class || src_info["class"],
108       content_length: length,
109     }
111     # FIXME: test/support non-MD5 checksums
112     if /\AMD5:([a-fA-F0-9]{32})\z/ =~ src_info["checksum"]
113       md5 = [ $1 ].pack("H*")
114       opts[:content_md5] = [ md5 ].pack('m0').chomp
115     end
116     if @verbose > 1
117       echo "new_file(#{key}, #{opts.inspect})"
118     end
119     dst.new_file(key, opts) do |dst_io|
120       src.get_file_data(key, dst_io)
121     end
122   end
123   if @verbose > 0
124     echo("#{reason} #{key}")
125     if @verbose > 1 && dst_info
126       echo "I[#{job_id}] before #{dst_info.inspect}"
127       echo "I[#{job_id}]  after #{src_info.inspect}"
128     end
129   end
130   Thread.current[:mog_sync_xfer] += length
131 rescue => e
132   warn "E[#{job_id}] #{e.message} (#{e.class}) (src=#{key})"
133   e.backtrace { |l| warn "E[#{job_id}] #{l}" }
136 copy_queue = SizedQueue.new(copy_jobs * 8)
137 copiers = copy_jobs.times.map do |i|
138   Thread.new(i) do |job_id|
139     Thread.current[:mog_sync_xfer] = 0
140     while copy_job = copy_queue.pop
141       copy(job_id, *copy_job)
142     end
143   end
146 queue = SizedQueue.new(jobs * 8)
147 consumers = jobs.times.map do |i|
148   Thread.new(i) do |job_id|
149     dst = client_for(dst_spec, opts)
150     src = client_for(src_spec, src_opts)
151     begin
152       key = queue.pop or break
153       src_info = src.file_info(key)
154       next if src_class && src_class != src_info["class"]
155       src_checksum = src_info["checksum"]
156       next if src_maxlen && src_info["length"] > src_maxlen
158       begin
159         # this may raise UnknownKeyError
160         dst_info = dst.file_info(key)
162         dst_checksum = dst_info["checksum"]
164         # maybe we need to wait for fsck to finish:
165         if dst_checksum == "MISSING"
166           warn "destination checksum broken #{dst_info.inspect} (skipped)"
167           next unless clobber_nsum
168         end
170         # tell user to fix source
171         if src_checksum == "MISSING"
172           warn "source checksum broken #{src_info.inspect} (skipped)"
173           exit_ok = false
174           next
175         end
177         next if dst_checksum == src_checksum
178         reason = "M"
179       rescue MogileFS::Backend::UnknownKeyError # new file
180         dst_info = nil
181         # tell user to fix source
182         if src_checksum == "MISSING"
183           warn "source checksum broken #{src_info.inspect} (copying)"
184           exit_ok = false
185         end
186         reason = "A"
187       end
188       copy_queue << [ reason, src, dst, src_info, dst_info, dst_class ]
189     rescue => e
190       warn "E[#{job_id}] #{e.message} (#{e.class}) (src=#{key})"
191       e.backtrace { |l| warn "E[#{job_id}] #{l}" }
192     end while true
193   end
196 # producer feeds consumers
197 begin
198   main_src = client_for(src_spec, src_opts)
199   main_src.each_key(prefix, after: after) do |key|
200     keep[key] = "1" if keep
201     queue << key
202   end
203 rescue => e
204   exit_ok = false
205   warn "Aborting due to source error: #{e.message} (#{e.class})"
206   e.backtrace { |l| warn "#{l}" }
209 # terminate producer threads
210 Thread.new { consumers.each { queue << nil } }
211 consumers.each { |t| t.join }
212 Thread.new { copiers.each { copy_queue << nil } }
213 copiers.each { |t| t.join }
214 bytes_sent = copiers.inject(0) { |sent,t| sent += t[:mog_sync_xfer] }
215 bytes_deleted = 0
217 # delete is single-threaded, it is not I/O-bound and
218 # we can pipeline in the future
219 if keep && exit_ok
220   queue = SizedQueue.new(8)
221   deleter = Thread.new do
222     dst = client_for(dst_spec, opts)
223     while key = queue.pop
224       begin
225         dst.delete(key) unless @dryrun
226         echo "D #{key}"
227       rescue MogileFS::Backend::UnknownKeyError
228         warn "#{key} disappeared before we could delete it"
229       end
230     end
231   end
232   main_dst = client_for(dst_spec, opts)
233   main_dst.each_file_info(prefix, after: after) do |info|
234     key = info["key"]
235     next if keep.include?(key)
236     queue << key
237     bytes_deleted += info["length"]
238   end
239   queue << nil # terminate
240   deleter.join
242 if @verbose
243   echo "wrote #{bytes_sent} bytes, removed #{bytes_deleted} bytes"
245 exit(exit_ok)