third_party: Update waf to version 2.0.23
third_party/waf/waflib/extras/wafcache.py
#! /usr/bin/env python
# encoding: utf-8
# Thomas Nagy, 2019 (ita)

"""
Filesystem-based cache system to share and re-use build artifacts

Cache access operations (copy to and from) are delegated to
independent pre-forked worker subprocesses.

The following environment variables may be set:
* WAFCACHE: several possibilities:
  - File cache:
    absolute path of the waf cache (~/.cache/wafcache_user,
    where `user` represents the currently logged-in user)
  - URL to a cache server, for example:
    export WAFCACHE=http://localhost:8080/files/
    in that case, GET/POST requests are made to urls of the form
    http://localhost:8080/files/000000000/0 (cache management is delegated to the server)
  - GCS, S3 or MINIO bucket:
    gs://my-bucket/    (uses the gsutil command line tool or WAFCACHE_CMD)
    s3://my-bucket/    (uses the aws command line tool or WAFCACHE_CMD)
    minio://my-bucket/ (uses the mc command line tool or WAFCACHE_CMD)
* WAFCACHE_CMD: bucket upload/download command, for example:
  WAFCACHE_CMD="gsutil cp %{SRC} %{TGT}"
  Note that the WAFCACHE bucket value is used for the source or destination
  depending on the operation (upload or download). For example, with:
  WAFCACHE="gs://mybucket/"
  the following commands may be run:
    gsutil cp build/myprogram gs://mybucket/aa/aaaaa/1
    gsutil cp gs://mybucket/bb/bbbbb/2 build/somefile
* WAFCACHE_NO_PUSH: if set, disables pushing to the cache
* WAFCACHE_VERBOSITY: if set, displays more detailed cache operations
* WAFCACHE_STATS: if set, displays cache usage statistics on exit

File cache specific options:
  Files are copied using hard links by default; if the cache is located
  on another partition, the system switches to file copies instead.
* WAFCACHE_TRIM_MAX_FOLDER: maximum number of task folders to keep in the cache (default: 1M)
* WAFCACHE_EVICT_MAX_BYTES: maximum cache size in bytes (default: 10GB)
* WAFCACHE_EVICT_INTERVAL_MINUTES: minimum interval between attempts
  to trim the cache (default: 3 minutes)

Usage::

	def build(bld):
		bld.load('wafcache')
		...

To troubleshoot::

	waf clean build --zones=wafcache
"""

import atexit, base64, errno, fcntl, getpass, os, re, shutil, sys, time, traceback, urllib3, shlex
try:
	import subprocess32 as subprocess
except ImportError:
	import subprocess

base_cache = os.path.expanduser('~/.cache/')
if not os.path.isdir(base_cache):
	base_cache = '/tmp/'
default_wafcache_dir = os.path.join(base_cache, 'wafcache_' + getpass.getuser())

CACHE_DIR = os.environ.get('WAFCACHE', default_wafcache_dir)
WAFCACHE_CMD = os.environ.get('WAFCACHE_CMD')
TRIM_MAX_FOLDERS = int(os.environ.get('WAFCACHE_TRIM_MAX_FOLDER', 1000000))
EVICT_INTERVAL_MINUTES = int(os.environ.get('WAFCACHE_EVICT_INTERVAL_MINUTES', 3))
EVICT_MAX_BYTES = int(os.environ.get('WAFCACHE_EVICT_MAX_BYTES', 10**10))
WAFCACHE_NO_PUSH = 1 if os.environ.get('WAFCACHE_NO_PUSH') else 0
WAFCACHE_VERBOSITY = 1 if os.environ.get('WAFCACHE_VERBOSITY') else 0
WAFCACHE_STATS = 1 if os.environ.get('WAFCACHE_STATS') else 0
OK = "ok"
re_waf_cmd = re.compile('(?P<src>%{SRC})|(?P<tgt>%{TGT})')

try:
	import cPickle
except ImportError:
	import pickle as cPickle

if __name__ != '__main__':
	from waflib import Task, Logs, Utils, Build
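	# The guard above matters because this same file is also executed
	# directly as a pre-forked worker process (see get_process/loop below),
	# in which case waflib is neither needed nor necessarily importable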

	def can_retrieve_cache(self):
		"""
		New method for waf Task classes
		"""
		if not self.outputs:
			return False

		self.cached = False

		sig = self.signature()
		ssig = Utils.to_hex(self.uid() + sig)
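		# ssig is the cache key: the task uid (class name plus relative node
		# paths) combined with the task signature, hex-encoded, so equivalent
		# tasks map to the same entry regardless of the checkout location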

		if WAFCACHE_STATS:
			self.generator.bld.cache_reqs += 1

		files_to = [node.abspath() for node in self.outputs]
		err = cache_command(ssig, [], files_to)
		if err.startswith(OK):
			if WAFCACHE_VERBOSITY:
				Logs.pprint('CYAN', '  Fetched %r from cache' % files_to)
			else:
				Logs.debug('wafcache: fetched %r from cache', files_to)
			if WAFCACHE_STATS:
				self.generator.bld.cache_hits += 1
		else:
			if WAFCACHE_VERBOSITY:
				Logs.pprint('YELLOW', '  No cache entry %s' % files_to)
			else:
				Logs.debug('wafcache: No cache entry %s: %s', files_to, err)
			return False

		self.cached = True
		return True

	def put_files_cache(self):
		"""
		New method for waf Task classes
		"""
		if WAFCACHE_NO_PUSH or getattr(self, 'cached', None) or not self.outputs:
			return

		files_from = []
		for node in self.outputs:
			path = node.abspath()
			if not os.path.isfile(path):
				return
			files_from.append(path)

		bld = self.generator.bld
		sig = self.signature()
		ssig = Utils.to_hex(self.uid() + sig)

		err = cache_command(ssig, files_from, [])

		if err.startswith(OK):
			if WAFCACHE_VERBOSITY:
				Logs.pprint('CYAN', '  Successfully uploaded %s to cache' % files_from)
			else:
				Logs.debug('wafcache: Successfully uploaded %r to cache', files_from)
			if WAFCACHE_STATS:
				self.generator.bld.cache_puts += 1
		else:
			if WAFCACHE_VERBOSITY:
				Logs.pprint('RED', '  Error caching step results %s: %s' % (files_from, err))
			else:
				Logs.debug('wafcache: Error caching results %s: %s', files_from, err)

		bld.task_sigs[self.uid()] = self.cache_sig

	def hash_env_vars(self, env, vars_lst):
		"""
		Reimplement BuildContext.hash_env_vars so that the resulting hash does not depend on local paths
		"""
		if not env.table:
			env = env.parent
			if not env:
				return Utils.SIG_NIL

		idx = str(id(env)) + str(vars_lst)
		try:
			cache = self.cache_env
		except AttributeError:
			cache = self.cache_env = {}
		else:
			try:
				return self.cache_env[idx]
			except KeyError:
				pass

		v = str([env[a] for a in vars_lst])
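		# strip the absolute path of the project source directory from the
		# values so that the hash does not depend on where the project is checked out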
		v = v.replace(self.srcnode.abspath().__repr__()[:-1], '')
		m = Utils.md5()
		m.update(v.encode())
		ret = m.digest()

		Logs.debug('envhash: %r %r', ret, v)

		cache[idx] = ret

		return ret

	def uid(self):
		"""
		Reimplement Task.uid() so that the signature does not depend on local paths
		"""
		try:
			return self.uid_
		except AttributeError:
			m = Utils.md5()
			src = self.generator.bld.srcnode
			up = m.update
			up(self.__class__.__name__.encode())
			for x in self.inputs + self.outputs:
				up(x.path_from(src).encode())
			self.uid_ = m.digest()
			return self.uid_
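
	# make_cached wraps a task class in place: run() is replaced so that a
	# cache hit short-circuits execution, and post_run() is replaced so that
	# freshly built outputs are pushed to the cache afterwards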

	def make_cached(cls):
		"""
		Enable the waf cache for a given task class
		"""
		if getattr(cls, 'nocache', None) or getattr(cls, 'has_cache', False):
			return

		full_name = "%s.%s" % (cls.__module__, cls.__name__)
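		# these task classes mainly create version symlinks or install files,
		# so caching them is presumably not worthwhile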
		if full_name in ('waflib.Tools.ccroot.vnum', 'waflib.Build.inst'):
			return

		m1 = getattr(cls, 'run', None)
		def run(self):
			if getattr(self, 'nocache', False):
				return m1(self)
			if self.can_retrieve_cache():
				return 0
			return m1(self)
		cls.run = run

		m2 = getattr(cls, 'post_run', None)
		def post_run(self):
			if getattr(self, 'nocache', False):
				return m2(self)
			ret = m2(self)
			self.put_files_cache()
			return ret
		cls.post_run = post_run
		cls.has_cache = True

	process_pool = []
	def get_process():
		"""
		Returns a worker process that can process waf cache commands
		The worker process is assumed to be returned to the process pool when unused
		"""
		try:
			return process_pool.pop()
		except IndexError:
			filepath = os.path.dirname(os.path.abspath(__file__)) + os.sep + 'wafcache.py'
			cmd = [sys.executable, '-c', Utils.readf(filepath)]
			return subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0)
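
	# Note: each worker executes the source of this very file through
	# `python -c <source>`, so in the child process __name__ == '__main__'
	# and the loop() at the bottom of this file takes over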

	def atexit_pool():
		for k in process_pool:
			try:
				os.kill(k.pid, 9)
			except OSError:
				pass
			else:
				k.wait()
	atexit.register(atexit_pool)

	def build(bld):
		"""
		Called during the build process to enable file caching
		"""
		if WAFCACHE_STATS:
			# initialize counters for the statistics and hook a function to print them on exit
			bld.cache_reqs = bld.cache_hits = bld.cache_puts = 0

			def printstats(bld):
				hit_ratio = 0
				if bld.cache_reqs > 0:
					hit_ratio = (100.0 * bld.cache_hits) / bld.cache_reqs
				Logs.pprint('CYAN', '  wafcache stats: requests: %s, hits: %s, ratio: %.2f%%, writes: %s' %
					(bld.cache_reqs, bld.cache_hits, hit_ratio, bld.cache_puts))
			bld.add_post_fun(printstats)

		if process_pool:
			# already called once
			return

		# pre-allocate worker processes, one per build job
		processes = [get_process() for x in range(bld.jobs)]
		process_pool.extend(processes)

		Task.Task.can_retrieve_cache = can_retrieve_cache
		Task.Task.put_files_cache = put_files_cache
		Task.Task.uid = uid
		Build.BuildContext.hash_env_vars = hash_env_vars
		for x in reversed(list(Task.classes.values())):
			make_cached(x)

	def cache_command(sig, files_from, files_to):
		"""
		Send a command to a cache worker process: a pickled, base64-encoded
		tuple containing the task signature, a list of files to cache,
		and a list of files to fetch from the cache (one of the two lists
		is assumed to be empty)
		"""
		proc = get_process()

		obj = base64.b64encode(cPickle.dumps([sig, files_from, files_to]))
		proc.stdin.write(obj)
		proc.stdin.write('\n'.encode())
		proc.stdin.flush()
		obj = proc.stdout.readline()
		if not obj:
			raise OSError('Preforked sub-process %r died' % proc.pid)
		process_pool.append(proc)
		return cPickle.loads(base64.b64decode(obj))
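
# The code below is shared with the worker processes, which perform the
# actual cache file operations on behalf of the build process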

try:
	copyfun = os.link
except AttributeError:
	# os.link is not available on every platform; fall back to plain copies
	copyfun = shutil.copy2

def atomic_copy(orig, dest):
	"""
	Copy files to the cache; the operation is atomic for a given file
	"""
	global copyfun
	tmp = dest + '.tmp'
	up = os.path.dirname(dest)
	try:
		os.makedirs(up)
	except OSError:
		pass

	try:
		copyfun(orig, tmp)
	except OSError as e:
		if e.errno == errno.EXDEV:
			# cross-device hard link: the cache is on another partition,
			# so switch to file copies from now on
			copyfun = shutil.copy2
			copyfun(orig, tmp)
		else:
			raise
	os.rename(tmp, dest)

def lru_trim():
	"""
	The cache folders take the form:
	`CACHE_DIR/0b/0b180f82246d726ece37c8ccd0fb1cde2650d7bfcf122ec1f169079a3bfc0ab9`
	They are listed by last access time and then removed
	until the number of folders is within TRIM_MAX_FOLDERS and the total space
	taken by the files is less than EVICT_MAX_BYTES
	"""
	lst = []
	for up in os.listdir(CACHE_DIR):
		if len(up) == 2:
			sub = os.path.join(CACHE_DIR, up)
			for hval in os.listdir(sub):
				path = os.path.join(sub, hval)

				size = 0
				for fname in os.listdir(path):
					try:
						size += os.lstat(os.path.join(path, fname)).st_size
					except OSError:
						pass
				lst.append((os.stat(path).st_mtime, size, path))

	# newest entries first, so that pop() evicts the least recently used ones
	lst.sort(key=lambda x: x[0])
	lst.reverse()

	tot = sum(x[1] for x in lst)
	while tot > EVICT_MAX_BYTES or len(lst) > TRIM_MAX_FOLDERS:
		_, tmp_size, path = lst.pop()
		tot -= tmp_size

		# rename the folder first so that concurrent readers never see a
		# partially deleted cache entry
		tmp = path + '.remove'
		try:
			shutil.rmtree(tmp)
		except OSError:
			pass
		try:
			os.rename(path, tmp)
		except OSError:
			sys.stderr.write('Could not rename %r to %r\n' % (path, tmp))
		else:
			try:
				shutil.rmtree(tmp)
			except OSError:
				sys.stderr.write('Could not remove %r\n' % tmp)
	sys.stderr.write("Cache trimmed: %r bytes in %r folders left\n" % (tot, len(lst)))

def lru_evict():
	"""
	Reduce the cache size
	"""
	lockfile = os.path.join(CACHE_DIR, 'all.lock')
	try:
		st = os.stat(lockfile)
	except EnvironmentError as e:
		if e.errno == errno.ENOENT:
			with open(lockfile, 'w') as f:
				f.write('')
			return
		else:
			raise

	if st.st_mtime < time.time() - EVICT_INTERVAL_MINUTES * 60:
		# check every EVICT_INTERVAL_MINUTES minutes whether the cache is too big
		# O_CLOEXEC is unnecessary because no processes are spawned
		fd = os.open(lockfile, os.O_RDWR | os.O_CREAT, 0o755)
		try:
			try:
				fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
			except EnvironmentError:
				if WAFCACHE_VERBOSITY:
					sys.stderr.write('wafcache: another cleaning process is running\n')
			else:
				# now do the actual cleanup
				lru_trim()
				os.utime(lockfile, None)
		finally:
			os.close(fd)

class netcache(object):
	def __init__(self):
		self.http = urllib3.PoolManager()

	def url_of(self, sig, i):
		return "%s/%s/%s" % (CACHE_DIR, sig, i)

	def upload(self, file_path, sig, i):
		url = self.url_of(sig, i)
		with open(file_path, 'rb') as f:
			file_data = f.read()
		r = self.http.request('POST', url, timeout=60,
			fields={ 'file': ('%s/%s' % (sig, i), file_data), })
		if r.status >= 400:
			raise OSError("Invalid status %r %r" % (url, r.status))

	def download(self, file_path, sig, i):
		url = self.url_of(sig, i)
		with self.http.request('GET', url, preload_content=False, timeout=60) as inf:
			if inf.status >= 400:
				raise OSError("Invalid status %r %r" % (url, inf.status))
			with open(file_path, 'wb') as out:
				shutil.copyfileobj(inf, out)

	def copy_to_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_from):
				if not os.path.islink(x):
					self.upload(x, sig, i)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_to):
				self.download(x, sig, i)
		except Exception:
			return traceback.format_exc()
		return OK

class fcache(object):
	def __init__(self):
		if not os.path.exists(CACHE_DIR):
			os.makedirs(CACHE_DIR)
		if not os.path.exists(CACHE_DIR):
			raise ValueError('Could not initialize the cache directory')

	def copy_to_cache(self, sig, files_from, files_to):
		"""
		Copy files to the cache; existing files are overwritten,
		and the copy is atomic only for a given file, not for all files
		that belong to a given task object
		"""
		try:
			for i, x in enumerate(files_from):
				dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				atomic_copy(x, dest)
		except Exception:
			return traceback.format_exc()
		else:
			# attempt trimming if caching was successful:
			# we may have things to trim!
			try:
				lru_evict()
			except Exception:
				return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		"""
		Copy files from the cache
		"""
		try:
			for i, x in enumerate(files_to):
				orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				atomic_copy(orig, x)

			# success! update the cache time
			os.utime(os.path.join(CACHE_DIR, sig[:2], sig), None)
		except Exception:
			return traceback.format_exc()
		return OK
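
# Bucket-based backend: cache objects are transferred to and from a remote
# bucket by shelling out to the gsutil/aws/mc tools, or to the command given
# in WAFCACHE_CMD (with %{SRC} and %{TGT} substituted for each transfer)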

class bucket_cache(object):
	def bucket_copy(self, source, target):
		if WAFCACHE_CMD:
			def replacer(match):
				if match.group('src'):
					return source
				elif match.group('tgt'):
					return target
			cmd = [re_waf_cmd.sub(replacer, x) for x in shlex.split(WAFCACHE_CMD)]
		elif CACHE_DIR.startswith('s3://'):
			cmd = ['aws', 's3', 'cp', source, target]
		elif CACHE_DIR.startswith('gs://'):
			cmd = ['gsutil', 'cp', source, target]
		else:
			cmd = ['mc', 'cp', source, target]

		proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
		out, err = proc.communicate()
		if proc.returncode:
			raise OSError('Error copying %r to %r using: %r (exit %r):\n  out:%s\n  err:%s' % (
				source, target, cmd, proc.returncode, out.decode(errors='replace'), err.decode(errors='replace')))

	def copy_to_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_from):
				dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(x, dest)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_to):
				orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(orig, x)
		except EnvironmentError:
			return traceback.format_exc()
		return OK

def loop(service):
	"""
	This function runs when the file is executed as a standalone python
	script; it expects a parent process that communicates the commands to it
	as base64-encoded pickled tuples (one line per command)

	The commands are to copy files to the cache or to copy files from the
	cache to a target destination
	"""
	# one operation is performed at a time by a given worker process,
	# therefore stdin never holds more than one pending line
	txt = sys.stdin.readline().strip()
	if not txt:
		# parent process probably ended
		sys.exit(1)
	ret = OK

	[sig, files_from, files_to] = cPickle.loads(base64.b64decode(txt))
	if files_from:
		# TODO return early when pushing files upstream
		ret = service.copy_to_cache(sig, files_from, files_to)
	elif files_to:
		# the build process waits for workers to (possibly) obtain files from the cache
		ret = service.copy_from_cache(sig, files_from, files_to)
	else:
		ret = "Invalid command"

	obj = base64.b64encode(cPickle.dumps(ret))
	sys.stdout.write(obj.decode())
	sys.stdout.write('\n')
	sys.stdout.flush()

if __name__ == '__main__':
	if CACHE_DIR.startswith('s3://') or CACHE_DIR.startswith('gs://') or CACHE_DIR.startswith('minio://'):
		if CACHE_DIR.startswith('minio://'):
			# minio does not need the protocol part; mc resolves configured aliases
			CACHE_DIR = CACHE_DIR[8:]
		service = bucket_cache()
	elif CACHE_DIR.startswith('http'):
		service = netcache()
	else:
		service = fcache()
	while 1:
		try:
			loop(service)
		except KeyboardInterrupt:
			break