thirdparty:waf: New files for waf 1.9.10
[Samba.git] / third_party / waf / waflib / extras / preforkunix.py
blobec9aeeb10e73736be09b9f54b3fc6d5568bb4388
1 #! /usr/bin/env python
2 # encoding: utf-8
3 # Thomas Nagy, 2015 (ita)
5 """
6 A version of prefork.py that uses unix sockets. The advantage is that it does not expose
7 connections to the outside. However, it only works on unix-like systems
8 and performance can be slightly worse.
10 To use::
12 def options(opt):
13 # recommended, fork new processes before using more memory
14 opt.load('preforkunix')
16 def build(bld):
17 bld.load('preforkunix')
18 ...
19 more code
20 """
22 import os, re, socket, threading, sys, subprocess, atexit, traceback, signal, time
23 try:
24 from queue import Queue
25 except ImportError:
26 from Queue import Queue
27 try:
28 import cPickle
29 except ImportError:
30 import pickle as cPickle
# Every message on the socket starts with a header of exactly this many characters
HEADER_SIZE = 20

# Message type tags, sent as the first comma-separated header field
REQ = 'REQ'
RES = 'RES'
BYE = 'BYE'

def make_header(params, cookie=''):
    """
    Build a fixed-width message header.

    The fields are joined with commas and padded with spaces on the right so
    that the padded fields followed by *cookie* are exactly HEADER_SIZE
    characters long.

    :param params: header fields, as a list of strings
    :param cookie: optional token placed at the very end of the header
    :return: the header; on Python 3 it is encoded to iso8859-1 bytes
    """
    width = HEADER_SIZE - len(cookie)
    padded = ','.join(params).ljust(width)
    # ljust() never truncates, so fields that do not fit are caught here
    assert len(padded) == width
    packet = padded + cookie
    if sys.hexversion > 0x3000000:
        return packet.encode('iso8859-1')
    return packet
# A request header may only contain alphanumerics, underscores, commas and spaces
re_valid_query = re.compile('^[a-zA-Z0-9_, ]+$')

def send_response(conn, ret, out, err, exc):
    """
    Send the outcome of a command back over *conn*.

    The reply is a RES header carrying the return code and the payload size,
    followed by the pickled ``(out, err, exc)`` tuple when at least one of
    the three values is set.

    :param conn: connected socket to write to
    :param ret: return code reported in the header
    :param out: captured standard output, or an empty value
    :param err: captured standard error, or an empty value
    :param exc: error description, or an empty value
    """
    if out or err or exc:
        data = cPickle.dumps((out, err, exc), -1)
    else:
        data = ''

    params = [RES, str(ret), str(len(data))]

    # no need for the cookie in the response
    # sendall() is required: send() may transmit only part of a large payload,
    # which would leave the reader waiting for bytes that were never sent
    conn.sendall(make_header(params))
    if data:
        conn.sendall(data)
def process_command(conn):
    """
    Read one message header from *conn* and act on it.

    :param conn: connected stream socket
    :return: None when the peer closed the connection before sending a
        header, 'ok' after a REQ message was handled
    :raises ValueError: when the header is truncated or malformed, when the
        message type is unknown, or when a BYE message is received
    """
    query = conn.recv(HEADER_SIZE)
    if not query:
        return None
    # a stream socket may return fewer bytes than requested, so keep reading
    # until the whole fixed-size header has arrived
    while len(query) < HEADER_SIZE:
        chunk = conn.recv(HEADER_SIZE - len(query))
        if not chunk:
            raise ValueError('Truncated query %r' % query)
        query += chunk
    if sys.hexversion > 0x3000000:
        query = query.decode('iso8859-1')

    if not re_valid_query.match(query):
        # tell the peer before giving up on this connection
        send_response(conn, -1, '', '', 'Invalid query %r' % query)
        raise ValueError('Invalid query %r' % query)

    query = query.strip().split(',')

    if query[0] == REQ:
        run_command(conn, query[1:])
    elif query[0] == BYE:
        raise ValueError('Exit')
    else:
        raise ValueError('Invalid query %r' % query)
    return 'ok'
def run_command(conn, query):
    """
    Receive a pickled command description from *conn*, run it and reply.

    The payload is a dict of ``subprocess.Popen`` keyword arguments plus a
    'cmd' entry holding the command itself. The result is reported through
    send_response().

    :param conn: connected stream socket
    :param query: header fields after the message type; the first one is the
        payload size in bytes
    :raises ValueError: when the connection ends before the payload is complete
    """
    size = int(query[0])

    # a single recv() may return only part of a large payload
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:
            raise ValueError('Truncated request: %d of %d bytes missing' % (remaining, size))
        chunks.append(chunk)
        remaining -= len(chunk)
    data = b''.join(chunks)

    # NOTE: unpickling is only acceptable because the peer is trusted; never
    # feed this function data coming from an untrusted source
    kw = cPickle.loads(data)

    # run command
    ret = out = err = exc = None
    cmd = kw['cmd']
    del kw['cmd']

    try:
        if kw['stdout'] or kw['stderr']:
            p = subprocess.Popen(cmd, **kw)
            (out, err) = p.communicate()
            ret = p.returncode
        else:
            ret = subprocess.Popen(cmd, **kw).wait()
    except KeyboardInterrupt:
        raise
    except Exception as e:
        # report the failure to the peer instead of dying
        ret = -1
        exc = str(e) + traceback.format_exc()

    send_response(conn, ret, out, err, exc)
115 if 1:
117 from waflib import Logs, Utils, Runner, Errors, Options
def init_task_pool(self):
    """
    Create ``self.numjobs`` consumers that all share one ``ready`` queue.

    Each consumer gets an ``idx`` attribute holding its position in the pool.
    A callback is then queued on each consumer's own ``ready`` queue; when it
    is called with the consumer, it points that consumer at the shared queue
    and copies the consumer's ``idx`` onto the current thread.

    :return: the list of consumers, also stored as ``self.pool``
    """
    # lazy creation, and set a common pool for all task consumers
    self.pool = workers = []
    for position in range(self.numjobs):
        worker = Runner.get_pool()
        workers.append(worker)
        worker.idx = position
    self.ready = Queue(0)

    def bind_queue(target):
        # presumably executed by the consumer's own thread - verify in Runner
        target.ready = self.ready
        try:
            threading.current_thread().idx = target.idx
        except Exception as err:
            print(err)

    for worker in workers:
        worker.ready.put(bind_queue)
    return workers

Runner.Parallel.init_task_pool = init_task_pool
def make_conn(bld):
    """
    Fork a command server connected to this process by a unix socket pair.

    The child serves requests with process_command() until the connection
    ends, then terminates with os._exit(). It never returns to the caller,
    so the code following the call only ever runs in the parent, and the
    atexit handlers inherited from the parent are not executed in the child.

    :param bld: not used by this function
    :return: ``(pid, parent_socket)`` in the parent process
    """
    child_socket, parent_socket = socket.socketpair(socket.AF_UNIX)
    ppid = os.getpid()
    pid = os.fork()
    if pid != 0:
        # parent: keep only our end of the pair
        child_socket.close()
        return (pid, parent_socket)

    parent_socket.close()

    # if the parent crashes, try to exit cleanly
    def reap():
        while 1:
            try:
                # signal 0 only checks that the parent still exists
                os.kill(ppid, 0)
            except OSError:
                break
            else:
                time.sleep(1)
        os.kill(os.getpid(), signal.SIGKILL)
    t = threading.Thread(target=reap)
    t.daemon = True
    t.start()

    # write to child_socket only
    status = 0
    try:
        while process_command(child_socket):
            pass
    except KeyboardInterrupt:
        status = 2
    except Exception:
        traceback.print_exc()
        sys.stderr.flush()
        status = 1
    finally:
        # a forked child must not fall back into the caller's code
        os._exit(status)
# pids of the forked command servers
SERVERS = []
# parent-side sockets, one per command server
CONNS = []

def close_all():
    """
    Close every open connection and kill every command server.

    Both lists are emptied in place. Failures are ignored on purpose because
    this runs at interpreter exit, when a server may already be gone.
    """
    # no 'global' statement needed: the lists are only mutated, never rebound
    while CONNS:
        conn = CONNS.pop()
        try:
            conn.close()
        except Exception:
            # best effort; do not swallow KeyboardInterrupt or SystemExit
            pass
    while SERVERS:
        pid = SERVERS.pop()
        try:
            os.kill(pid, signal.SIGKILL)
        except OSError:
            # the process has already exited
            pass

atexit.register(close_all)
def put_data(conn, data):
    """
    Write all of *data* to *conn*, calling send() as often as necessary.

    :param conn: connected socket
    :param data: bytes to transmit
    :raises RuntimeError: when send() reports that nothing was written
    """
    pending = data
    while pending:
        written = conn.send(pending)
        if written == 0:
            raise RuntimeError('connection ended')
        # drop what went out and retry with the rest
        pending = pending[written:]
def read_data(conn, siz):
    """
    Read exactly *siz* bytes from *conn*.

    The data is fetched in pieces of at most 1024 bytes.

    :param conn: connected socket
    :param siz: number of bytes expected
    :return: the bytes received, joined into a single object
    :raises RuntimeError: when the connection ends before *siz* bytes arrived
    """
    received = 0
    pieces = []
    while received < siz:
        piece = conn.recv(min(siz - received, 1024))
        if not piece:
            raise RuntimeError('connection ended %r %r' % (received, siz))
        pieces.append(piece)
        received += len(piece)
    # the separator must have the same type as the pieces
    if sys.hexversion > 0x3000000:
        separator = ''.encode('iso8859-1')
    else:
        separator = ''
    return separator.join(pieces)
def exec_command(self, cmd, **kw):
    """
    Run *cmd* through one of the forked command servers.

    The keyword arguments are pickled and sent over the connection that
    belongs to the current thread (``threading.current_thread().idx``). The
    server's reply provides the return code and, optionally, the captured
    output.

    When stdout or stderr is set to anything other than None or
    ``subprocess.PIPE`` (for example an open file), the request is handed to
    the original implementation kept as ``exec_command_old``.

    :param cmd: command to run; a string is executed through the shell
    :param kw: keyword arguments for ``subprocess.Popen``
    :return: the return code of the command
    :raises Errors.WafError: when the program cannot be found (verbose mode
        only) or when the server reports an execution failure
    """
    # both streams are checked independently: a redirected stderr must also
    # trigger the fallback when stdout is present in kw
    for stream in ('stdout', 'stderr'):
        if kw.get(stream) not in (None, subprocess.PIPE):
            return self.exec_command_old(cmd, **kw)

    kw['shell'] = isinstance(cmd, str)
    # wrap the values in a tuple so a tuple command cannot break the formatting
    Logs.debug('runner: %r' % (cmd,))
    Logs.debug('runner_env: kw=%s' % (kw,))

    if self.logger:
        self.logger.info(cmd)

    if 'stdout' not in kw:
        kw['stdout'] = subprocess.PIPE
    if 'stderr' not in kw:
        kw['stderr'] = subprocess.PIPE

    if Logs.verbose and not kw['shell'] and not Utils.check_exe(cmd[0]):
        raise Errors.WafError("Program %s not found!" % cmd[0])

    idx = threading.current_thread().idx
    kw['cmd'] = cmd

    # serialization..
    data = cPickle.dumps(kw, -1)
    params = [REQ, str(len(data))]
    header = make_header(params)

    conn = CONNS[idx]

    put_data(conn, header + data)

    # the reply header is: RES, return code, payload size
    data = read_data(conn, HEADER_SIZE)
    if sys.hexversion > 0x3000000:
        data = data.decode('iso8859-1')

    lst = data.split(',')
    ret = int(lst[1])
    dlen = int(lst[2])

    out = err = None
    if dlen:
        data = read_data(conn, dlen)
        (out, err, exc) = cPickle.loads(data)
        if exc:
            raise Errors.WafError('Execution failure: %s' % exc)

    if out:
        if not isinstance(out, str):
            out = out.decode(sys.stdout.encoding or 'iso8859-1')
        if self.logger:
            self.logger.debug('out: %s' % out)
        else:
            Logs.info(out, extra={'stream':sys.stdout, 'c1': ''})
    if err:
        if not isinstance(err, str):
            err = err.decode(sys.stdout.encoding or 'iso8859-1')
        if self.logger:
            self.logger.error('err: %s' % err)
        else:
            Logs.info(err, extra={'stream':sys.stderr, 'c1': ''})

    return ret
def init_smp(self):
    """
    Pin the current process to cpu 0 when the 'smp' option is enabled.

    The option is read from ``Options.options.smp``, falling back to the
    ``smp`` attribute of *self*. ``cpuset`` is used on freebsd and
    ``taskset`` on linux; nothing is done on other platforms.
    """
    if not getattr(Options.options, 'smp', getattr(self, 'smp', None)):
        return
    platform = Utils.unversioned_sys_platform()
    pid = str(os.getpid())
    if platform == 'freebsd':
        cmd = ['cpuset', '-l', '0', '-p', pid]
    elif platform == 'linux':
        cmd = ['taskset', '-pc', '0', pid]
    else:
        # no pinning tool known for this platform; cmd must still be bound
        cmd = None
    if cmd:
        self.cmd_and_log(cmd, quiet=0)
def options(opt):
    """
    Register the ``--pin-process`` option and, except on win32 or when the
    path separator is not '/', fork command servers until 30 connections exist.
    """
    # memory consumption might be at the lowest point while processing options
    opt.add_option('--pin-process', action='store_true', dest='smp', default=False)
    unsupported = Utils.is_win32 or os.sep != '/'
    if unsupported:
        return
    for _ in range(len(CONNS), 30):
        server_pid, link = make_conn(opt)
        SERVERS.append(server_pid)
        CONNS.append(link)
def build(bld):
    """
    Make sure one command server exists per job and route ``exec_command``
    of the build context class through the servers.

    Nothing is done on win32, when the path separator is not '/', or for the
    'clean' command. The previous implementation stays available as
    ``exec_command_old``.
    """
    if Utils.is_win32 or os.sep != '/':
        return
    if bld.cmd == 'clean':
        return
    while len(CONNS) < bld.jobs:
        (pid, conn) = make_conn(bld)
        SERVERS.append(pid)
        CONNS.append(conn)
    init_smp(bld)

    cls = bld.__class__
    # Python 2 wraps the function in an unbound method, hence __func__
    current = getattr(cls.exec_command, '__func__', cls.exec_command)
    # Patch only once: when the class already resolves to our replacement,
    # saving it as exec_command_old would make the fallback call itself forever
    if current is not exec_command:
        cls.exec_command_old = cls.exec_command
        cls.exec_command = exec_command