# -*- coding: utf-8 -*-
# Copyright (c) 2009 Ondřej Súkup
# Parallel Python Software: http://www.parallelpython.com
# Copyright (c) 2005-2009, Vitalii Vanovschi
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#    * Redistributions of source code must retain the above copyright notice,
#      this list of conditions and the following disclaimer.
#    * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#    * Neither the name of the author nor the names of its contributors
#      may be used to endorse or promote products derived from this software
#      without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
# THE POSSIBILITY OF SUCH DAMAGE.

import os
import _thread
import logging
import inspect
import sys
import types
import time
import atexit
import pickle

try:
    # the optional "user" customization module was removed in Python 3
    import user
except ImportError:
    user = None

import pptransport
import ppauto

copyright = "Copyright (c) 2009 Ondřej Súkup / \
Copyright (c) 2005-2009 Vitalii Vanovschi. All rights reserved"
version = "3.0.0"

# reconnect persistent rworkers in 5 sec
_RECONNECT_WAIT_TIME = 5

_USE_SUBPROCESS = False
try:
    import subprocess
    _USE_SUBPROCESS = True
except ImportError:
    import popen2


class _Task(object):
    """Class describing single task (job)"""

    def __init__(self, server, tid, callback=None,
            callbackargs=(), group='default'):
        """Initializes the task"""
        self.lock = _thread.allocate_lock()
        self.lock.acquire()
        self.tid = tid
        self.server = server
        self.callback = callback
        self.callbackargs = callbackargs
        self.group = group
        self.finished = False
        self.unpickled = False

    def finalize(self, sresult):
        """Finalizes the task.

           For internal use only"""
        self.sresult = sresult
        if self.callback:
            self.__unpickle()
        self.lock.release()
        self.finished = True

    def __call__(self, raw_result=False):
        """Retrieves result of the task"""
        self.wait()

        if not self.unpickled and not raw_result:
            self.__unpickle()

        if raw_result:
            return self.sresult
        else:
            return self.result

    def wait(self):
        """Waits for the task"""
        if not self.finished:
            self.lock.acquire()
            self.lock.release()

    def __unpickle(self):
        """Unpickles the result of the task"""
        self.result, sout = pickle.loads(self.sresult)
        self.unpickled = True
        if len(sout) > 0:
            print(sout, end=' ')
        if self.callback:
            args = self.callbackargs + (self.result, )
            self.callback(*args)
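
# Example (illustrative sketch; "job_server" is a hypothetical Server
# instance): Server.submit() returns a _Task, and calling the task blocks
# until the worker has delivered and unpickled the result.
#
#     job = job_server.submit(sum, ((1, 2, 3), ))
#     print(job())        # prints 6 once the job has finished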


class _Worker(object):
    """Local worker class"""

    command = "\"" + sys.executable + "\" -u \"" \
            + os.path.dirname(os.path.abspath(__file__)) \
            + os.sep + "ppworker.py\""

    if sys.platform.startswith("win"):
        # workaround for Windows
        command = "\"" + command + "\""
    else:
        # do not show the "Broken pipe" message at exit on unix/linux
        command += " 2>/dev/null"

    def __init__(self, restart_on_free, pickle_proto):
        """Initializes local worker"""
        self.restart_on_free = restart_on_free
        self.pickle_proto = pickle_proto
        self.start()

    def start(self):
        """Starts local worker"""
        if _USE_SUBPROCESS:
            proc = subprocess.Popen(self.command, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                    shell=True)
            self.t = pptransport.CPipeTransport(proc.stdout, proc.stdin)
        else:
            self.t = pptransport.CPipeTransport(
                    *popen2.popen3(self.command)[:2])
        self.pid = int(self.t.receive())
        self.t.send(str(self.pickle_proto))
        self.is_free = True

    def stop(self):
        """Stops local worker"""
        self.is_free = False
        self.t.close()

    def restart(self):
        """Restarts local worker"""
        self.stop()
        self.start()

    def free(self):
        """Frees local worker"""
        if self.restart_on_free:
            self.restart()
        else:
            self.is_free = True


class _RWorker(pptransport.CSocketTransport):
    """Remote worker class"""

    def __init__(self, host, port, secret, message=None, persistent=True):
        """Initializes remote worker"""
        self.persistent = persistent
        self.host = host
        self.port = port
        self.secret = secret
        self.address = (host, port)
        self.id = host + ":" + str(port)
        logging.debug("Creating Rworker id=%s persistent=%s"
                % (self.id, persistent))
        self.connect(message)
        self.is_free = True

    def __del__(self):
        """Closes connection with remote server"""
        self.close()

    def connect(self, message=None):
        """Connects to a remote server"""
        while True:
            try:
                pptransport.SocketTransport.__init__(self)
                self._connect(self.host, self.port)
                if not self.authenticate(self.secret):
                    logging.error("Authentication failed for host=%s, port=%s"
                            % (self.host, self.port))
                    return False
                if message:
                    self.send(message)
                self.is_free = True
                return True
            except:
                if not self.persistent:
                    logging.debug("Deleting from queue Rworker %s"
                            % (self.id, ))
                    return False
                # print sys.excepthook(*sys.exc_info())
                logging.debug("Failed to reconnect with "
                        "(host=%s, port=%i), will try again in %i s"
                        % (self.host, self.port, _RECONNECT_WAIT_TIME))
                time.sleep(_RECONNECT_WAIT_TIME)


class _Statistics(object):
    """Class to hold execution statistics for a single node"""

    def __init__(self, ncpus, rworker=None):
        """Initializes statistics for a node"""
        self.ncpus = ncpus
        self.time = 0.0
        self.njobs = 0
        self.rworker = rworker


class Template(object):
    """Template class"""

    def __init__(self, job_server, func, depfuncs=(), modules=(),
            callback=None, callbackargs=(), group='default', globals=None):
        """Creates Template instance

           job_server - pp server for submitting jobs
           func - function to be executed
           depfuncs - tuple with functions which might be called from 'func'
           modules - tuple with module names to import
           callback - callback function which will be called with argument
                   list equal to callbackargs+(result,)
                   as soon as calculation is done
           callbackargs - additional arguments for callback function
           group - job group, is used when wait(group) is called to wait for
                   jobs in a given group to finish
           globals - dictionary from which all modules, functions and classes
                   will be imported, for instance: globals=globals()"""
        self.job_server = job_server
        self.func = func
        self.depfuncs = depfuncs
        self.modules = modules
        self.callback = callback
        self.callbackargs = callbackargs
        self.group = group
        self.globals = globals

    def submit(self, *args):
        """Submits function with *args arguments to the execution queue"""
        return self.job_server.submit(self.func, args, self.depfuncs,
                self.modules, self.callback, self.callbackargs,
                self.group, self.globals)
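
# Example (illustrative sketch; "job_server" and "fib" are hypothetical
# names): a Template freezes everything except the call arguments, so
# repeated submissions of the same function stay concise.
#
#     fib_template = Template(job_server, fib, modules=("math", ))
#     jobs = [fib_template.submit(n) for n in range(10)]
#     results = [job() for job in jobs]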


class Server(object):
    """Parallel Python SMP execution server class"""

    default_port = 60000
    default_secret = "epo20pdosl;dksldkmm"

    def __init__(self, ncpus="autodetect", ppservers=(), secret=None,
            loglevel=logging.WARNING, logstream=sys.stderr,
            restart=False, proto=0):
        """Creates Server instance

           ncpus - the number of worker processes to start on the local
                   computer, if parameter is omitted it will be set to
                   the number of processors in the system
           ppservers - list of active parallel python execution servers
                   to connect with
           secret - passphrase for network connections, if omitted a default
                   passphrase will be used. It's highly recommended to use a
                   custom passphrase for all network connections.
           loglevel - logging level
           logstream - log stream destination
           restart - whether to restart worker process after each task
                   completion
           proto - protocol number for pickle module

           With ncpus = 1 all tasks are executed sequentially.
           For the best performance either use the default "autodetect" value
           or set ncpus to the total number of processors in the system.
        """
        if not isinstance(ppservers, tuple):
            raise TypeError("ppservers argument must be a tuple")

        self.__initlog(loglevel, logstream)
        logging.debug("Creating server instance (pp-" + version + ")")
        self.__tid = 0
        self.__active_tasks = 0
        self.__active_tasks_lock = _thread.allocate_lock()
        self.__queue = []
        self.__queue_lock = _thread.allocate_lock()
        self.__workers = []
        self.__rworkers = []
        self.__rworkers_reserved = []
        self.__rworkers_reserved4 = []
        self.__sourcesHM = {}
        self.__sfuncHM = {}
        self.__waittasks = []
        self.__waittasks_lock = _thread.allocate_lock()
        self.__exiting = False
        self.__accurate_stats = True
        self.autopp_list = {}
        self.__active_rworkers_list_lock = _thread.allocate_lock()
        self.__restart_on_free = restart
        self.__pickle_proto = proto

        # add local directory and sys.path to PYTHONPATH
        pythondirs = [os.getcwd()] + sys.path

        if "PYTHONPATH" in os.environ and os.environ["PYTHONPATH"]:
            pythondirs += os.environ["PYTHONPATH"].split(os.pathsep)
        os.environ["PYTHONPATH"] = os.pathsep.join(set(pythondirs))

        atexit.register(self.destroy)
        self.__stats = {"local": _Statistics(0)}
        self.set_ncpus(ncpus)

        self.ppservers = []
        self.auto_ppservers = []

        for ppserver in ppservers:
            ppserver = ppserver.split(":")
            host = ppserver[0]
            if len(ppserver) > 1:
                port = int(ppserver[1])
            else:
                port = Server.default_port
            if host.find("*") == -1:
                self.ppservers.append((host, port))
            else:
                if host == "*":
                    host = "*.*.*.*"
                interface = host.replace("*", "0")
                broadcast = host.replace("*", "255")
                self.auto_ppservers.append(((interface, port),
                        (broadcast, port)))
        self.__stats_lock = _thread.allocate_lock()
        if secret is not None:
            if not isinstance(secret, str):
                raise TypeError("secret must be of a string type")
            self.secret = str(secret)
        elif user is not None and hasattr(user, "pp_secret"):
            secret = user.pp_secret
            if not isinstance(secret, str):
                raise TypeError("secret must be of a string type")
            self.secret = str(secret)
        else:
            self.secret = Server.default_secret
        self.__connect()
        self.__creation_time = time.time()
        logging.info("pp local server started with %d workers"
                % (self.__ncpus, ))
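
    # Example (illustrative sketch; the host names are hypothetical): start
    # a server with all detected local CPUs plus two remote ppservers, one
    # of them listening on a non-default port.
    #
    #     job_server = Server(ppservers=("node1", "node2:60001"))
    #     print(job_server.get_ncpus(), "local workers")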

    def submit(self, func, args=(), depfuncs=(), modules=(),
            callback=None, callbackargs=(), group='default', globals=None):
        """Submits function to the execution queue

           func - function to be executed
           args - tuple with arguments of the 'func'
           depfuncs - tuple with functions which might be called from 'func'
           modules - tuple with module names to import
           callback - callback function which will be called with argument
                   list equal to callbackargs+(result,)
                   as soon as calculation is done
           callbackargs - additional arguments for callback function
           group - job group, is used when wait(group) is called to wait for
                   jobs in a given group to finish
           globals - dictionary from which all modules, functions and classes
                   will be imported, for instance: globals=globals()
        """
        # perform some checks for frequent mistakes
        if self.__exiting:
            raise RuntimeError("Cannot submit jobs: server"
                    " instance has been destroyed")

        if not isinstance(args, tuple):
            raise TypeError("args argument must be a tuple")

        if not isinstance(depfuncs, tuple):
            raise TypeError("depfuncs argument must be a tuple")

        if not isinstance(modules, tuple):
            raise TypeError("modules argument must be a tuple")

        if not isinstance(callbackargs, tuple):
            raise TypeError("callbackargs argument must be a tuple")

        for module in modules:
            if not isinstance(module, str):
                raise TypeError("modules argument must be a tuple of strings")

        tid = self.__gentid()

        if globals:
            modules += tuple(self.__find_modules("", globals))
            modules = tuple(set(modules))
            self.__logger.debug("Task %i will autoimport next modules: %s" %
                    (tid, str(modules)))
            for object1 in list(globals.values()):
                if isinstance(object1, types.FunctionType) \
                        or isinstance(object1, type):
                    depfuncs += (object1, )

        task = _Task(self, tid, callback, callbackargs, group)

        self.__waittasks_lock.acquire()
        self.__waittasks.append(task)
        self.__waittasks_lock.release()

        # if the function is a method of a class add self to the arguments
        # list
        if isinstance(func, types.MethodType) and func.__self__ is not None:
            args = (func.__self__, ) + args

        # if there is an instance of a user-defined class in the arguments
        # add the whole class to dependencies (types.InstanceType no longer
        # exists in Python 3, so only the new-style check remains; builtin
        # types are skipped)
        for arg in args:
            if str(type(arg))[:6] == "<class" \
                    and getattr(type(arg), "__module__", None) != "builtins":
                depfuncs += (arg.__class__, )

        # if there is a function in the arguments add this
        # function to dependencies
        for arg in args:
            if isinstance(arg, types.FunctionType):
                depfuncs += (arg, )

        sfunc = self.__dumpsfunc((func, ) + depfuncs, modules)
        sargs = pickle.dumps(args, self.__pickle_proto)

        self.__queue_lock.acquire()
        self.__queue.append((task, sfunc, sargs))
        self.__queue_lock.release()

        self.__logger.debug("Task %i submitted, function='%s'" %
                (tid, func.__name__))
        self.__scheduler()
        return task
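
    # Example (illustrative sketch; "isprime" is a hypothetical user
    # function): submit() returns immediately, and calling the returned
    # task blocks until its result is available.
    #
    #     job = job_server.submit(isprime, (101, ), modules=("math", ))
    #     print(job())           # waits for and returns the result
    #     job_server.wait()      # or block until every job has finished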

    def wait(self, group=None):
        """Waits for all jobs in a given group to finish.
           If group is omitted waits for all jobs to finish"""
        while True:
            self.__waittasks_lock.acquire()
            for task in self.__waittasks:
                if not group or task.group == group:
                    self.__waittasks_lock.release()
                    task.wait()
                    break
            else:
                self.__waittasks_lock.release()
                break

    def get_ncpus(self):
        """Returns the number of local worker processes (ppworkers)"""
        return self.__ncpus

    def set_ncpus(self, ncpus="autodetect"):
        """Sets the number of local worker processes (ppworkers)

           ncpus - the number of worker processes, if parameter is omitted
                   it will be set to the number of processors in the system"""
        if ncpus == "autodetect":
            ncpus = self.__detect_ncpus()
        if not isinstance(ncpus, int):
            raise TypeError("ncpus must have 'int' type")
        if ncpus < 0:
            raise ValueError("ncpus must be a non-negative integer")
        if ncpus > len(self.__workers):
            self.__workers.extend([_Worker(self.__restart_on_free,
                    self.__pickle_proto) for x in
                    range(ncpus - len(self.__workers))])
        self.__stats["local"].ncpus = ncpus
        self.__ncpus = ncpus

    def get_active_nodes(self):
        """Returns active nodes as a dictionary
           [keys - nodes, values - ncpus]"""
        active_nodes = {}
        for node, stat in list(self.__stats.items()):
            if node == "local" or (node in self.autopp_list
                    and self.autopp_list[node]):
                active_nodes[node] = stat.ncpus
        return active_nodes

    def get_stats(self):
        """Returns job execution statistics as a dictionary"""
        for node, stat in list(self.__stats.items()):
            if stat.rworker:
                try:
                    stat.rworker.send("TIME")
                    stat.time = float(stat.rworker.receive())
                except:
                    self.__accurate_stats = False
                    stat.time = 0.0
        return self.__stats

    def print_stats(self):
        """Prints job execution statistics. Useful for benchmarking on
           clusters"""
        print("Job execution statistics:")
        walltime = time.time() - self.__creation_time
        statistics = list(self.get_stats().items())
        totaljobs = 0.0
        for ppserver, stat in statistics:
            totaljobs += stat.njobs
        print(" job count | % of all jobs | job time sum | "
                "time per job | job server")
        for ppserver, stat in statistics:
            if stat.njobs:
                print(" %6i | %6.2f | %8.4f | %11.6f | %s"
                        % (stat.njobs, 100.0*stat.njobs/totaljobs, stat.time,
                        stat.time/stat.njobs, ppserver, ))
        print("Time elapsed since server creation", walltime)

        if not self.__accurate_stats:
            print("WARNING: statistics provided above are not accurate"
                    " due to job rescheduling")
        print()

    # all methods below are for internal use only

    def insert(self, sfunc, sargs, task=None):
        """Inserts function into the execution queue. It's intended for
           internal use only (ppserver.py)."""
        if not task:
            tid = self.__gentid()
            task = _Task(self, tid)
        self.__queue_lock.acquire()
        self.__queue.append((task, sfunc, sargs))
        self.__queue_lock.release()

        self.__logger.debug("Task %i inserted" % (task.tid, ))
        self.__scheduler()
        return task

    def connect1(self, host, port, persistent=True):
        """Connects to a remote ppserver specified by host and port"""
        try:
            rworker = _RWorker(host, port, self.secret, "STAT", persistent)
            ncpus = int(rworker.receive())
            hostid = host + ":" + str(port)
            self.__stats[hostid] = _Statistics(ncpus, rworker)

            for x in range(ncpus):
                rworker = _RWorker(host, port, self.secret, "EXEC", persistent)
                self.__update_active_rworkers(rworker.id, 1)
                # append is atomic - no need to lock self.__rworkers
                self.__rworkers.append(rworker)
            # creating reserved rworkers
            for x in range(ncpus):
                rworker = _RWorker(host, port, self.secret, "EXEC", persistent)
                self.__update_active_rworkers(rworker.id, 1)
                self.__rworkers_reserved.append(rworker)
            # creating reserved4 rworkers
            for x in range(ncpus*0):
                rworker = _RWorker(host, port, self.secret, "EXEC", persistent)
                # self.__update_active_rworkers(rworker.id, 1)
                self.__rworkers_reserved4.append(rworker)
            logging.debug("Connected to ppserver (host=%s, port=%i) "
                    "with %i workers" % (host, port, ncpus))
            self.__scheduler()
        except:
            pass
            # sys.excepthook(*sys.exc_info())

    def __connect(self):
        """Connects to all remote ppservers"""
        for ppserver in self.ppservers:
            _thread.start_new_thread(self.connect1, ppserver)

        discover = ppauto.Discover(self, True)
        for ppserver in self.auto_ppservers:
            _thread.start_new_thread(discover.run, ppserver)

    def __detect_ncpus(self):
        """Detects the number of effective CPUs in the system"""
        # for Linux, Unix and MacOS
        if hasattr(os, "sysconf"):
            if "SC_NPROCESSORS_ONLN" in os.sysconf_names:
                # Linux and Unix
                ncpus = os.sysconf("SC_NPROCESSORS_ONLN")
                if isinstance(ncpus, int) and ncpus > 0:
                    return ncpus
            else:
                # MacOS X (os.popen2 was removed in Python 3)
                return int(os.popen("sysctl -n hw.ncpu").read())
        # for Windows
        if "NUMBER_OF_PROCESSORS" in os.environ:
            ncpus = int(os.environ["NUMBER_OF_PROCESSORS"])
            if ncpus > 0:
                return ncpus
        # return the default value
        return 1
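
    # Note (assumption: running on Python 3.4 or newer): the probing above
    # could be replaced by the standard library call below; the legacy
    # detection is kept for fidelity with the original code.
    #
    #     ncpus = os.cpu_count() or 1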

    def __initlog(self, loglevel, logstream):
        """Initializes logging facility"""
        log_handler = logging.StreamHandler(logstream)
        log_handler.setLevel(loglevel)
        LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'
        log_handler.setFormatter(logging.Formatter(LOG_FORMAT))
        self.__logger = logging.getLogger('')
        self.__logger.addHandler(log_handler)
        self.__logger.setLevel(loglevel)

    def __dumpsfunc(self, funcs, modules):
        """Serializes functions and modules"""
        hashs = hash(funcs + modules)
        if hashs not in self.__sfuncHM:
            sources = [self.__get_source(func) for func in funcs]
            self.__sfuncHM[hashs] = pickle.dumps(
                    (funcs[0].__name__, sources, modules),
                    self.__pickle_proto)
        return self.__sfuncHM[hashs]

    def __find_modules(self, prefix, dict):
        """recursively finds all the modules in dict"""
        modules = []
        for name, object in list(dict.items()):
            if isinstance(object, types.ModuleType) \
                    and name not in ("__builtins__", "pp"):
                if object.__name__ == prefix + name or prefix == "":
                    modules.append(object.__name__)
                    modules.extend(self.__find_modules(
                            object.__name__ + ".", object.__dict__))
        return modules

    def __scheduler(self):
        """Schedules jobs for execution"""
        self.__queue_lock.acquire()
        while self.__queue:
            if self.__active_tasks < self.__ncpus:
                # TODO: select a job number on the basis of heuristic
                task = self.__queue.pop(0)
                for worker in self.__workers:
                    if worker.is_free:
                        worker.is_free = False
                        break
                else:
                    self.__logger.error("There are no free workers left")
                    raise RuntimeError("Error: No free workers")
                self.__add_to_active_tasks(1)
                try:
                    self.__stats["local"].njobs += 1
                    _thread.start_new_thread(self.__run, task + (worker, ))
                except:
                    pass
            else:
                for rworker in self.__rworkers:
                    if rworker.is_free:
                        rworker.is_free = False
                        task = self.__queue.pop(0)
                        self.__stats[rworker.id].njobs += 1
                        _thread.start_new_thread(self.__rrun,
                                task + (rworker, ))
                        break
                else:
                    if len(self.__queue) > self.__ncpus:
                        for rworker in self.__rworkers_reserved:
                            if rworker.is_free:
                                rworker.is_free = False
                                task = self.__queue.pop(0)
                                self.__stats[rworker.id].njobs += 1
                                _thread.start_new_thread(self.__rrun,
                                        task + (rworker, ))
                                break
                        else:
                            break
                        # this code will not be executed
                        # and is left for further releases
                        if len(self.__queue) > self.__ncpus*0:
                            for rworker in self.__rworkers_reserved4:
                                if rworker.is_free:
                                    rworker.is_free = False
                                    task = self.__queue.pop(0)
                                    self.__stats[rworker.id].njobs += 1
                                    _thread.start_new_thread(self.__rrun,
                                            task + (rworker, ))
                                    break
                            else:
                                break
                    else:
                        break

        self.__queue_lock.release()

    def __get_source(self, func):
        """Fetches source of the function"""
        hashf = hash(func)
        if hashf not in self.__sourcesHM:
            # get lines of the source and adjust indent
            sourcelines = inspect.getsourcelines(func)[0]
            # remove indentation from the first line
            sourcelines[0] = sourcelines[0].lstrip()
            self.__sourcesHM[hashf] = "".join(sourcelines)
        return self.__sourcesHM[hashf]

    def __rrun(self, job, sfunc, sargs, rworker):
        """Runs a job remotely"""
        self.__logger.debug("Task (remote) %i started" % (job.tid, ))

        try:
            rworker.csend(sfunc)
            rworker.send(sargs)
            sresult = rworker.receive()
            rworker.is_free = True
        except:
            self.__logger.debug("Task %i failed due to broken network "
                    "connection - rescheduling" % (job.tid, ))
            self.insert(sfunc, sargs, job)
            self.__scheduler()
            self.__update_active_rworkers(rworker.id, -1)
            if rworker.connect("EXEC"):
                self.__update_active_rworkers(rworker.id, 1)
                self.__scheduler()
            return

        job.finalize(sresult)

        # remove the job from the waiting list
        if self.__waittasks:
            self.__waittasks_lock.acquire()
            self.__waittasks.remove(job)
            self.__waittasks_lock.release()

        self.__logger.debug("Task (remote) %i ended" % (job.tid, ))
        self.__scheduler()

    def __run(self, job, sfunc, sargs, worker):
        """Runs a job locally"""
        if self.__exiting:
            return
        self.__logger.debug("Task %i started" % (job.tid, ))

        start_time = time.time()

        try:
            worker.t.csend(sfunc)
            worker.t.send(sargs)
            sresult = worker.t.receive()
        except:
            if self.__exiting:
                return
            else:
                sys.excepthook(*sys.exc_info())

        worker.free()

        job.finalize(sresult)

        # remove the job from the waiting list
        if self.__waittasks:
            self.__waittasks_lock.acquire()
            self.__waittasks.remove(job)
            self.__waittasks_lock.release()

        self.__add_to_active_tasks(-1)
        if not self.__exiting:
            self.__stat_add_time("local", time.time() - start_time)
        self.__logger.debug("Task %i ended" % (job.tid, ))
        self.__scheduler()

    def __add_to_active_tasks(self, num):
        """Updates the number of active tasks"""
        self.__active_tasks_lock.acquire()
        self.__active_tasks += num
        self.__active_tasks_lock.release()

    def __stat_add_time(self, node, time_add):
        """Updates total runtime on the node"""
        self.__stats_lock.acquire()
        self.__stats[node].time += time_add
        self.__stats_lock.release()

    def __stat_add_job(self, node):
        """Increments job count on the node"""
        self.__stats_lock.acquire()
        self.__stats[node].njobs += 1
        self.__stats_lock.release()

    def __update_active_rworkers(self, id, count):
        """Updates list of active rworkers"""
        self.__active_rworkers_list_lock.acquire()

        if id not in self.autopp_list:
            self.autopp_list[id] = 0
        self.autopp_list[id] += count

        self.__active_rworkers_list_lock.release()

    def __gentid(self):
        """Generates a unique job ID number"""
        self.__tid += 1
        return self.__tid - 1

    def destroy(self):
        """Kills ppworkers and closes open files"""
        self.__exiting = True
        self.__queue_lock.acquire()
        self.__queue = []
        self.__queue_lock.release()

        for worker in self.__workers:
            worker.t.exiting = True
            if sys.platform.startswith("win"):
                os.popen('TASKKILL /PID ' + str(worker.pid) + ' /F')
            else:
                try:
                    os.kill(worker.pid, 9)
                    os.waitpid(worker.pid, 0)
                except:
                    pass