modified: spffq.py
[GalaxyCodeBases.git] / python / dedup / dedup.py
blobefd0c53684658a10dfa0817f566e033c83da6d91
1 #!/usr/bin/env python2.7
3 # Identify duplicate files to possibly replace with hard links to save space:
5 # find /Users/boris -type f | ./dedup.py --scan > boris.txt
6 # cat boris.txt | ./dedup.py --analyze
8 # The candidate list generated by those steps should be carefully reviewed
9 # for safety. For example, any files under version control or subject to
10 # modification must be left alone, even if they appear to be duplicates.
12 # When satisfied that only safe files are to be deduped,
14 # cat boris.txt | ./dedup.py --implement
16 # Note that in the "implement" stage, deduplication messages will be echoed
17 # in arbitrary order due to parallel execution.
20 import os
21 import sys
22 import threading
23 import subprocess
24 from collections import defaultdict
# Scan parameters.
XARGS_COUNT = 50    # file names handed to a single md5 invocation
MAX_SUBPROCS = 8    # initial value of the semaphore that bounds worker threads

# Analysis parameters, in bytes.
MIN_FILE_SIZE = 256 * 1024      # smaller files are not considered for dedup
MIN_DEDUP = 2 * 1024 * 1024     # minimum savings for a duplicate group to be acted on

# One re-entrant lock shared by tsout, tserr and the SCAN_RESULTS update.
ts_lock = threading.RLock()
def tsout(msg):
    """Write ``str(msg)`` followed by a newline to stdout while holding ts_lock.

    Holding the lock keeps the message and its newline together when several
    worker threads report at the same time.
    """
    ts_lock.acquire()
    try:
        out = sys.stdout
        out.write(str(msg))
        out.write("\n")
    finally:
        ts_lock.release()
def tserr(msg, lock=threading.RLock()):
    """Write ``str(msg)`` followed by a newline to stderr while holding ts_lock.

    The ``lock`` parameter is accepted but never used by the body; the
    module-level ``ts_lock`` is what serialises the write, so stdout and
    stderr messages share one lock.
    """
    ts_lock.acquire()
    try:
        err = sys.stderr
        err.write(str(msg))
        err.write("\n")
    finally:
        ts_lock.release()
# Counting semaphore: a slot is acquired before each worker thread starts and
# released by that worker when it finishes.
subprocs = threading.Semaphore(value=MAX_SUBPROCS)

# Result tuples (md5_hash, n_link, size, file_name) collected by scan workers.
SCAN_RESULTS = list()
def process(chunk):
    """Hash one batch of files with md5 and publish the results.

    Runs ``/sbin/md5 -r`` on every name in ``chunk`` and pairs each output
    line with the name at the same position in ``chunk``.  For every file
    whose name md5 echoed back unchanged, the line
    ``"<md5> <n_link> <size> <name>"`` is written through tsout and the
    matching tuple is appended to SCAN_RESULTS.

    Problems with an individual file (unparseable output line, name changed
    by md5, file that can no longer be stat'ed) are reported through tserr
    and only that file is skipped.  A non-zero exit from md5 is reported and
    the batch is skipped.

    One slot of the ``subprocs`` semaphore is always released on exit,
    because enqueue_scan acquires one before starting this worker.
    """
    try:
        if not chunk:
            # With no file arguments md5 would hash stdin and print a bare
            # digest, which cannot be split into (hash, name) below.
            return
        md5_command = ["/sbin/md5", "-r"] + chunk
        try:
            md5_raw_results = subprocess.check_output(md5_command)
        except subprocess.CalledProcessError as err:
            # Report through tserr so the message is serialised with other
            # output instead of surfacing as a raw thread traceback.
            tserr("WARNING: md5 exited with status {} for a batch of {} files. Skipping batch.".format(err.returncode, len(chunk)))
            return
        md5_results = []
        for i, line in enumerate(md5_raw_results.split("\n")):
            if not line:
                continue
            fields = line.lstrip().split(None, 1)
            if len(fields) != 2 or i >= len(chunk):
                tserr("WARNING: Unexpected md5 output line '{}'. Skipping.".format(line))
                continue
            md5_hash, file_name = fields
            orig_file_name = chunk[i]
            if file_name != orig_file_name:
                tserr("WARNING: Filename not preserved by md5: before '{}', after '{}'. Skipping.".format(orig_file_name, file_name))
                continue
            try:
                sr = os.stat(orig_file_name)
            except OSError as err:
                # The file may have disappeared since md5 read it; losing one
                # entry must not discard the rest of the batch.
                tserr("WARNING: Cannot stat '{}': {}. Skipping.".format(orig_file_name, err))
                continue
            r = (md5_hash, sr.st_nlink, sr.st_size, orig_file_name)
            md5_results.append(r)
            tsout("{} {} {} {}".format(*r))
        with ts_lock:
            SCAN_RESULTS.extend(md5_results)
    finally:
        subprocs.release()
def enqueue_scan(chunk):
    """Start a worker thread that runs ``process(chunk)``.

    Blocks until a slot of the ``subprocs`` semaphore is available; the
    worker gives the slot back when process() finishes.
    """
    subprocs.acquire()
    worker = threading.Thread(target=process, args=[chunk])
    worker.start()
def scan():
    """Read file names from stdin, one per line, and hash them in batches.

    Names are grouped into batches of at most XARGS_COUNT and handed to
    enqueue_scan.  After the input is exhausted, every slot of the
    ``subprocs`` semaphore is acquired, which blocks until all workers have
    finished, and the number of collected results is reported on stderr.
    """
    tserr("Reading filenames from stdin.")
    chunk = []
    for line in sys.stdin:
        # Strip only the line terminator: a final line that lacks one must
        # keep its last character, otherwise the file name is corrupted.
        if line.endswith("\n"):
            line = line[:-1]
        if not line:
            # An empty name cannot refer to a file and would make md5 fail
            # for the whole batch.
            continue
        if len(chunk) == XARGS_COUNT:
            enqueue_scan(chunk)
            chunk = []
        chunk.append(line)
    if chunk:
        # Skip the trailing batch when there is nothing left to hash.
        enqueue_scan(chunk)
    # Taking every slot waits for all outstanding workers to release theirs.
    for _ in xrange(MAX_SUBPROCS):
        subprocs.acquire()
    tserr("Scanned {} files.".format(len(SCAN_RESULTS)))
class MD5Hash(object):
    """A group of files that share one MD5 digest.

    Attributes:
        md5_hash: the digest shared by every file in the group (None until
            the first add_file call).
        files: list of ``(md5_hash, n_link, size, file_name)`` tuples in the
            order they were added.
        current_size: sum over the files of ``size / n_link``.
        ideal_size: the size of a single copy (None until the first
            add_file call).
    """

    def __init__(self):
        self.md5_hash = None
        self.files = []
        self.current_size = 0.0
        self.ideal_size = None

    def add_file(self, md5_hash, n_link, size, file_name):
        """Record one file in the group.

        Asserts that ``md5_hash`` and ``size`` agree with what was recorded
        by earlier calls.  Each file contributes ``size / n_link`` to
        ``current_size``.
        """
        assert not self.md5_hash or self.md5_hash == md5_hash
        self.md5_hash = md5_hash
        assert not self.ideal_size or self.ideal_size == size
        self.ideal_size = size
        self.files.append((md5_hash, n_link, size, file_name))
        self.current_size += (float(size) / n_link)

    @staticmethod
    def _link_over(src_filename, dst_filename):
        """Replace dst_filename with a hard link to src_filename.

        The link is first created under a temporary name next to the
        destination and then renamed over it, so the destination is left
        untouched if the link cannot be created (for example across file
        systems).  Raises OSError on failure.
        """
        if os.path.samefile(src_filename, dst_filename):
            # Already the same inode.  Renaming one hard link onto another
            # link of the same file is a no-op that would leave the
            # temporary name behind, so stop here.
            return
        tmp_filename = "{}.dedup-{}-{}".format(
            dst_filename, os.getpid(), threading.current_thread().ident)
        os.link(src_filename, tmp_filename)
        try:
            os.rename(tmp_filename, dst_filename)
        except OSError:
            os.unlink(tmp_filename)
            raise

    def dedup(self):
        """Hard-link every later file in the group to the first one.

        Each candidate is first compared byte-for-byte against the first
        file with ``/usr/bin/cmp``; candidates for which cmp does not
        succeed are reported and skipped.  A failure to link one candidate
        is reported and does not stop the remaining candidates.

        One slot of the ``subprocs`` semaphore is always released on exit,
        because enqueue_dedup acquires one before starting this worker.
        """
        try:
            if not self.files:
                return
            src_filename = self.files[0][3]
            for dst in self.files[1:]:
                dst_filename = dst[3]
                try:
                    subprocess.check_output(["/usr/bin/cmp", src_filename, dst_filename])
                except (subprocess.CalledProcessError, OSError):
                    tserr("Skipping due to mismatch: '{}' '{}'".format(src_filename, dst_filename))
                    continue
                try:
                    self._link_over(src_filename, dst_filename)
                except OSError as err:
                    tserr("Failed to link '{}' => '{}': {}".format(dst_filename, src_filename, err))
                    continue
                tsout("Deduplicated '{}' => '{}'".format(dst_filename, src_filename))
        finally:
            subprocs.release()
def enqueue_dedup(md5):
    """Start a worker thread that runs ``md5.dedup()``.

    Blocks until a slot of the ``subprocs`` semaphore is available; dedup()
    gives the slot back when it finishes.
    """
    subprocs.acquire()
    worker = threading.Thread(target=md5.dedup)
    worker.start()
def optimize(dry_run):
    """Group scanned files by digest and report (or perform) deduplication.

    Reads lines of the form ``"<md5> <n_link> <size> <file_name>"`` from
    stdin.  Files of at least MIN_FILE_SIZE bytes are grouped by digest;
    every group whose potential savings reach MIN_DEDUP bytes is listed on
    stdout and, unless ``dry_run`` is true, handed to enqueue_dedup.  A
    summary of the parameters, the eligible file count and the space
    savings is printed at the end.

    Args:
        dry_run: when true, only report; when false, also hard-link the
            duplicates and wait for all workers to finish.
    """
    all_files = defaultdict(MD5Hash)
    files = defaultdict(MD5Hash)
    line_count = 0
    for line in sys.stdin:
        # Strip only the line terminator: a final line that lacks one must
        # keep the last character of its file name.
        if line.endswith("\n"):
            line = line[:-1]
        if not line.strip():
            # Blank lines carry no record.
            continue
        line_count += 1
        md5_hash, n_link, size, file_name = line.split(None, 3)
        n_link = int(n_link)
        size = int(size)
        all_files[md5_hash].add_file(md5_hash, n_link, size, file_name)
        if size < MIN_FILE_SIZE:
            continue
        files[md5_hash].add_file(md5_hash, n_link, size, file_name)
    mb = 1024*1024.0
    operated_upon = []
    for md5 in sorted(files.itervalues(), key=lambda f: f.current_size - f.ideal_size):
        savings = md5.current_size - md5.ideal_size
        if savings < MIN_DEDUP:
            continue
        operated_upon.append(md5)
        tsout("")
        tsout("Save {:.1f} MB by deduping\n{}".format(savings / mb, "".join(["\n    " + f[3] for f in md5.files])))
        tsout("")
        if not dry_run:
            enqueue_dedup(md5)
    if not dry_run:
        # Taking every slot waits for all outstanding dedup workers.
        for _ in xrange(MAX_SUBPROCS):
            subprocs.acquire()
    ideal = sum(f.ideal_size for f in operated_upon) / mb
    current = sum(f.current_size for f in operated_upon) / mb
    total = sum(f.current_size for f in all_files.itervalues()) / mb
    eligible_count = sum(len(f.files) for f in operated_upon)
    # Empty input must still produce a summary instead of dividing by zero.
    eligible_percent = 100.0*eligible_count/line_count if line_count else 0.0
    tsout("Parameters: MIN_FILE_SIZE={:3.1f} MB, MIN_DEDUP={:3.1f} MB.".format(MIN_FILE_SIZE/mb, MIN_DEDUP/mb))
    tsout("{} files eligible for deduplication = {:3.1f}% of {} total files.".format(eligible_count, eligible_percent, line_count))
    tsout("Space savings {:.0f} MB = {:3.1f}% of {:.0f} MB total space.".format(current - ideal, 100.0*(current - ideal)/max(total,0.00001), total))
if __name__ == "__main__":
    # Normalise the first argument once: dashes are stripped from both ends
    # and the word is lower-cased, so "--scan", "-scan" and "SCAN" all match.
    command = sys.argv[1].strip('-').lower() if len(sys.argv) >= 2 else None
    if command == "scan":
        scan()
    elif command in ("analyze", "analyse", "analysis"):
        optimize(dry_run=True)
    elif command in ("optimize", "optimise", "implement", "implementation"):
        optimize(dry_run=False)
    else:
        tserr("Unsupported command line.")
        sys.exit(-1)
189 #tsprint(repr(RESULTS))