1 """Thread module emulating a subset of Java's threading model."""
8 del _sys
.modules
[__name__
]
11 from time
import time
as _time
, sleep
as _sleep
12 from traceback
import format_exc
as _format_exc
13 from collections
import deque
# Rename some stuff so "from threading import *" is safe.
# Only the names listed in __all__ are exported by a star import.
__all__ = [
    'activeCount', 'Condition', 'currentThread', 'enumerate', 'Event',
    'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
    'Timer', 'setprofile', 'settrace', 'local', 'stack_size',
]

# Module-level aliases for the attributes taken from ``thread``.
_start_new_thread = thread.start_new_thread
_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
ThreadError = thread.error
27 # Debug support (adapted from ihooks.py).
28 # All the major classes here derive from _Verbose. We force that to
29 # be a new-style class so that all the major classes here are new-style.
30 # This helps debugging (type(instance) is more revealing for instances
31 # of new-style classes).
37 class _Verbose(object):
39 def __init__(self
, verbose
=None):
42 self
.__verbose
= verbose
44 def _note(self
, format
, *args
):
46 format
= format
% args
47 format
= "%s: %s\n" % (
48 currentThread().getName(), format
)
49 _sys
.stderr
.write(format
)
52 # Disable this when using "python -O"
53 class _Verbose(object):
54 def __init__(self
, verbose
=None):
56 def _note(self
, *args
):
59 # Support for profile and trace hooks
72 # Synchronization classes
def RLock(*args, **kwargs):
    """Factory function: create and return a new _RLock instance.

    Every positional and keyword argument is forwarded unchanged to the
    _RLock constructor.
    """
    lock = _RLock(*args, **kwargs)
    return lock
79 class _RLock(_Verbose
):
81 def __init__(self
, verbose
=None):
82 _Verbose
.__init
__(self
, verbose
)
83 self
.__block
= _allocate_lock()
88 return "<%s(%s, %d)>" % (
89 self
.__class
__.__name
__,
90 self
.__owner
and self
.__owner
.getName(),
93 def acquire(self
, blocking
=1):
95 if self
.__owner
is me
:
96 self
.__count
= self
.__count
+ 1
98 self
._note
("%s.acquire(%s): recursive success", self
, blocking
)
100 rc
= self
.__block
.acquire(blocking
)
105 self
._note
("%s.acquire(%s): initial success", self
, blocking
)
108 self
._note
("%s.acquire(%s): failure", self
, blocking
)
115 assert self
.__owner
is me
, "release() of un-acquire()d lock"
116 self
.__count
= count
= self
.__count
- 1
119 self
.__block
.release()
121 self
._note
("%s.release(): final release", self
)
124 self
._note
("%s.release(): non-final release", self
)
126 def __exit__(self
, t
, v
, tb
):
129 # Internal methods used by condition variables
131 def _acquire_restore(self
, (count
, owner
)):
132 self
.__block
.acquire()
136 self
._note
("%s._acquire_restore()", self
)
138 def _release_save(self
):
140 self
._note
("%s._release_save()", self
)
145 self
.__block
.release()
146 return (count
, owner
)
149 return self
.__owner
is currentThread()
def Condition(*args, **kwargs):
    """Factory function: create and return a new _Condition instance.

    Every positional and keyword argument is forwarded unchanged to the
    _Condition constructor.
    """
    condition = _Condition(*args, **kwargs)
    return condition
155 class _Condition(_Verbose
):
157 def __init__(self
, lock
=None, verbose
=None):
158 _Verbose
.__init
__(self
, verbose
)
162 # Export the lock's acquire() and release() methods
163 self
.acquire
= lock
.acquire
164 self
.release
= lock
.release
165 # If the lock defines _release_save() and/or _acquire_restore(),
166 # these override the default implementations (which just call
167 # release() and acquire() on the lock). Ditto for _is_owned().
169 self
._release
_save
= lock
._release
_save
170 except AttributeError:
173 self
._acquire
_restore
= lock
._acquire
_restore
174 except AttributeError:
177 self
._is
_owned
= lock
._is
_owned
178 except AttributeError:
183 return self
.__lock
.__enter
__()
def __exit__(self, *args):
    """Leave the ``with`` block by delegating to the underlying lock.

    The exception details in *args are handed straight to the lock's own
    __exit__(), and whatever that call returns is returned here.
    """
    lock_exit = self.__lock.__exit__
    return lock_exit(*args)
189 return "<Condition(%s, %d)>" % (self
.__lock
, len(self
.__waiters
))
def _release_save(self):
    """Default release hook: release the underlying lock.

    There is no state to save, so nothing is returned.
    """
    underlying = self.__lock
    underlying.release()
def _acquire_restore(self, x):
    """Default restore hook: re-acquire the underlying lock.

    The saved state passed in as *x* is ignored.
    """
    underlying = self.__lock
    underlying.acquire()
198 # Return True if lock is owned by currentThread.
199 # This method is called only if __lock doesn't have _is_owned().
200 if self
.__lock
.acquire(0):
201 self
.__lock
.release()
206 def wait(self
, timeout
=None):
207 assert self
._is
_owned
(), "wait() of un-acquire()d lock"
208 waiter
= _allocate_lock()
210 self
.__waiters
.append(waiter
)
211 saved_state
= self
._release
_save
()
212 try: # restore state no matter what (e.g., KeyboardInterrupt)
216 self
._note
("%s.wait(): got it", self
)
# Balancing act:  We can't afford a pure busy loop, so we
# have to sleep; but if we sleep the whole timeout time,
# we'll be unresponsive.  The scheme here sleeps very
# little at first, longer as time goes on, but never longer
# than 0.05 seconds at a time, i.e. it polls at least 20 times
# per second (or sleeps only for the timeout time remaining).
223 endtime
= _time() + timeout
224 delay
= 0.0005 # 500 us -> initial delay of 1 ms
226 gotit
= waiter
.acquire(0)
229 remaining
= endtime
- _time()
232 delay
= min(delay
* 2, remaining
, .05)
236 self
._note
("%s.wait(%s): timed out", self
, timeout
)
238 self
.__waiters
.remove(waiter
)
243 self
._note
("%s.wait(%s): got it", self
, timeout
)
245 self
._acquire
_restore
(saved_state
)
247 def notify(self
, n
=1):
248 assert self
._is
_owned
(), "notify() of un-acquire()d lock"
249 __waiters
= self
.__waiters
250 waiters
= __waiters
[:n
]
253 self
._note
("%s.notify(): no waiters", self
)
255 self
._note
("%s.notify(): notifying %d waiter%s", self
, n
,
257 for waiter
in waiters
:
260 __waiters
.remove(waiter
)
265 self
.notify(len(self
.__waiters
))
def Semaphore(*args, **kwargs):
    """Factory function: create and return a new _Semaphore instance.

    Every positional and keyword argument is forwarded unchanged to the
    _Semaphore constructor.
    """
    semaphore = _Semaphore(*args, **kwargs)
    return semaphore
271 class _Semaphore(_Verbose
):
273 # After Tim Peters' semaphore class, but not quite the same (no maximum)
275 def __init__(self
, value
=1, verbose
=None):
276 assert value
>= 0, "Semaphore initial value must be >= 0"
277 _Verbose
.__init
__(self
, verbose
)
278 self
.__cond
= Condition(Lock())
281 def acquire(self
, blocking
=1):
283 self
.__cond
.acquire()
284 while self
.__value
== 0:
288 self
._note
("%s.acquire(%s): blocked waiting, value=%s",
289 self
, blocking
, self
.__value
)
292 self
.__value
= self
.__value
- 1
294 self
._note
("%s.acquire: success, value=%s",
297 self
.__cond
.release()
303 self
.__cond
.acquire()
304 self
.__value
= self
.__value
+ 1
306 self
._note
("%s.release: success, value=%s",
309 self
.__cond
.release()
311 def __exit__(self
, t
, v
, tb
):
def BoundedSemaphore(*args, **kwargs):
    """Factory function: create and return a new _BoundedSemaphore instance.

    Every positional and keyword argument is forwarded unchanged to the
    _BoundedSemaphore constructor.
    """
    semaphore = _BoundedSemaphore(*args, **kwargs)
    return semaphore
318 class _BoundedSemaphore(_Semaphore
):
319 """Semaphore that checks that # releases is <= # acquires"""
320 def __init__(self
, value
=1, verbose
=None):
321 _Semaphore
.__init
__(self
, value
, verbose
)
322 self
._initial
_value
= value
325 if self
._Semaphore
__value
>= self
._initial
_value
:
326 raise ValueError, "Semaphore released too many times"
327 return _Semaphore
.release(self
)
def Event(*args, **kwargs):
    """Factory function: create and return a new _Event instance.

    Every positional and keyword argument is forwarded unchanged to the
    _Event constructor.
    """
    event = _Event(*args, **kwargs)
    return event
333 class _Event(_Verbose
):
335 # After Tim Peters' event class (without is_posted())
337 def __init__(self
, verbose
=None):
338 _Verbose
.__init
__(self
, verbose
)
339 self
.__cond
= Condition(Lock())
346 self
.__cond
.acquire()
349 self
.__cond
.notifyAll()
351 self
.__cond
.release()
354 self
.__cond
.acquire()
358 self
.__cond
.release()
360 def wait(self
, timeout
=None):
361 self
.__cond
.acquire()
364 self
.__cond
.wait(timeout
)
366 self
.__cond
.release()
368 # Helper to generate new thread names
370 def _newname(template
="Thread-%d"):
372 _counter
= _counter
+ 1
373 return template
% _counter
375 # Active thread administration
376 _active_limbo_lock
= _allocate_lock()
377 _active
= {} # maps thread id to Thread object
381 # Main class for threads
383 class Thread(_Verbose
):
385 __initialized
= False
# Need to store a reference to sys.exc_info for printing
# out exceptions when a thread tries to use a global variable during
# interpreter shutdown and thus raises an exception about trying to
# perform some operation on/with a NoneType
390 __exc_info
= _sys
.exc_info
392 def __init__(self
, group
=None, target
=None, name
=None,
393 args
=(), kwargs
=None, verbose
=None):
394 assert group
is None, "group argument must be None for now"
395 _Verbose
.__init
__(self
, verbose
)
398 self
.__target
= target
399 self
.__name
= str(name
or _newname())
401 self
.__kwargs
= kwargs
402 self
.__daemonic
= self
._set
_daemon
()
403 self
.__started
= False
404 self
.__stopped
= False
405 self
.__block
= Condition(Lock())
406 self
.__initialized
= True
407 # sys.stderr is not stored in the class like
408 # sys.exc_info since it can be changed between instances
409 self
.__stderr
= _sys
.stderr
def _set_daemon(self):
    """Return the daemon flag of the thread that is current at call time.

    Overridden in _MainThread and _DummyThread.
    """
    caller = currentThread()
    return caller.isDaemon()
416 assert self
.__initialized
, "Thread.__init__() was not called"
423 status
= status
+ " daemon"
424 return "<%s(%s, %s)>" % (self
.__class
__.__name
__, self
.__name
, status
)
427 assert self
.__initialized
, "Thread.__init__() not called"
428 assert not self
.__started
, "thread already started"
430 self
._note
("%s.start(): starting thread", self
)
431 _active_limbo_lock
.acquire()
433 _active_limbo_lock
.release()
434 _start_new_thread(self
.__bootstrap
, ())
435 self
.__started
= True
436 _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack)
440 self
.__target
(*self
.__args
, **self
.__kwargs
)
442 def __bootstrap(self
):
444 self
.__started
= True
445 _active_limbo_lock
.acquire()
446 _active
[_get_ident()] = self
448 _active_limbo_lock
.release()
450 self
._note
("%s.__bootstrap(): thread started", self
)
453 self
._note
("%s.__bootstrap(): registering trace hook", self
)
454 _sys
.settrace(_trace_hook
)
456 self
._note
("%s.__bootstrap(): registering profile hook", self
)
457 _sys
.setprofile(_profile_hook
)
463 self
._note
("%s.__bootstrap(): raised SystemExit", self
)
466 self
._note
("%s.__bootstrap(): unhandled exception", self
)
467 # If sys.stderr is no more (most likely from interpreter
468 # shutdown) use self.__stderr. Otherwise still use sys (as in
469 # _sys) in case sys.stderr was redefined since the creation of
472 _sys
.stderr
.write("Exception in thread %s:\n%s\n" %
473 (self
.getName(), _format_exc()))
475 # Do the best job possible w/o a huge amt. of code to
476 # approximate a traceback (code ideas from
478 exc_type
, exc_value
, exc_tb
= self
.__exc
_info
()
480 print>>self
.__stderr
, (
481 "Exception in thread " + self
.getName() +
482 " (most likely raised during interpreter shutdown):")
483 print>>self
.__stderr
, (
484 "Traceback (most recent call last):")
486 print>>self
.__stderr
, (
487 ' File "%s", line %s, in %s' %
488 (exc_tb
.tb_frame
.f_code
.co_filename
,
490 exc_tb
.tb_frame
.f_code
.co_name
))
491 exc_tb
= exc_tb
.tb_next
492 print>>self
.__stderr
, ("%s: %s" % (exc_type
, exc_value
))
493 # Make sure that exc_tb gets deleted since it is a memory
494 # hog; deleting everything else is just for thoroughness
496 del exc_type
, exc_value
, exc_tb
499 self
._note
("%s.__bootstrap(): normal return", self
)
508 self
.__block
.acquire()
509 self
.__stopped
= True
510 self
.__block
.notifyAll()
511 self
.__block
.release()
514 "Remove current thread from the dict of currently running threads."
516 # Notes about running with dummy_thread:
518 # Must take care to not raise an exception if dummy_thread is being
519 # used (and thus this module is being used as an instance of
520 # dummy_threading). dummy_thread.get_ident() always returns -1 since
521 # there is only one thread if dummy_thread is being used. Thus
522 # len(_active) is always <= 1 here, and any Thread instance created
523 # overwrites the (if any) thread currently registered in _active.
525 # An instance of _MainThread is always created by 'threading'. This
526 # gets overwritten the instant an instance of Thread is created; both
527 # threads return -1 from dummy_thread.get_ident() and thus have the
528 # same key in the dict. So when the _MainThread instance created by
529 # 'threading' tries to clean itself up when atexit calls this method
530 # it gets a KeyError if another Thread instance was created.
532 # This all means that KeyError from trying to delete something from
533 # _active if dummy_threading is being used is a red herring. But
534 # since it isn't if dummy_threading is *not* being used then don't
535 # hide the exception.
537 _active_limbo_lock
.acquire()
540 del _active
[_get_ident()]
542 if 'dummy_threading' not in _sys
.modules
:
545 _active_limbo_lock
.release()
547 def join(self
, timeout
=None):
548 assert self
.__initialized
, "Thread.__init__() not called"
549 assert self
.__started
, "cannot join thread before it is started"
550 assert self
is not currentThread(), "cannot join current thread"
552 if not self
.__stopped
:
553 self
._note
("%s.join(): waiting until thread stops", self
)
554 self
.__block
.acquire()
557 while not self
.__stopped
:
560 self
._note
("%s.join(): thread stopped", self
)
562 deadline
= _time() + timeout
563 while not self
.__stopped
:
564 delay
= deadline
- _time()
567 self
._note
("%s.join(): timed out", self
)
569 self
.__block
.wait(delay
)
572 self
._note
("%s.join(): thread stopped", self
)
574 self
.__block
.release()
577 assert self
.__initialized
, "Thread.__init__() not called"
def setName(self, name):
    """Set this thread's name to ``str(name)``.

    Asserts that Thread.__init__() has already been called.
    """
    assert self.__initialized, "Thread.__init__() not called"
    new_name = str(name)
    self.__name = new_name
585 assert self
.__initialized
, "Thread.__init__() not called"
586 return self
.__started
and not self
.__stopped
589 assert self
.__initialized
, "Thread.__init__() not called"
590 return self
.__daemonic
def setDaemon(self, daemonic):
    """Store *daemonic* as this thread's daemon flag.

    Asserts that Thread.__init__() has been called and that the thread
    has not been started yet.
    """
    initialized = self.__initialized
    assert initialized, "Thread.__init__() not called"
    already_started = self.__started
    assert not already_started, "cannot set daemon status of active thread"
    self.__daemonic = daemonic
597 # The timer class was contributed by Itamar Shtull-Trauring
def Timer(*args, **kwargs):
    """Factory function: create and return a new _Timer instance.

    Every positional and keyword argument is forwarded unchanged to the
    _Timer constructor.
    """
    timer = _Timer(*args, **kwargs)
    return timer
602 class _Timer(Thread
):
603 """Call a function after a specified number of seconds:
605 t = Timer(30.0, f, args=[], kwargs={})
607 t.cancel() # stop the timer's action if it's still waiting
610 def __init__(self
, interval
, function
, args
=[], kwargs
={}):
611 Thread
.__init
__(self
)
612 self
.interval
= interval
613 self
.function
= function
616 self
.finished
= Event()
619 """Stop the timer if it hasn't finished yet"""
623 self
.finished
.wait(self
.interval
)
624 if not self
.finished
.isSet():
625 self
.function(*self
.args
, **self
.kwargs
)
628 # Special thread class to represent the main thread
629 # This is garbage collected through an exit handler
631 class _MainThread(Thread
):
634 Thread
.__init
__(self
, name
="MainThread")
635 self
._Thread
__started
= True
636 _active_limbo_lock
.acquire()
637 _active
[_get_ident()] = self
638 _active_limbo_lock
.release()
640 atexit
.register(self
.__exitfunc
)
642 def _set_daemon(self
):
645 def __exitfunc(self
):
647 t
= _pickSomeNonDaemonThread()
650 self
._note
("%s: waiting for other threads", self
)
653 t
= _pickSomeNonDaemonThread()
655 self
._note
("%s: exiting", self
)
656 self
._Thread
__delete
()
658 def _pickSomeNonDaemonThread():
659 for t
in enumerate():
660 if not t
.isDaemon() and t
.isAlive():
# Dummy thread class to represent threads not started here.
# These aren't garbage collected when they die, nor can they be waited for.
# If they invoke anything in threading.py that calls currentThread(), they
# leave an entry in the _active dict forever after.
# Their purpose is to return *something* from currentThread().
# They are marked as daemon threads so we won't wait for them
# when we exit (conforming to previous semantics).
673 class _DummyThread(Thread
):
676 Thread
.__init
__(self
, name
=_newname("Dummy-%d"))
678 # Thread.__block consumes an OS-level locking primitive, which
679 # can never be used by a _DummyThread. Since a _DummyThread
680 # instance is immortal, that's bad, so release this resource.
681 del self
._Thread
__block
683 self
._Thread
__started
= True
684 _active_limbo_lock
.acquire()
685 _active
[_get_ident()] = self
686 _active_limbo_lock
.release()
688 def _set_daemon(self
):
def join(self, timeout=None):
    """Refuse to join: a dummy thread can never be waited for.

    *timeout* is accepted only for signature compatibility with
    Thread.join() and is ignored.

    Raises AssertionError unconditionally.  The exception is raised
    explicitly rather than through ``assert False`` because assert
    statements are removed under "python -O", which would turn this
    method into a silent no-op.
    """
    raise AssertionError("cannot join a dummy thread")
695 # Global API functions
699 return _active
[_get_ident()]
701 ##print "currentThread(): no current thread for", _get_ident()
702 return _DummyThread()
705 _active_limbo_lock
.acquire()
706 count
= len(_active
) + len(_limbo
)
707 _active_limbo_lock
.release()
711 _active_limbo_lock
.acquire()
712 active
= _active
.values() + _limbo
.values()
713 _active_limbo_lock
.release()
716 from thread
import stack_size
718 # Create the main thread object
722 # get thread-local implementation, either from the thread
723 # module, or from the python fallback
726 from thread
import _local
as local
728 from _threading_local
import local
735 class BoundedQueue(_Verbose
):
737 def __init__(self
, limit
):
738 _Verbose
.__init
__(self
)
740 self
.rc
= Condition(self
.mon
)
741 self
.wc
= Condition(self
.mon
)
747 while len(self
.queue
) >= self
.limit
:
748 self
._note
("put(%s): queue full", item
)
750 self
.queue
.append(item
)
751 self
._note
("put(%s): appended, length now %d",
752 item
, len(self
.queue
))
758 while not self
.queue
:
759 self
._note
("get(): queue empty")
761 item
= self
.queue
.popleft()
762 self
._note
("get(): got %s, %d left", item
, len(self
.queue
))
767 class ProducerThread(Thread
):
769 def __init__(self
, queue
, quota
):
770 Thread
.__init
__(self
, name
="Producer")
775 from random
import random
777 while counter
< self
.quota
:
778 counter
= counter
+ 1
779 self
.queue
.put("%s.%d" % (self
.getName(), counter
))
780 _sleep(random() * 0.00001)
783 class ConsumerThread(Thread
):
785 def __init__(self
, queue
, count
):
786 Thread
.__init
__(self
, name
="Consumer")
791 while self
.count
> 0:
792 item
= self
.queue
.get()
794 self
.count
= self
.count
- 1
803 t
= ProducerThread(Q
, NI
)
804 t
.setName("Producer-%d" % (i
+1))
806 C
= ConsumerThread(Q
, NI
*NP
)
815 if __name__
== '__main__':