# -*- coding: utf-8 -*-
# Copyright (c) 2009 Ondřej Súkup
# Parallel Python Software: http://www.parallelpython.com
# Copyright (c) 2005-2009, Vitalii Vanovschi
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#    * Redistributions of source code must retain the above copyright notice,
#      this list of conditions and the following disclaimer.
#    * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#    * Neither the name of the author nor the names of its contributors
#      may be used to endorse or promote products derived from this software
#      without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
# THE POSSIBILITY OF SUCH DAMAGE.
import os
import sys
import time
import atexit
import types
import inspect
import logging
import _thread
import pickle
import user

import pptransport
import ppauto

copyright = "Copyright (c) 2009 Ondřej Súkup / \
Copyright (c) 2005-2009 Vitalii Vanovschi. All rights reserved"

# reconnect persistent rworkers in 5 sec
_RECONNECT_WAIT_TIME = 5

_USE_SUBPROCESS = False
try:
    import subprocess
    _USE_SUBPROCESS = True
except ImportError:
    import popen2

class _Task(object):
    """Class describing single task (job)"""

    def __init__(self, server, tid, callback=None,
            callbackargs=(), group='default'):
        """Initializes the task"""
        self.lock = _thread.allocate_lock()
        self.tid = tid
        self.server = server
        self.callback = callback
        self.callbackargs = callbackargs
        self.group = group
        self.unpickled = False
    def finalize(self, sresult):
        """Finalizes the task.

           For internal use only"""
        self.sresult = sresult
    def __call__(self, raw_result=False):
        """Retrieves result of the task"""
        if not self.unpickled and not raw_result:
            self.__unpickle()
        return self.result

    def wait(self):
        """Waits for the task"""

    def __unpickle(self):
        """Unpickles the result of the task"""
        self.result, sout = pickle.loads(self.sresult)
        self.unpickled = True
        if self.callback:
            args = self.callbackargs + (self.result, )
            self.callback(*args)

class _Worker(object):
    """Local worker class"""

    command = "\"" + sys.executable + "\" -u \"" \
            + os.path.dirname(os.path.abspath(__file__)) \
            + os.sep + "ppworker.py\""

    if sys.platform.startswith("win"):
        # workaround for windows
        command = "\"" + command + "\""
    else:
        # do not show "Broken pipe" message at exit on unix/linux
        command += " 2>/dev/null"
    def __init__(self, restart_on_free, pickle_proto):
        """Initializes local worker"""
        self.restart_on_free = restart_on_free
        self.pickle_proto = pickle_proto
        self.start()

    def start(self):
        """Starts local worker"""
        if _USE_SUBPROCESS:
            proc = subprocess.Popen(self.command, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                    shell=True)
            self.t = pptransport.CPipeTransport(proc.stdout, proc.stdin)
        else:
            self.t = pptransport.CPipeTransport(
                    *popen2.popen3(self.command)[:2])
        self.pid = int(self.t.receive())
        self.t.send(str(self.pickle_proto))
        self.is_free = True

    def stop(self):
        """Stops local worker"""
        self.is_free = False
        self.t.close()

    def restart(self):
        """Restarts local worker"""
        self.stop()
        self.start()

    def free(self):
        """Frees local worker"""
        if self.restart_on_free:
            self.restart()
        else:
            self.is_free = True

class _RWorker(pptransport.CSocketTransport):
    """Remote worker class"""

    def __init__(self, host, port, secret, message=None, persistent=True):
        """Initializes remote worker"""
        self.persistent = persistent
        self.host = host
        self.port = port
        self.secret = secret
        self.address = (host, port)
        self.id = host + ":" + str(port)
        logging.debug("Creating Rworker id=%s persistent=%s"
                % (self.id, persistent))
        self.connect(message)

    def __del__(self):
        """Closes connection with remote server"""
        self.close()

    def connect(self, message=None):
        """Connects to a remote server"""
        while True:
            try:
                pptransport.SocketTransport.__init__(self)
                self._connect(self.host, self.port)
                if not self.authenticate(self.secret):
                    logging.error("Authentication failed for host=%s, port=%s"
                            % (self.host, self.port))
                    return False
                if message:
                    self.send(message)
                self.is_free = True
                return True
            except:
                # print sys.excepthook(*sys.exc_info())
                if not self.persistent:
                    logging.debug("Deleting from queue Rworker %s"
                            % (self.id, ))
                    return False
                logging.debug("Failed to reconnect with "
                        "(host=%s, port=%i), will try again in %i s"
                        % (self.host, self.port, _RECONNECT_WAIT_TIME))
                time.sleep(_RECONNECT_WAIT_TIME)

class _Statistics(object):
    """Class to hold execution statistics for a single node"""

    def __init__(self, ncpus, rworker=None):
        """Initializes statistics for a node"""
        self.ncpus = ncpus
        self.time = 0.0
        self.njobs = 0
        self.rworker = rworker

class Template(object):
    """Template class"""

    def __init__(self, job_server, func, depfuncs=(), modules=(),
            callback=None, callbackargs=(), group='default', globals=None):
        """Creates Template instance

           job_server - pp server for submitting jobs
           func - function to be executed
           depfuncs - tuple with functions which might be called from 'func'
           modules - tuple with module names to import
           callback - callback function which will be called with argument
                   list equal to callbackargs+(result,)
                   as soon as calculation is done
           callbackargs - additional arguments for callback function
           group - job group, is used when wait(group) is called to wait for
                   jobs in a given group to finish
           globals - dictionary from which all modules, functions and classes
                   will be imported, for instance: globals=globals()"""
        self.job_server = job_server
        self.func = func
        self.depfuncs = depfuncs
        self.modules = modules
        self.callback = callback
        self.callbackargs = callbackargs
        self.group = group
        self.globals = globals

    def submit(self, *args):
        """Submits function with *args arguments to the execution queue"""
        return self.job_server.submit(self.func, args, self.depfuncs,
                self.modules, self.callback, self.callbackargs,
                self.group, self.globals)
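
# Usage sketch (not part of the original file): a Template pre-binds the
# submit() arguments that stay the same across many jobs. The function
# `isprime` and the argument range below are hypothetical examples.
#
#     job_server = Server(ncpus=2)
#     fn = Template(job_server, isprime, modules=("math",))
#     jobs = [fn.submit(n) for n in range(2, 50)]
#     results = [job() for job in jobs]
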
class Server(object):
    """Parallel Python SMP execution server class"""

    default_secret = "epo20pdosl;dksldkmm"

    def __init__(self, ncpus="autodetect", ppservers=(), secret=None,
            loglevel=logging.WARNING, logstream=sys.stderr,
            restart=False, proto=0):
        """Creates Server instance

           ncpus - the number of worker processes to start on the local
                   computer, if parameter is omitted it will be set to
                   the number of processors in the system
           ppservers - list of active parallel python execution servers
           secret - passphrase for network connections, if omitted a default
                   passphrase will be used. It's highly recommended to use a
                   custom passphrase for all network connections.
           loglevel - logging level
           logstream - log stream destination
           restart - whether to restart worker process after each task completion
           proto - protocol number for pickle module

           With ncpus = 1 all tasks are executed sequentially.
           For the best performance either use the default "autodetect" value
           or set ncpus to the total number of processors in the system."""
        if not isinstance(ppservers, tuple):
            raise TypeError("ppservers argument must be a tuple")

        self.__initlog(loglevel, logstream)
        logging.debug("Creating server instance (pp-" + version + ")")
        self.__tid = 0
        self.__active_tasks = 0
        self.__active_tasks_lock = _thread.allocate_lock()
        self.__queue = []
        self.__queue_lock = _thread.allocate_lock()
        self.__workers = []
        self.__rworkers = []
        self.__rworkers_reserved = []
        self.__rworkers_reserved4 = []
        self.__sourcesHM = {}
        self.__sfuncHM = {}
        self.__waittasks = []
        self.__waittasks_lock = _thread.allocate_lock()
        self.__exiting = False
        self.__accurate_stats = True
        self.autopp_list = {}
        self.__active_rworkers_list_lock = _thread.allocate_lock()
        self.__restart_on_free = restart
        self.__pickle_proto = proto

        # add local directory and sys.path to PYTHONPATH
        pythondirs = [os.getcwd()] + sys.path

        if "PYTHONPATH" in os.environ and os.environ["PYTHONPATH"]:
            pythondirs += os.environ["PYTHONPATH"].split(os.pathsep)
        os.environ["PYTHONPATH"] = os.pathsep.join(set(pythondirs))

        atexit.register(self.destroy)
        self.__stats = {"local": _Statistics(0)}
        self.set_ncpus(ncpus)

        self.ppservers = []
        self.auto_ppservers = []

        for ppserver in ppservers:
            ppserver = ppserver.split(":")
            host = ppserver[0]
            if len(ppserver) > 1:
                port = int(ppserver[1])
            else:
                port = Server.default_port
            if host.find("*") == -1:
                self.ppservers.append((host, port))
            else:
                interface = host.replace("*", "0")
                broadcast = host.replace("*", "255")
                self.auto_ppservers.append(((interface, port),
                        (broadcast, port)))
        self.__stats_lock = _thread.allocate_lock()
        if secret is not None:
            if not isinstance(secret, str):
                raise TypeError("secret must be of a string type")
            self.secret = str(secret)
        elif hasattr(user, "pp_secret"):
            secret = user.pp_secret
            if not isinstance(secret, str):
                raise TypeError("secret must be of a string type")
            self.secret = str(secret)
        else:
            self.secret = Server.default_secret

        self.__creation_time = time.time()
        logging.info("pp local server started with %d workers"
                % (self.__ncpus, ))
    def submit(self, func, args=(), depfuncs=(), modules=(),
            callback=None, callbackargs=(), group='default', globals=None):
        """Submits function to the execution queue

           func - function to be executed
           args - tuple with arguments of the 'func'
           depfuncs - tuple with functions which might be called from 'func'
           modules - tuple with module names to import
           callback - callback function which will be called with argument
                   list equal to callbackargs+(result,)
                   as soon as calculation is done
           callbackargs - additional arguments for callback function
           group - job group, is used when wait(group) is called to wait for
                   jobs in a given group to finish
           globals - dictionary from which all modules, functions and classes
                   will be imported, for instance: globals=globals()"""
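        # Usage sketch (illustrative only; `fsum` and `collect` are
        # hypothetical caller-defined functions, not part of this module):
        #
        #     job = job_server.submit(fsum, (values,), modules=("math",),
        #                             callback=collect, group="sums")
        #     job_server.wait("sums")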
        # perform some checks for frequent mistakes
        if self.__exiting:
            raise RuntimeError("Cannot submit jobs: server"
                    " instance has been destroyed")

        if not isinstance(args, tuple):
            raise TypeError("args argument must be a tuple")

        if not isinstance(depfuncs, tuple):
            raise TypeError("depfuncs argument must be a tuple")

        if not isinstance(modules, tuple):
            raise TypeError("modules argument must be a tuple")

        if not isinstance(callbackargs, tuple):
            raise TypeError("callbackargs argument must be a tuple")

        for module in modules:
            if not isinstance(module, str):
                raise TypeError("modules argument must be a list of strings")

        tid = self.__gentid()

        if globals:
            modules += tuple(self.__find_modules("", globals))
            modules = tuple(set(modules))
            self.__logger.debug("Task %i will autoimport next modules: %s" %
                    (tid, str(modules)))
            for object1 in list(globals.values()):
                if isinstance(object1, types.FunctionType) \
                        or isinstance(object1, type):
                    depfuncs += (object1, )

        task = _Task(self, tid, callback, callbackargs, group)

        self.__waittasks_lock.acquire()
        self.__waittasks.append(task)
        self.__waittasks_lock.release()

        # if the function is a method of a class add self to the arguments list
        if isinstance(func, types.MethodType) and func.__self__ is not None:
            args = (func.__self__, ) + args

        # if there is an instance of a user defined class in the arguments add
        # the whole class to dependencies
        for arg in args:
            # Checks for both classic or new class instances
            if isinstance(arg, types.InstanceType) \
                    or str(type(arg))[:6] == "<class":
                depfuncs += (arg.__class__, )

            # if there is a function in the arguments add this
            # function to dependencies
            if isinstance(arg, types.FunctionType):
                depfuncs += (arg, )

        sfunc = self.__dumpsfunc((func, ) + depfuncs, modules)
        sargs = pickle.dumps(args, self.__pickle_proto)

        self.__queue_lock.acquire()
        self.__queue.append((task, sfunc, sargs))
        self.__queue_lock.release()

        self.__logger.debug("Task %i submitted, function='%s'" %
                (tid, func.__name__))
        self.__scheduler()
        return task
    def wait(self, group=None):
        """Waits for all jobs in a given group to finish.
           If group is omitted waits for all jobs to finish
        """
        while True:
            self.__waittasks_lock.acquire()
            for task in self.__waittasks:
                if not group or task.group == group:
                    self.__waittasks_lock.release()
                    task.wait()
                    break
            else:
                break
        self.__waittasks_lock.release()
    def get_ncpus(self):
        """Returns the number of local worker processes (ppworkers)"""
        return self.__ncpus

    def set_ncpus(self, ncpus="autodetect"):
        """Sets the number of local worker processes (ppworkers)

           ncpus - the number of worker processes, if parameter is omitted
                   it will be set to the number of processors in the system"""
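        # Usage sketch: job_server.set_ncpus(2) pins the pool to two local
        # workers; calling it with no argument re-detects the CPU count.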
        if ncpus == "autodetect":
            ncpus = self.__detect_ncpus()
        if not isinstance(ncpus, int):
            raise TypeError("ncpus must have 'int' type")
        if ncpus <= 0:
            raise ValueError("ncpus must be an integer > 0")
        if ncpus > len(self.__workers):
            self.__workers.extend([_Worker(self.__restart_on_free,
                    self.__pickle_proto) for x in
                    range(ncpus - len(self.__workers))])
        self.__stats["local"].ncpus = ncpus
        self.__ncpus = ncpus
    def get_active_nodes(self):
        """Returns active nodes as a dictionary
           [keys - nodes, values - ncpus]"""
        active_nodes = {}
        for node, stat in list(self.__stats.items()):
            if node == "local" or node in self.autopp_list \
                    and self.autopp_list[node]:
                active_nodes[node] = stat.ncpus
        return active_nodes
    def get_stats(self):
        """Returns job execution statistics as a dictionary"""
        for node, stat in list(self.__stats.items()):
            if stat.rworker:
                try:
                    stat.rworker.send("TIME")
                    stat.time = float(stat.rworker.receive())
                except:
                    self.__accurate_stats = False
                    stat.time = 0.0
        return self.__stats
    def print_stats(self):
        """Prints job execution statistics. Useful for benchmarking on
           clusters"""
        print("Job execution statistics:")
        walltime = time.time() - self.__creation_time
        statistics = list(self.get_stats().items())
        totaljobs = 0.0
        for ppserver, stat in statistics:
            totaljobs += stat.njobs
        print(" job count | % of all jobs | job time sum | "
                "time per job | job server")
        for ppserver, stat in statistics:
            if stat.njobs:
                print(" %6i | %6.2f | %8.4f | %11.6f | %s"
                        % (stat.njobs, 100.0*stat.njobs/totaljobs, stat.time,
                        stat.time/stat.njobs, ppserver, ))
        print("Time elapsed since server creation", walltime)

        if not self.__accurate_stats:
            print("WARNING: statistics provided above are not accurate"
                    " due to job rescheduling")
    # all methods below are for internal use only

    def insert(self, sfunc, sargs, task=None):
        """Inserts function into the execution queue. It's intended for
           internal use only (ppserver.py).
        """
        if not task:
            tid = self.__gentid()
            task = _Task(self, tid)
        self.__queue_lock.acquire()
        self.__queue.append((task, sfunc, sargs))
        self.__queue_lock.release()

        self.__logger.debug("Task %i inserted" % (task.tid, ))
        self.__scheduler()
        return task
    def connect1(self, host, port, persistent=True):
        """Connects to a remote ppserver specified by host and port"""
        try:
            rworker = _RWorker(host, port, self.secret, "STAT", persistent)
            ncpus = int(rworker.receive())
            hostid = host + ":" + str(port)
            self.__stats[hostid] = _Statistics(ncpus, rworker)

            for x in range(ncpus):
                rworker = _RWorker(host, port, self.secret, "EXEC", persistent)
                self.__update_active_rworkers(rworker.id, 1)
                # append is atomic - no need to lock self.__rworkers
                self.__rworkers.append(rworker)
            # creating reserved rworkers
            for x in range(ncpus):
                rworker = _RWorker(host, port, self.secret, "EXEC", persistent)
                self.__update_active_rworkers(rworker.id, 1)
                self.__rworkers_reserved.append(rworker)
            # creating reserved4 rworkers
            for x in range(ncpus*0):
                rworker = _RWorker(host, port, self.secret, "EXEC", persistent)
                # self.__update_active_rworkers(rworker.id, 1)
                self.__rworkers_reserved4.append(rworker)
            logging.debug("Connected to ppserver (host=%s, port=%i) \
                    with %i workers" % (host, port, ncpus))
            self.__scheduler()
        except:
            # sys.excepthook(*sys.exc_info())
            pass
    def __connect(self):
        """Connects to all remote ppservers"""
        for ppserver in self.ppservers:
            _thread.start_new_thread(self.connect1, ppserver)

        discover = ppauto.Discover(self, True)
        for ppserver in self.auto_ppservers:
            _thread.start_new_thread(discover.run, ppserver)
    def __detect_ncpus(self):
        """Detects the number of effective CPUs in the system"""
        # for Linux, Unix and MacOS
        if hasattr(os, "sysconf"):
            if "SC_NPROCESSORS_ONLN" in os.sysconf_names:
                # Linux and Unix
                ncpus = os.sysconf("SC_NPROCESSORS_ONLN")
                if isinstance(ncpus, int) and ncpus > 0:
                    return ncpus
            else:
                # MacOS X
                return int(os.popen("sysctl -n hw.ncpu").read())
        # for Windows
        if "NUMBER_OF_PROCESSORS" in os.environ:
            ncpus = int(os.environ["NUMBER_OF_PROCESSORS"])
            if ncpus > 0:
                return ncpus
        # return the default value
        return 1
    def __initlog(self, loglevel, logstream):
        """Initializes logging facility"""
        log_handler = logging.StreamHandler(logstream)
        log_handler.setLevel(loglevel)
        LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'
        log_handler.setFormatter(logging.Formatter(LOG_FORMAT))
        self.__logger = logging.getLogger('')
        self.__logger.addHandler(log_handler)
        self.__logger.setLevel(loglevel)
    def __dumpsfunc(self, funcs, modules):
        """Serializes functions and modules"""
        hashs = hash(funcs + modules)
        if hashs not in self.__sfuncHM:
            sources = [self.__get_source(func) for func in funcs]
            self.__sfuncHM[hashs] = pickle.dumps(
                    (funcs[0].__name__, sources, modules),
                    self.__pickle_proto)
        return self.__sfuncHM[hashs]
    def __find_modules(self, prefix, dict):
        """recursively finds all the modules in dict"""
        modules = []
        for name, object in list(dict.items()):
            if isinstance(object, types.ModuleType) \
                    and name not in ("__builtins__", "pp"):
                if object.__name__ == prefix + name or prefix == "":
                    modules.append(object.__name__)
                    modules.extend(self.__find_modules(
                            object.__name__ + ".", object.__dict__))
        return modules
    def __scheduler(self):
        """Schedules jobs for execution"""
        self.__queue_lock.acquire()
        while self.__queue:
            if self.__active_tasks < self.__ncpus:
                # TODO: select a job number on the basis of heuristic
                task = self.__queue.pop(0)
                for worker in self.__workers:
                    if worker.is_free:
                        worker.is_free = False
                        break
                else:
                    self.__logger.error("There are no free workers left")
                    raise RuntimeError("Error: No free workers")
                self.__add_to_active_tasks(1)
                self.__stats["local"].njobs += 1
                _thread.start_new_thread(self.__run, task + (worker, ))
            else:
                for rworker in self.__rworkers:
                    if rworker.is_free:
                        rworker.is_free = False
                        task = self.__queue.pop(0)
                        self.__stats[rworker.id].njobs += 1
                        _thread.start_new_thread(self.__rrun, task + (rworker, ))
                        break
                else:
                    if len(self.__queue) > self.__ncpus:
                        for rworker in self.__rworkers_reserved:
                            if rworker.is_free:
                                rworker.is_free = False
                                task = self.__queue.pop(0)
                                self.__stats[rworker.id].njobs += 1
                                _thread.start_new_thread(self.__rrun,
                                        task + (rworker, ))
                                break
                        else:
                            break
                            # this code will not be executed
                            # and is left for further releases
                            if len(self.__queue) > self.__ncpus*0:
                                for rworker in self.__rworkers_reserved4:
                                    if rworker.is_free:
                                        rworker.is_free = False
                                        task = self.__queue.pop(0)
                                        self.__stats[rworker.id].njobs += 1
                                        _thread.start_new_thread(self.__rrun,
                                                task + (rworker, ))
                                        break
                    else:
                        break

        self.__queue_lock.release()
    def __get_source(self, func):
        """Fetches source of the function"""
        hashf = hash(func)
        if hashf not in self.__sourcesHM:
            # get lines of the source and adjust indent
            sourcelines = inspect.getsourcelines(func)[0]
            # remove indentation from the first line
            sourcelines[0] = sourcelines[0].lstrip()
            self.__sourcesHM[hashf] = "".join(sourcelines)
        return self.__sourcesHM[hashf]
    def __rrun(self, job, sfunc, sargs, rworker):
        """Runs a job remotely"""
        self.__logger.debug("Task (remote) %i started" % (job.tid, ))

        try:
            rworker.csend(sfunc)
            rworker.send(sargs)
            sresult = rworker.receive()
            rworker.is_free = True
        except:
            self.__logger.debug("Task %i failed due to broken network "
                    "connection - rescheduling" % (job.tid, ))
            self.insert(sfunc, sargs, job)
            self.__scheduler()
            self.__update_active_rworkers(rworker.id, -1)
            if rworker.connect("EXEC"):
                self.__update_active_rworkers(rworker.id, 1)
                self.__scheduler()
            return

        job.finalize(sresult)

        # remove the job from the waiting list
        self.__waittasks_lock.acquire()
        self.__waittasks.remove(job)
        self.__waittasks_lock.release()

        self.__logger.debug("Task (remote) %i ended" % (job.tid, ))
        self.__scheduler()
    def __run(self, job, sfunc, sargs, worker):
        """Runs a job locally"""

        self.__logger.debug("Task %i started" % (job.tid, ))

        start_time = time.time()

        try:
            worker.t.csend(sfunc)
            worker.t.send(sargs)
            sresult = worker.t.receive()
            worker.free()
        except:
            sys.excepthook(*sys.exc_info())

        job.finalize(sresult)

        # remove the job from the waiting list
        self.__waittasks_lock.acquire()
        self.__waittasks.remove(job)
        self.__waittasks_lock.release()

        self.__add_to_active_tasks(-1)
        if not self.__exiting:
            self.__stat_add_time("local", time.time() - start_time)
        self.__logger.debug("Task %i ended" % (job.tid, ))
        self.__scheduler()
    def __add_to_active_tasks(self, num):
        """Updates the number of active tasks"""
        self.__active_tasks_lock.acquire()
        self.__active_tasks += num
        self.__active_tasks_lock.release()

    def __stat_add_time(self, node, time_add):
        """Updates total runtime on the node"""
        self.__stats_lock.acquire()
        self.__stats[node].time += time_add
        self.__stats_lock.release()

    def __stat_add_job(self, node):
        """Increments job count on the node"""
        self.__stats_lock.acquire()
        self.__stats[node].njobs += 1
        self.__stats_lock.release()
    def __update_active_rworkers(self, id, count):
        """Updates list of active rworkers"""
        self.__active_rworkers_list_lock.acquire()

        if id not in self.autopp_list:
            self.autopp_list[id] = 0
        self.autopp_list[id] += count

        self.__active_rworkers_list_lock.release()
    def __gentid(self):
        """Generates a unique job ID number"""
        self.__tid += 1
        return self.__tid - 1
    def destroy(self):
        """Kills ppworkers and closes open files"""
        self.__exiting = True
        self.__queue_lock.acquire()
        self.__queue = []
        self.__queue_lock.release()

        for worker in self.__workers:
            worker.t.exiting = True
            if sys.platform.startswith("win"):
                os.popen('TASKKILL /PID ' + str(worker.pid) + ' /F')
            else:
                os.kill(worker.pid, 9)
                os.waitpid(worker.pid, 0)