3 Usage: #$0 SRC_TRACKER_LIST/SRC_DOMAIN DST_TRACKER_LIST/DST_DOMAIN"
5 Thread.abort_on_exception = $stdout.sync = $stderr.sync = true
17 src_class = dst_class = nil
24 x.banner = usage.strip
26 x.on('-j', '--metadata-jobs JOBS', Integer,
27 'Number of metadata jobs to run in parallel') { |n|
30 x.on('-J', '--copy-jobs JOBS', Integer,
31 'Number of copy jobs to run in parallel') { |n|
35 x.on('-h', '--help', 'Show this help message.') { puts x; exit }
36 %w(get_file_data_timeout new_file_max_time fail_timeout timeout).each do |t|
37 x.on("--#{t.tr('_', '-')} SECONDS", Integer) { |n| opts[t.to_sym] = n }
39 x.on('-v', '--verbose') { @verbose += 1 }
40 x.on('-d', '--delete') do
44 tmp = Tempfile.new(%w(mog-sync-keep .gdbm))
45 at_exit { tmp.close! }
46 keep = GDBM.new(tmp.path)
48 warn "gdbm extension recommended for --delete: #{e.message} (#{e.class})"
52 x.on('-n', '--dry-run') { @dryrun = opts[:readonly] = true }
53 x.on('-p', '--prefix STRING') { |s| prefix = s }
54 x.on('--src-class STRING') { |s| src_class = s }
55 x.on('--dst-class STRING') { |s| dst_class = s }
56 x.on('--after STRING') { |s| after = s }
57 x.on('--max-size STRING') { |s|
61 elsif s.sub!(/\+1\z/, "")
67 /(?:K|KiB)\z/i => 1024,
68 /(?:M|MiB)\z/i => 1024 ** 2,
69 /(?:G|GiB)\z/i => 1024 ** 3,
79 src_maxlen = (s.to_i * mult) + off
81 x.on('-F', '--clobber-missing-checksum') { clobber_nsum = true }
85 @verbose = 1 if @verbose == 0 && @dryrun
86 ARGV.size == 2 or abort "Usage: #{usage}"
87 src_spec, dst_spec = ARGV
88 src_opts = opts.merge(readonly: true)
# Build a MogileFS client from a "host1:port,host2:port/domain" spec string.
#
# str  - tracker list and domain joined by the first "/"
# opts - base client options (e.g. :readonly, timeouts)
#
# Returns a new MogileFS::MogileFS client.
#
# NOTE: the original wrote :hosts/:domain directly into the caller's hash;
# the same opts hash is shared by several threads calling client_for
# concurrently, so we merge into a fresh hash instead of mutating it.
def client_for(str, opts = {})
  trackers, domain = str.split('/', 2)
  opts = opts.merge(hosts: trackers.split(','), domain: domain)
  MogileFS::MogileFS.new(opts)
end
97 # atomic for pipes/O_APPEND:
# Write one diagnostic line to stderr with a single syswrite call so the
# whole line stays atomic for pipes/O_APPEND destinations.
# Returns the number of bytes written.
def warn(msg)
  line = msg.to_s + "\n"
  $stderr.syswrite(line)
end
# Write one status line to stdout with a single syswrite call so the
# whole line stays atomic for pipes/O_APPEND destinations.
# Returns the number of bytes written.
def echo(msg)
  line = msg.to_s + "\n"
  $stdout.syswrite(line)
end
101 def copy(job_id, reason, src, dst, src_info, dst_info, dst_class)
102 key = src_info["key"]
103 length = src_info["length"]
107 class: dst_class || src_info["class"],
108 content_length: length,
111 # FIXME: test/support non-MD5 checksums
112 if /\AMD5:([a-fA-F0-9]{32})\z/ =~ src_info["checksum"]
113 md5 = [ $1 ].pack("H*")
114 opts[:content_md5] = [ md5 ].pack('m0').chomp
117 echo "new_file(#{key}, #{opts.inspect})"
119 dst.new_file(key, opts) do |dst_io|
120 src.get_file_data(key, dst_io)
124 echo("#{reason} #{key}")
125 if @verbose > 1 && dst_info
126 echo "I[#{job_id}] before #{dst_info.inspect}"
127 echo "I[#{job_id}] after #{src_info.inspect}"
130 Thread.current[:mog_sync_xfer] += length
132 warn "E[#{job_id}] #{e.message} (#{e.class}) (src=#{key})"
133 e.backtrace { |l| warn "E[#{job_id}] #{l}" }
136 copy_queue = SizedQueue.new(copy_jobs * 8)
137 copiers = copy_jobs.times.map do |i|
138 Thread.new(i) do |job_id|
139 Thread.current[:mog_sync_xfer] = 0
140 while copy_job = copy_queue.pop
141 copy(job_id, *copy_job)
146 queue = SizedQueue.new(jobs * 8)
147 consumers = jobs.times.map do |i|
148 Thread.new(i) do |job_id|
149 dst = client_for(dst_spec, opts)
150 src = client_for(src_spec, src_opts)
152 key = queue.pop or break
153 src_info = src.file_info(key)
154 next if src_class && src_class != src_info["class"]
155 src_checksum = src_info["checksum"]
156 next if src_maxlen && src_info["length"] > src_maxlen
159 # this may raise UnknownKeyError
160 dst_info = dst.file_info(key)
162 dst_checksum = dst_info["checksum"]
164 # maybe we need to wait for fsck to finish:
165 if dst_checksum == "MISSING"
166 warn "destination checksum broken #{dst_info.inspect} (skipped)"
167 next unless clobber_nsum
170 # tell user to fix source
171 if src_checksum == "MISSING"
172 warn "source checksum broken #{src_info.inspect} (skipped)"
177 next if dst_checksum == src_checksum
179 rescue MogileFS::Backend::UnknownKeyError # new file
181 # tell user to fix source
182 if src_checksum == "MISSING"
183 warn "source checksum broken #{src_info.inspect} (copying)"
188 copy_queue << [ reason, src, dst, src_info, dst_info, dst_class ]
190 warn "E[#{job_id}] #{e.message} (#{e.class}) (src=#{key})"
191 e.backtrace { |l| warn "E[#{job_id}] #{l}" }
196 # producer feeds consumers
198 main_src = client_for(src_spec, src_opts)
199 main_src.each_key(prefix, after: after) do |key|
200 keep[key] = "1" if keep
205 warn "Aborting due to source error: #{e.message} (#{e.class})"
206 e.backtrace { |l| warn "#{l}" }
209 # terminate producer threads
210 Thread.new { consumers.each { queue << nil } }
211 consumers.each { |t| t.join }
212 Thread.new { copiers.each { copy_queue << nil } }
213 copiers.each { |t| t.join }
214 bytes_sent = copiers.inject(0) { |sent,t| sent += t[:mog_sync_xfer] }
217 # delete is single-threaded, it is not I/O-bound and
218 # we can pipeline in the future
220 queue = SizedQueue.new(8)
221 deleter = Thread.new do
222 dst = client_for(dst_spec, opts)
223 while key = queue.pop
225 dst.delete(key) unless @dryrun
227 rescue MogileFS::Backend::UnknownKeyError
228 warn "#{key} disappeared before we could delete it"
232 main_dst = client_for(dst_spec, opts)
233 main_dst.each_file_info(prefix, after: after) do |info|
235 next if keep.include?(key)
237 bytes_deleted += info["length"]
239 queue << nil # terminate
243 echo "wrote #{bytes_sent} bytes, removed #{bytes_deleted} bytes"