3 # Thomas Nagy, 2015 (ita)
6 A version of prefork.py that uses unix sockets. The advantage is that it does not expose
7 connections to the outside. However, it only works on unix-like systems
8 and performance can be slightly worse.
13 # recommended, fork new processes before using more memory
14 opt.load('preforkunix')
17 bld.load('preforkunix')
22 import os
, re
, socket
, threading
, sys
, subprocess
, atexit
, traceback
, signal
, time
24 from queue
import Queue
26 from Queue
import Queue
30 import pickle
as cPickle
38 def make_header(params
, cookie
=''):
39 header
= ','.join(params
)
40 header
= header
.ljust(HEADER_SIZE
- len(cookie
))
41 assert(len(header
) == HEADER_SIZE
- len(cookie
))
42 header
= header
+ cookie
43 if sys
.hexversion
> 0x3000000:
44 header
= header
.encode('iso8859-1')
47 re_valid_query
= re
.compile('^[a-zA-Z0-9_, ]+$')
49 def send_response(conn
, ret
, out
, err
, exc
):
51 data
= (out
, err
, exc
)
52 data
= cPickle
.dumps(data
, -1)
56 params
= [RES
, str(ret
), str(len(data
))]
58 # no need for the cookie in the response
59 conn
.send(make_header(params
))
63 def process_command(conn
):
64 query
= conn
.recv(HEADER_SIZE
)
68 assert(len(query
) == HEADER_SIZE
)
69 if sys
.hexversion
> 0x3000000:
70 query
= query
.decode('iso8859-1')
73 if not re_valid_query
.match(query
):
74 send_response(conn
, -1, '', '', 'Invalid query %r' % query
)
75 raise ValueError('Invalid query %r' % query
)
77 query
= query
.strip().split(',')
80 run_command(conn
, query
[1:])
82 raise ValueError('Exit')
84 raise ValueError('Invalid query %r' % query
)
87 def run_command(conn
, query
):
90 data
= conn
.recv(size
)
91 assert(len(data
) == size
)
92 kw
= cPickle
.loads(data
)
95 ret
= out
= err
= exc
= None
101 if kw
['stdout'] or kw
['stderr']:
102 p
= subprocess
.Popen(cmd
, **kw
)
103 (out
, err
) = p
.communicate()
106 ret
= subprocess
.Popen(cmd
, **kw
).wait()
107 except KeyboardInterrupt:
109 except Exception as e
:
111 exc
= str(e
) + traceback
.format_exc()
113 send_response(conn
, ret
, out
, err
, exc
)
117 from waflib
import Logs
, Utils
, Runner
, Errors
, Options
119 def init_task_pool(self
):
120 # lazy creation, and set a common pool for all task consumers
121 pool
= self
.pool
= []
122 for i
in range(self
.numjobs
):
123 consumer
= Runner
.get_pool()
124 pool
.append(consumer
)
126 self
.ready
= Queue(0)
128 consumer
.ready
= self
.ready
130 threading
.current_thread().idx
= consumer
.idx
131 except Exception as e
:
136 Runner
.Parallel
.init_task_pool
= init_task_pool
139 child_socket
, parent_socket
= socket
.socketpair(socket
.AF_UNIX
)
143 parent_socket
.close()
145 # if the parent crashes, try to exit cleanly
154 os
.kill(os
.getpid(), signal
.SIGKILL
)
155 t
= threading
.Thread(target
=reap
)
159 # write to child_socket only
161 while process_command(child_socket
):
163 except KeyboardInterrupt:
167 return (pid
, parent_socket
)
185 atexit
.register(close_all
)
187 def put_data(conn
, data
):
189 while cnt
< len(data
):
190 sent
= conn
.send(data
[cnt
:])
192 raise RuntimeError('connection ended')
195 def read_data(conn
, siz
):
199 data
= conn
.recv(min(siz
- cnt
, 1024))
201 raise RuntimeError('connection ended %r %r' % (cnt
, siz
))
204 if sys
.hexversion
> 0x3000000:
205 ret
= ''.encode('iso8859-1').join(buf
)
210 def exec_command(self
, cmd
, **kw
):
212 if kw
['stdout'] not in (None, subprocess
.PIPE
):
213 return self
.exec_command_old(cmd
, **kw
)
215 if kw
['stderr'] not in (None, subprocess
.PIPE
):
216 return self
.exec_command_old(cmd
, **kw
)
218 kw
['shell'] = isinstance(cmd
, str)
219 Logs
.debug('runner: %r' % cmd
)
220 Logs
.debug('runner_env: kw=%s' % kw
)
223 self
.logger
.info(cmd
)
225 if 'stdout' not in kw
:
226 kw
['stdout'] = subprocess
.PIPE
227 if 'stderr' not in kw
:
228 kw
['stderr'] = subprocess
.PIPE
230 if Logs
.verbose
and not kw
['shell'] and not Utils
.check_exe(cmd
[0]):
231 raise Errors
.WafError("Program %s not found!" % cmd
[0])
233 idx
= threading
.current_thread().idx
237 #print("sub %r %r" % (idx, cmd))
238 #print("write to %r %r" % (idx, cmd))
240 data
= cPickle
.dumps(kw
, -1)
241 params
= [REQ
, str(len(data
))]
242 header
= make_header(params
)
246 put_data(conn
, header
+ data
)
248 #print("running %r %r" % (idx, cmd))
249 #print("read from %r %r" % (idx, cmd))
251 data
= read_data(conn
, HEADER_SIZE
)
252 if sys
.hexversion
> 0x3000000:
253 data
= data
.decode('iso8859-1')
255 #print("received %r" % data)
256 lst
= data
.split(',')
262 data
= read_data(conn
, dlen
)
263 (out
, err
, exc
) = cPickle
.loads(data
)
265 raise Errors
.WafError('Execution failure: %s' % exc
)
268 if not isinstance(out
, str):
269 out
= out
.decode(sys
.stdout
.encoding
or 'iso8859-1')
271 self
.logger
.debug('out: %s' % out
)
273 Logs
.info(out
, extra
={'stream':sys
.stdout
, 'c1': ''})
275 if not isinstance(err
, str):
276 err
= err
.decode(sys
.stdout
.encoding
or 'iso8859-1')
278 self
.logger
.error('err: %s' % err
)
280 Logs
.info(err
, extra
={'stream':sys
.stderr
, 'c1': ''})
285 if not getattr(Options
.options
, 'smp', getattr(self
, 'smp', None)):
287 if Utils
.unversioned_sys_platform() in ('freebsd',):
289 cmd
= ['cpuset', '-l', '0', '-p', str(pid
)]
290 elif Utils
.unversioned_sys_platform() in ('linux',):
292 cmd
= ['taskset', '-pc', '0', str(pid
)]
294 self
.cmd_and_log(cmd
, quiet
=0)
297 # memory consumption might be at the lowest point while processing options
298 opt
.add_option('--pin-process', action
='store_true', dest
='smp', default
=False)
299 if Utils
.is_win32
or os
.sep
!= '/':
301 while len(CONNS
) < 30:
302 (pid
, conn
) = make_conn(opt
)
307 if Utils
.is_win32
or os
.sep
!= '/':
309 if bld
.cmd
== 'clean':
311 while len(CONNS
) < bld
.jobs
:
312 (pid
, conn
) = make_conn(bld
)
316 bld
.__class
__.exec_command_old
= bld
.__class
__.exec_command
317 bld
.__class
__.exec_command
= exec_command