# Thomas Nagy, 2019 (ita)

"""
Filesystem-based cache system to share and re-use build artifacts

Cache access operations (copy to and from) are delegated to
independent pre-forked worker subprocesses.

The following environment variables may be set:
* WAFCACHE: several possibilities:
  - File cache:
    absolute path of the waf cache (~/.cache/wafcache_user,
    where `user` represents the currently logged-in user)
  - URL to a cache server, for example:
    export WAFCACHE=http://localhost:8080/files/
    in that case, GET/POST requests are made to urls of the form
    http://localhost:8080/files/000000000/0 (cache management is delegated to the server)
  - GCS, S3 or MINIO bucket:
    gs://my-bucket/    (uses the gsutil command line tool or WAFCACHE_CMD)
    s3://my-bucket/    (uses the aws command line tool or WAFCACHE_CMD)
    minio://my-bucket/ (uses the mc command line tool or WAFCACHE_CMD)
* WAFCACHE_CMD: bucket upload/download command, for example:
    WAFCACHE_CMD="gsutil cp %{SRC} %{TGT}"
  Note that the WAFCACHE bucket value is used for the source or destination
  depending on the operation (upload or download). For example, with:
    WAFCACHE="gs://mybucket/"
  the following commands may be run:
    gsutil cp build/myprogram gs://mybucket/aa/aaaaa/1
    gsutil cp gs://mybucket/bb/bbbbb/2 build/somefile
* WAFCACHE_NO_PUSH: if set, disables pushing to the cache
* WAFCACHE_VERBOSITY: if set, displays more detailed cache operations
* WAFCACHE_STATS: if set, displays cache usage statistics on exit

File cache specific options:
  Files are copied using hard links by default; if the cache is located
  on another partition, the system switches to file copies instead.
* WAFCACHE_TRIM_MAX_FOLDER: maximum number of tasks to cache (default: 1M)
* WAFCACHE_EVICT_MAX_BYTES: maximum cache size in bytes (default: 10GB)
* WAFCACHE_EVICT_INTERVAL_MINUTES: minimum time interval between attempts
  to trim the cache (default: 3 minutes)

To troubleshoot::

	waf clean build --zones=wafcache
"""
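# Usage sketch (an assumption based on the docstring above, not part of the original
# file): loading the tool from a project wscript runs build() below, which hooks the
# cache into the Task classes. The file names are purely illustrative.
#
#   def build(bld):
#       bld.load('wafcache')
#       bld.program(source='main.c', target='app')
#
# Cache activity can then be inspected with: waf clean build --zones=wafcache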
import atexit, base64, errno, fcntl, getpass, os, re, shutil, sys, time, traceback, urllib3, shlex
try:
	import subprocess32 as subprocess
except ImportError:
	import subprocess
base_cache = os.path.expanduser('~/.cache/')
if not os.path.isdir(base_cache):
	base_cache = '/tmp/'
default_wafcache_dir = os.path.join(base_cache, 'wafcache_' + getpass.getuser())

CACHE_DIR = os.environ.get('WAFCACHE', default_wafcache_dir)
WAFCACHE_CMD = os.environ.get('WAFCACHE_CMD')
TRIM_MAX_FOLDERS = int(os.environ.get('WAFCACHE_TRIM_MAX_FOLDER', 1000000))
EVICT_INTERVAL_MINUTES = int(os.environ.get('WAFCACHE_EVICT_INTERVAL_MINUTES', 3))
EVICT_MAX_BYTES = int(os.environ.get('WAFCACHE_EVICT_MAX_BYTES', 10**10))
WAFCACHE_NO_PUSH = 1 if os.environ.get('WAFCACHE_NO_PUSH') else 0
WAFCACHE_VERBOSITY = 1 if os.environ.get('WAFCACHE_VERBOSITY') else 0
WAFCACHE_STATS = 1 if os.environ.get('WAFCACHE_STATS') else 0

# prefix of the reply sent back by worker processes on success
OK = "ok"

re_waf_cmd = re.compile('(?P<src>%{SRC})|(?P<tgt>%{TGT})')
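# Illustration (hypothetical values): with WAFCACHE_CMD="gsutil cp %{SRC} %{TGT}",
# bucket_cache.bucket_copy() below substitutes the placeholders through re_waf_cmd, so
#   [re_waf_cmd.sub(replacer, x) for x in shlex.split(WAFCACHE_CMD)]
# yields ['gsutil', 'cp', '/path/to/build/foo.o', 'gs://mybucket/aa/aaaa.../0'] for an
# upload, with the bucket/local arguments swapped for a download.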
try:
	import cPickle
except ImportError:
	import pickle as cPickle

if __name__ != '__main__':
	from waflib import Task, Logs, Utils, Build
	def can_retrieve_cache(self):
		"""
		New method for waf Task classes
		"""
		if not self.outputs:
			return False

		self.cached = False

		sig = self.signature()
		ssig = Utils.to_hex(self.uid() + sig)

		if WAFCACHE_STATS:
			self.generator.bld.cache_reqs += 1

		files_to = [node.abspath() for node in self.outputs]
		err = cache_command(ssig, [], files_to)
		if err.startswith(OK):
			if WAFCACHE_VERBOSITY:
				Logs.pprint('CYAN', '  Fetched %r from cache' % files_to)
			else:
				Logs.debug('wafcache: fetched %r from cache', files_to)
			if WAFCACHE_STATS:
				self.generator.bld.cache_hits += 1
		else:
			if WAFCACHE_VERBOSITY:
				Logs.pprint('YELLOW', '  No cache entry %s' % files_to)
			else:
				Logs.debug('wafcache: No cache entry %s: %s', files_to, err)
			return False

		self.cached = True
		return True
	def put_files_cache(self):
		"""
		New method for waf Task classes
		"""
		if WAFCACHE_NO_PUSH or getattr(self, 'cached', None) or not self.outputs:
			return

		files_from = []
		for node in self.outputs:
			path = node.abspath()
			if not os.path.isfile(path):
				continue
			files_from.append(path)

		bld = self.generator.bld
		sig = self.signature()
		ssig = Utils.to_hex(self.uid() + sig)

		err = cache_command(ssig, files_from, [])

		if err.startswith(OK):
			if WAFCACHE_VERBOSITY:
				Logs.pprint('CYAN', '  Successfully uploaded %s to cache' % files_from)
			else:
				Logs.debug('wafcache: Successfully uploaded %r to cache', files_from)
			if WAFCACHE_STATS:
				self.generator.bld.cache_puts += 1
		else:
			if WAFCACHE_VERBOSITY:
				Logs.pprint('RED', '  Error caching step results %s: %s' % (files_from, err))
			else:
				Logs.debug('wafcache: Error caching results %s: %s', files_from, err)

		bld.task_sigs[self.uid()] = self.cache_sig
	def hash_env_vars(self, env, vars_lst):
		"""
		Reimplement BuildContext.hash_env_vars so that the resulting hash does not depend on local paths
		"""
		if not env.table:
			env = env.parent
			if not env:
				return Utils.SIG_NIL

		idx = str(id(env)) + str(vars_lst)
		try:
			cache = self.cache_env
		except AttributeError:
			cache = self.cache_env = {}
		else:
			try:
				return self.cache_env[idx]
			except KeyError:
				pass

		v = str([env[a] for a in vars_lst])
		v = v.replace(self.srcnode.abspath().__repr__()[:-1], '')
		m = Utils.md5()
		m.update(v.encode())
		ret = m.digest()

		Logs.debug('envhash: %r %r', ret, v)

		cache[idx] = ret
		return ret
	def uid(self):
		"""
		Reimplement Task.uid() so that the signature does not depend on local paths
		"""
		try:
			return self.uid_
		except AttributeError:
			m = Utils.md5()
			src = self.generator.bld.srcnode
			up = m.update
			up(self.__class__.__name__.encode())
			for x in self.inputs + self.outputs:
				up(x.path_from(src).encode())
			self.uid_ = m.digest()
			return self.uid_
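	# Path-independence illustration (hypothetical paths, not from the original file):
	# with the uid() and hash_env_vars() overrides above, a task compiling
	# /home/alice/project/src/main.c and one compiling /home/bob/work/src/main.c both
	# hash the project-relative string 'src/main.c', so the two checkouts compute the
	# same task uid and can share cache entries.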
	def make_cached(cls):
		"""
		Enable the waf cache for a given task class
		"""
		if getattr(cls, 'nocache', None) or getattr(cls, 'has_cache', False):
			return

		full_name = "%s.%s" % (cls.__module__, cls.__name__)
		if full_name in ('waflib.Tools.ccroot.vnum', 'waflib.Build.inst'):
			return

		m1 = getattr(cls, 'run', None)
		def run(self):
			if getattr(self, 'nocache', False):
				return m1(self)
			if self.can_retrieve_cache():
				return 0
			return m1(self)
		cls.run = run

		m2 = getattr(cls, 'post_run', None)
		def post_run(self):
			if getattr(self, 'nocache', False):
				return m2(self)
			ret = m2(self)
			self.put_files_cache()
			return ret
		cls.post_run = post_run
		cls.has_cache = True
	# pool of pre-forked worker processes, filled in build() below
	process_pool = []

	def get_process():
		"""
		Returns a worker process that can process waf cache commands
		The worker process is assumed to be returned to the process pool when unused
		"""
		try:
			return process_pool.pop()
		except IndexError:
			filepath = os.path.dirname(os.path.abspath(__file__)) + os.sep + 'wafcache.py'
			cmd = [sys.executable, '-c', Utils.readf(filepath)]
			return subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0)
	def atexit_pool():
		# terminate any worker processes that are still alive at exit
		for k in process_pool:
			try:
				k.kill()
			except OSError:
				pass
	atexit.register(atexit_pool)
	def build(bld):
		"""
		Called during the build process to enable file caching
		"""
		if WAFCACHE_STATS:
			# Init counters for statistics and hook to print results at the end
			bld.cache_reqs = bld.cache_hits = bld.cache_puts = 0

			def printstats(bld):
				hit_ratio = 0
				if bld.cache_reqs > 0:
					hit_ratio = (bld.cache_hits / bld.cache_reqs) * 100
				Logs.pprint('CYAN', '  wafcache stats: requests: %s, hits: %s, ratio: %.2f%%, writes: %s' %
					(bld.cache_reqs, bld.cache_hits, hit_ratio, bld.cache_puts))
			bld.add_post_fun(printstats)

		if process_pool:
			# already called once
			return

		# pre-fork one worker per build job
		processes = [get_process() for x in range(bld.jobs)]
		process_pool.extend(processes)

		Task.Task.can_retrieve_cache = can_retrieve_cache
		Task.Task.put_files_cache = put_files_cache
		Task.Task.uid = uid
		Build.BuildContext.hash_env_vars = hash_env_vars
		for x in reversed(list(Task.classes.values())):
			make_cached(x)
	def cache_command(sig, files_from, files_to):
		"""
		Send a command to a cache worker process as a pickled, base64-encoded
		tuple containing the task signature, a list of files to cache and a list
		of files to get from the cache (one of the two lists is assumed to be
		empty), and return the worker's decoded reply
		"""
		proc = get_process()

		obj = base64.b64encode(cPickle.dumps([sig, files_from, files_to]))
		proc.stdin.write(obj)
		proc.stdin.write('\n'.encode())
		proc.stdin.flush()
		obj = proc.stdout.readline()
		if not obj:
			raise OSError('Preforked sub-process %r died' % proc.pid)
		process_pool.append(proc)
		return cPickle.loads(base64.b64decode(obj))
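	# Wire-format illustration (hypothetical values): for a task with signature 'aa...'
	# and one output, the parent writes a single line such as
	#   base64.b64encode(cPickle.dumps(['aa...', [], ['/path/build/foo.o']])) + b'\n'
	# and the worker answers one line containing base64.b64encode(cPickle.dumps("ok"))
	# on success, or a pickled traceback string on failure.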
# use hard links by default; switch to real file copies if hard links are
# unavailable or if the cache lives on another filesystem (EXDEV below)
try:
	copyfun = os.link
except AttributeError:
	copyfun = shutil.copy2

def atomic_copy(orig, dest):
	"""
	Copy files to the cache, the operation is atomic for a given file
	"""
	global copyfun
	tmp = dest + '.tmp'
	up = os.path.dirname(dest)
	try:
		os.makedirs(up)
	except OSError:
		pass

	try:
		copyfun(orig, tmp)
	except OSError as e:
		if e.errno == errno.EXDEV:
			# the cache is on another partition: fall back to file copies
			copyfun = shutil.copy2
			copyfun(orig, tmp)
		else:
			raise
	os.rename(tmp, dest)
def lru_trim():
	"""
	The cache folders take the form:
	`CACHE_DIR/0b/0b180f82246d726ece37c8ccd0fb1cde2650d7bfcf122ec1f169079a3bfc0ab9`
	They are listed in order of last access, and then removed
	until the number of folders is within TRIM_MAX_FOLDERS and the total space
	taken by files is less than EVICT_MAX_BYTES
	"""
	lst = []
	for up in os.listdir(CACHE_DIR):
		sub = os.path.join(CACHE_DIR, up)
		if not os.path.isdir(sub):
			# skip regular files such as the lock file
			continue
		for hval in os.listdir(sub):
			path = os.path.join(sub, hval)

			size = 0
			for fname in os.listdir(path):
				size += os.lstat(os.path.join(path, fname)).st_size
			lst.append((os.stat(path).st_mtime, size, path))

	# newest entries first, so that pop() removes the least recently used ones
	lst.sort(key=lambda x: x[0])
	lst.reverse()

	tot = sum(x[1] for x in lst)
	while tot > EVICT_MAX_BYTES or len(lst) > TRIM_MAX_FOLDERS:
		_, tmp_size, path = lst.pop()
		tot -= tmp_size

		tmp = path + '.remove'
		try:
			os.rename(path, tmp)
		except OSError:
			sys.stderr.write('Could not rename %r to %r\n' % (path, tmp))
		else:
			try:
				shutil.rmtree(tmp)
			except OSError:
				sys.stderr.write('Could not remove %r\n' % tmp)
	sys.stderr.write("Cache trimmed: %r bytes in %r folders left\n" % (tot, len(lst)))
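# Eviction illustration (hypothetical numbers): with the defaults above, a cache holding
# 12 GB across 3000 task folders is trimmed by repeatedly discarding the least recently
# used folder until the total drops below EVICT_MAX_BYTES (10 GB); TRIM_MAX_FOLDERS
# (1,000,000) only kicks in for caches made of a very large number of small entries.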
def lru_evict():
	"""
	Reduce the cache size
	"""
	lockfile = os.path.join(CACHE_DIR, 'all.lock')
	try:
		st = os.stat(lockfile)
	except EnvironmentError as e:
		if e.errno == errno.ENOENT:
			# create the lock file on first use
			with open(lockfile, 'w') as f:
				f.write('')
			return
		else:
			raise

	if st.st_mtime < time.time() - EVICT_INTERVAL_MINUTES * 60:
		# check every EVICT_INTERVAL_MINUTES minutes if the cache is too big
		# OCLOEXEC is unnecessary because no processes are spawned
		fd = os.open(lockfile, os.O_RDWR | os.O_CREAT, 0o755)
		try:
			try:
				fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
			except EnvironmentError:
				if WAFCACHE_VERBOSITY:
					sys.stderr.write('wafcache: another cleaning process is running\n')
			else:
				# now do the actual cleanup
				lru_trim()
				os.utime(lockfile, None)
		finally:
			os.close(fd)
class netcache(object):
	def __init__(self):
		self.http = urllib3.PoolManager()

	def url_of(self, sig, i):
		return "%s/%s/%s" % (CACHE_DIR, sig, i)

	def upload(self, file_path, sig, i):
		url = self.url_of(sig, i)
		with open(file_path, 'rb') as f:
			file_data = f.read()
		r = self.http.request('POST', url, timeout=60,
			fields={ 'file': ('%s/%s' % (sig, i), file_data), })
		if r.status >= 400:
			raise OSError("Invalid status %r %r" % (url, r.status))

	def download(self, file_path, sig, i):
		url = self.url_of(sig, i)
		with self.http.request('GET', url, preload_content=False, timeout=60) as inf:
			if inf.status >= 400:
				raise OSError("Invalid status %r %r" % (url, inf.status))
			with open(file_path, 'wb') as out:
				shutil.copyfileobj(inf, out)

	def copy_to_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_from):
				if not os.path.islink(x):
					self.upload(x, sig, i)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_to):
				self.download(x, sig, i)
		except Exception:
			return traceback.format_exc()
		return OK
class fcache(object):
	def __init__(self):
		if not os.path.exists(CACHE_DIR):
			os.makedirs(CACHE_DIR)
		if not os.path.exists(CACHE_DIR):
			raise ValueError('Could not initialize the cache directory')

	def copy_to_cache(self, sig, files_from, files_to):
		"""
		Copy files to the cache, existing files are overwritten,
		and the copy is atomic only for a given file, not for all files
		that belong to a given task object
		"""
		try:
			for i, x in enumerate(files_from):
				dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				atomic_copy(x, dest)
		except Exception:
			return traceback.format_exc()
		else:
			# attempt trimming if caching was successful:
			# we may have things to trim!
			try:
				lru_evict()
			except Exception:
				return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		"""
		Copy files from the cache
		"""
		try:
			for i, x in enumerate(files_to):
				orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				atomic_copy(orig, x)

			# success! update the cache time
			os.utime(os.path.join(CACHE_DIR, sig[:2], sig), None)
		except Exception:
			return traceback.format_exc()
		return OK
class bucket_cache(object):
	def bucket_copy(self, source, target):
		if WAFCACHE_CMD:
			def replacer(match):
				if match.group('src'):
					return source
				elif match.group('tgt'):
					return target
			cmd = [re_waf_cmd.sub(replacer, x) for x in shlex.split(WAFCACHE_CMD)]
		elif CACHE_DIR.startswith('s3://'):
			cmd = ['aws', 's3', 'cp', source, target]
		elif CACHE_DIR.startswith('gs://'):
			cmd = ['gsutil', 'cp', source, target]
		else:
			cmd = ['mc', 'cp', source, target]

		proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
		out, err = proc.communicate()
		if proc.returncode:
			raise OSError('Error copying %r to %r using: %r (exit %r):\n  out:%s\n  err:%s' % (
				source, target, cmd, proc.returncode, out.decode(errors='replace'), err.decode(errors='replace')))

	def copy_to_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_from):
				dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(x, dest)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_to):
				orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(orig, x)
		except EnvironmentError:
			return traceback.format_exc()
		return OK
def loop(service):
	"""
	This function is run when this file is run as a standalone python script.
	It assumes a parent process that will communicate the commands to it
	as base64-encoded pickled tuples (one line per command)

	The commands are to copy files to the cache or copy files from the
	cache to a target destination
	"""
	# one operation is performed at a time by a single process,
	# therefore stdin never has more than one line
	txt = sys.stdin.readline().strip()
	if not txt:
		# parent process probably ended
		sys.exit(1)
	ret = OK

	[sig, files_from, files_to] = cPickle.loads(base64.b64decode(txt))
	if files_from:
		# TODO return early when pushing files upstream
		ret = service.copy_to_cache(sig, files_from, files_to)
	elif files_to:
		# the build process waits for workers to (possibly) obtain files from the cache
		ret = service.copy_from_cache(sig, files_from, files_to)
	else:
		ret = "Invalid command"

	obj = base64.b64encode(cPickle.dumps(ret))
	sys.stdout.write(obj.decode())
	sys.stdout.write('\n')
	sys.stdout.flush()
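# Manual test sketch (an assumption, not part of the original file): since a worker
# reads one base64 line from stdin and writes one base64 line to stdout, it can be
# exercised without waf, e.g. from a shell:
#   python -c "import base64,pickle;print(base64.b64encode(pickle.dumps(['sig',[],['/tmp/out']])).decode())" \
#     | WAFCACHE=/tmp/wafcache python wafcache.py
# The reply decodes to "ok" or to a traceback string describing the failure.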
if __name__ == '__main__':
	if CACHE_DIR.startswith('s3://') or CACHE_DIR.startswith('gs://') or CACHE_DIR.startswith('minio://'):
		if CACHE_DIR.startswith('minio://'):
			CACHE_DIR = CACHE_DIR[8:]   # minio doesn't need the protocol part, it uses config aliases
		service = bucket_cache()
	elif CACHE_DIR.startswith('http'):
		service = netcache()
	else:
		service = fcache()
	while 1:
		try:
			loop(service)
		except KeyboardInterrupt:
			break