1 """Thread module emulating a subset of Java's threading model."""
8 del _sys
.modules
[__name__
]
12 from time
import time
as _time
, sleep
as _sleep
13 from traceback
import format_exc
as _format_exc
14 from collections
import deque
16 # Rename some stuff so "from threading import *" is safe
17 __all__
= ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event',
18 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
19 'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
21 _start_new_thread
= thread
.start_new_thread
22 _allocate_lock
= thread
.allocate_lock
23 _get_ident
= thread
.get_ident
24 ThreadError
= thread
.error
28 # sys.exc_clear is used to work around the fact that except blocks
29 # don't fully clear the exception until 3.0.
30 warnings
.filterwarnings('ignore', category
=DeprecationWarning,
31 module
='threading', message
='sys.exc_clear')
34 # Debug support (adapted from ihooks.py).
35 # All the major classes here derive from _Verbose. We force that to
36 # be a new-style class so that all the major classes here are new-style.
37 # This helps debugging (type(instance) is more revealing for instances
38 # of new-style classes).
44 class _Verbose(object):
46 def __init__(self
, verbose
=None):
49 self
.__verbose
= verbose
51 def _note(self
, format
, *args
):
53 format
= format
% args
54 format
= "%s: %s\n" % (
55 currentThread().getName(), format
)
56 _sys
.stderr
.write(format
)
59 # Disable this when using "python -O"
60 class _Verbose(object):
61 def __init__(self
, verbose
=None):
63 def _note(self
, *args
):
66 # Support for profile and trace hooks
79 # Synchronization classes
83 def RLock(*args
, **kwargs
):
84 return _RLock(*args
, **kwargs
)
86 class _RLock(_Verbose
):
88 def __init__(self
, verbose
=None):
89 _Verbose
.__init
__(self
, verbose
)
90 self
.__block
= _allocate_lock()
96 return "<%s(%s, %d)>" % (
97 self
.__class
__.__name
__,
98 owner
and owner
.getName(),
101 def acquire(self
, blocking
=1):
103 if self
.__owner
is me
:
104 self
.__count
= self
.__count
+ 1
106 self
._note
("%s.acquire(%s): recursive success", self
, blocking
)
108 rc
= self
.__block
.acquire(blocking
)
113 self
._note
("%s.acquire(%s): initial success", self
, blocking
)
116 self
._note
("%s.acquire(%s): failure", self
, blocking
)
122 if self
.__owner
is not currentThread():
123 raise RuntimeError("cannot release un-aquired lock")
124 self
.__count
= count
= self
.__count
- 1
127 self
.__block
.release()
129 self
._note
("%s.release(): final release", self
)
132 self
._note
("%s.release(): non-final release", self
)
134 def __exit__(self
, t
, v
, tb
):
137 # Internal methods used by condition variables
139 def _acquire_restore(self
, (count
, owner
)):
140 self
.__block
.acquire()
144 self
._note
("%s._acquire_restore()", self
)
146 def _release_save(self
):
148 self
._note
("%s._release_save()", self
)
153 self
.__block
.release()
154 return (count
, owner
)
157 return self
.__owner
is currentThread()
160 def Condition(*args
, **kwargs
):
161 return _Condition(*args
, **kwargs
)
163 class _Condition(_Verbose
):
165 def __init__(self
, lock
=None, verbose
=None):
166 _Verbose
.__init
__(self
, verbose
)
170 # Export the lock's acquire() and release() methods
171 self
.acquire
= lock
.acquire
172 self
.release
= lock
.release
173 # If the lock defines _release_save() and/or _acquire_restore(),
174 # these override the default implementations (which just call
175 # release() and acquire() on the lock). Ditto for _is_owned().
177 self
._release
_save
= lock
._release
_save
178 except AttributeError:
181 self
._acquire
_restore
= lock
._acquire
_restore
182 except AttributeError:
185 self
._is
_owned
= lock
._is
_owned
186 except AttributeError:
191 return self
.__lock
.__enter
__()
193 def __exit__(self
, *args
):
194 return self
.__lock
.__exit
__(*args
)
197 return "<Condition(%s, %d)>" % (self
.__lock
, len(self
.__waiters
))
199 def _release_save(self
):
200 self
.__lock
.release() # No state to save
202 def _acquire_restore(self
, x
):
203 self
.__lock
.acquire() # Ignore saved state
206 # Return True if lock is owned by currentThread.
207 # This method is called only if __lock doesn't have _is_owned().
208 if self
.__lock
.acquire(0):
209 self
.__lock
.release()
214 def wait(self
, timeout
=None):
215 if not self
._is
_owned
():
216 raise RuntimeError("cannot wait on un-aquired lock")
217 waiter
= _allocate_lock()
219 self
.__waiters
.append(waiter
)
220 saved_state
= self
._release
_save
()
221 try: # restore state no matter what (e.g., KeyboardInterrupt)
225 self
._note
("%s.wait(): got it", self
)
227 # Balancing act: We can't afford a pure busy loop, so we
228 # have to sleep; but if we sleep the whole timeout time,
229 # we'll be unresponsive. The scheme here sleeps very
230 # little at first, longer as time goes on, but never longer
231 # than 20 times per second (or the timeout time remaining).
232 endtime
= _time() + timeout
233 delay
= 0.0005 # 500 us -> initial delay of 1 ms
235 gotit
= waiter
.acquire(0)
238 remaining
= endtime
- _time()
241 delay
= min(delay
* 2, remaining
, .05)
245 self
._note
("%s.wait(%s): timed out", self
, timeout
)
247 self
.__waiters
.remove(waiter
)
252 self
._note
("%s.wait(%s): got it", self
, timeout
)
254 self
._acquire
_restore
(saved_state
)
256 def notify(self
, n
=1):
257 if not self
._is
_owned
():
258 raise RuntimeError("cannot notify on un-aquired lock")
259 __waiters
= self
.__waiters
260 waiters
= __waiters
[:n
]
263 self
._note
("%s.notify(): no waiters", self
)
265 self
._note
("%s.notify(): notifying %d waiter%s", self
, n
,
267 for waiter
in waiters
:
270 __waiters
.remove(waiter
)
275 self
.notify(len(self
.__waiters
))
278 def Semaphore(*args
, **kwargs
):
279 return _Semaphore(*args
, **kwargs
)
281 class _Semaphore(_Verbose
):
283 # After Tim Peters' semaphore class, but not quite the same (no maximum)
285 def __init__(self
, value
=1, verbose
=None):
287 raise ValueError("semaphore initial value must be >= 0")
288 _Verbose
.__init
__(self
, verbose
)
289 self
.__cond
= Condition(Lock())
292 def acquire(self
, blocking
=1):
294 self
.__cond
.acquire()
295 while self
.__value
== 0:
299 self
._note
("%s.acquire(%s): blocked waiting, value=%s",
300 self
, blocking
, self
.__value
)
303 self
.__value
= self
.__value
- 1
305 self
._note
("%s.acquire: success, value=%s",
308 self
.__cond
.release()
314 self
.__cond
.acquire()
315 self
.__value
= self
.__value
+ 1
317 self
._note
("%s.release: success, value=%s",
320 self
.__cond
.release()
322 def __exit__(self
, t
, v
, tb
):
326 def BoundedSemaphore(*args
, **kwargs
):
327 return _BoundedSemaphore(*args
, **kwargs
)
329 class _BoundedSemaphore(_Semaphore
):
330 """Semaphore that checks that # releases is <= # acquires"""
331 def __init__(self
, value
=1, verbose
=None):
332 _Semaphore
.__init
__(self
, value
, verbose
)
333 self
._initial
_value
= value
336 if self
._Semaphore
__value
>= self
._initial
_value
:
337 raise ValueError, "Semaphore released too many times"
338 return _Semaphore
.release(self
)
341 def Event(*args
, **kwargs
):
342 return _Event(*args
, **kwargs
)
344 class _Event(_Verbose
):
346 # After Tim Peters' event class (without is_posted())
348 def __init__(self
, verbose
=None):
349 _Verbose
.__init
__(self
, verbose
)
350 self
.__cond
= Condition(Lock())
357 self
.__cond
.acquire()
360 self
.__cond
.notifyAll()
362 self
.__cond
.release()
365 self
.__cond
.acquire()
369 self
.__cond
.release()
371 def wait(self
, timeout
=None):
372 self
.__cond
.acquire()
375 self
.__cond
.wait(timeout
)
377 self
.__cond
.release()
379 # Helper to generate new thread names
381 def _newname(template
="Thread-%d"):
383 _counter
= _counter
+ 1
384 return template
% _counter
386 # Active thread administration
387 _active_limbo_lock
= _allocate_lock()
388 _active
= {} # maps thread id to Thread object
392 # Main class for threads
394 class Thread(_Verbose
):
396 __initialized
= False
397 # Need to store a reference to sys.exc_info for printing
398 # out exceptions when a thread tries to use a global var. during interp.
399 # shutdown and thus raises an exception about trying to perform some
400 # operation on/with a NoneType
401 __exc_info
= _sys
.exc_info
402 # Keep sys.exc_clear too to clear the exception just before
403 # allowing .join() to return.
404 __exc_clear
= _sys
.exc_clear
406 def __init__(self
, group
=None, target
=None, name
=None,
407 args
=(), kwargs
=None, verbose
=None):
408 assert group
is None, "group argument must be None for now"
409 _Verbose
.__init
__(self
, verbose
)
412 self
.__target
= target
413 self
.__name
= str(name
or _newname())
415 self
.__kwargs
= kwargs
416 self
.__daemonic
= self
._set
_daemon
()
417 self
.__started
= Event()
418 self
.__stopped
= False
419 self
.__block
= Condition(Lock())
420 self
.__initialized
= True
421 # sys.stderr is not stored in the class like
422 # sys.exc_info since it can be changed between instances
423 self
.__stderr
= _sys
.stderr
425 def _set_daemon(self
):
426 # Overridden in _MainThread and _DummyThread
427 return currentThread().isDaemon()
430 assert self
.__initialized
, "Thread.__init__() was not called"
432 if self
.__started
.isSet():
437 status
= status
+ " daemon"
438 return "<%s(%s, %s)>" % (self
.__class
__.__name
__, self
.__name
, status
)
441 if not self
.__initialized
:
442 raise RuntimeError("thread.__init__() not called")
443 if self
.__started
.isSet():
444 raise RuntimeError("thread already started")
446 self
._note
("%s.start(): starting thread", self
)
447 _active_limbo_lock
.acquire()
449 _active_limbo_lock
.release()
450 _start_new_thread(self
.__bootstrap
, ())
451 self
.__started
.wait()
456 self
.__target
(*self
.__args
, **self
.__kwargs
)
458 # Avoid a refcycle if the thread is running a function with
459 # an argument that has a member that points to the thread.
460 del self
.__target
, self
.__args
, self
.__kwargs
462 def __bootstrap(self
):
463 # Wrapper around the real bootstrap code that ignores
464 # exceptions during interpreter cleanup. Those typically
465 # happen when a daemon thread wakes up at an unfortunate
466 # moment, finds the world around it destroyed, and raises some
467 # random exception *** while trying to report the exception in
468 # __bootstrap_inner() below ***. Those random exceptions
469 # don't help anybody, and they confuse users, so we suppress
470 # them. We suppress them only when it appears that the world
471 # indeed has already been destroyed, so that exceptions in
472 # __bootstrap_inner() during normal business hours are properly
473 # reported. Also, we only suppress them for daemonic threads;
474 # if a non-daemonic encounters this, something else is wrong.
476 self
.__bootstrap
_inner
()
478 if self
.__daemonic
and _sys
is None:
482 def __bootstrap_inner(self
):
485 _active_limbo_lock
.acquire()
486 _active
[_get_ident()] = self
488 _active_limbo_lock
.release()
490 self
._note
("%s.__bootstrap(): thread started", self
)
493 self
._note
("%s.__bootstrap(): registering trace hook", self
)
494 _sys
.settrace(_trace_hook
)
496 self
._note
("%s.__bootstrap(): registering profile hook", self
)
497 _sys
.setprofile(_profile_hook
)
503 self
._note
("%s.__bootstrap(): raised SystemExit", self
)
506 self
._note
("%s.__bootstrap(): unhandled exception", self
)
507 # If sys.stderr is no more (most likely from interpreter
508 # shutdown) use self.__stderr. Otherwise still use sys (as in
509 # _sys) in case sys.stderr was redefined since the creation of
512 _sys
.stderr
.write("Exception in thread %s:\n%s\n" %
513 (self
.getName(), _format_exc()))
515 # Do the best job possible w/o a huge amt. of code to
516 # approximate a traceback (code ideas from
518 exc_type
, exc_value
, exc_tb
= self
.__exc
_info
()
520 print>>self
.__stderr
, (
521 "Exception in thread " + self
.getName() +
522 " (most likely raised during interpreter shutdown):")
523 print>>self
.__stderr
, (
524 "Traceback (most recent call last):")
526 print>>self
.__stderr
, (
527 ' File "%s", line %s, in %s' %
528 (exc_tb
.tb_frame
.f_code
.co_filename
,
530 exc_tb
.tb_frame
.f_code
.co_name
))
531 exc_tb
= exc_tb
.tb_next
532 print>>self
.__stderr
, ("%s: %s" % (exc_type
, exc_value
))
533 # Make sure that exc_tb gets deleted since it is a memory
534 # hog; deleting everything else is just for thoroughness
536 del exc_type
, exc_value
, exc_tb
539 self
._note
("%s.__bootstrap(): normal return", self
)
542 # test_threading.test_no_refcycle_through_target when
543 # the exception keeps the target alive past when we
544 # assert that it's dead.
547 with _active_limbo_lock
:
550 # We don't call self.__delete() because it also
551 # grabs _active_limbo_lock.
552 del _active
[_get_ident()]
557 self
.__block
.acquire()
558 self
.__stopped
= True
559 self
.__block
.notifyAll()
560 self
.__block
.release()
563 "Remove current thread from the dict of currently running threads."
565 # Notes about running with dummy_thread:
567 # Must take care to not raise an exception if dummy_thread is being
568 # used (and thus this module is being used as an instance of
569 # dummy_threading). dummy_thread.get_ident() always returns -1 since
570 # there is only one thread if dummy_thread is being used. Thus
571 # len(_active) is always <= 1 here, and any Thread instance created
572 # overwrites the (if any) thread currently registered in _active.
574 # An instance of _MainThread is always created by 'threading'. This
575 # gets overwritten the instant an instance of Thread is created; both
576 # threads return -1 from dummy_thread.get_ident() and thus have the
577 # same key in the dict. So when the _MainThread instance created by
578 # 'threading' tries to clean itself up when atexit calls this method
579 # it gets a KeyError if another Thread instance was created.
581 # This all means that KeyError from trying to delete something from
582 # _active if dummy_threading is being used is a red herring. But
583 # since it isn't if dummy_threading is *not* being used then don't
584 # hide the exception.
587 with _active_limbo_lock
:
588 del _active
[_get_ident()]
589 # There must not be any python code between the previous line
590 # and after the lock is released. Otherwise a tracing function
591 # could try to acquire the lock again in the same thread, (in
592 # currentThread()), and would block.
594 if 'dummy_threading' not in _sys
.modules
:
597 def join(self
, timeout
=None):
598 if not self
.__initialized
:
599 raise RuntimeError("Thread.__init__() not called")
600 if not self
.__started
.isSet():
601 raise RuntimeError("cannot join thread before it is started")
602 if self
is currentThread():
603 raise RuntimeError("cannot join current thread")
606 if not self
.__stopped
:
607 self
._note
("%s.join(): waiting until thread stops", self
)
608 self
.__block
.acquire()
611 while not self
.__stopped
:
614 self
._note
("%s.join(): thread stopped", self
)
616 deadline
= _time() + timeout
617 while not self
.__stopped
:
618 delay
= deadline
- _time()
621 self
._note
("%s.join(): timed out", self
)
623 self
.__block
.wait(delay
)
626 self
._note
("%s.join(): thread stopped", self
)
628 self
.__block
.release()
631 assert self
.__initialized
, "Thread.__init__() not called"
634 def setName(self
, name
):
635 assert self
.__initialized
, "Thread.__init__() not called"
636 self
.__name
= str(name
)
639 assert self
.__initialized
, "Thread.__init__() not called"
640 return self
.__started
.isSet() and not self
.__stopped
643 assert self
.__initialized
, "Thread.__init__() not called"
644 return self
.__daemonic
646 def setDaemon(self
, daemonic
):
647 if not self
.__initialized
:
648 raise RuntimeError("Thread.__init__() not called")
649 if self
.__started
.isSet():
650 raise RuntimeError("cannot set daemon status of active thread");
651 self
.__daemonic
= daemonic
653 # The timer class was contributed by Itamar Shtull-Trauring
655 def Timer(*args
, **kwargs
):
656 return _Timer(*args
, **kwargs
)
658 class _Timer(Thread
):
659 """Call a function after a specified number of seconds:
661 t = Timer(30.0, f, args=[], kwargs={})
663 t.cancel() # stop the timer's action if it's still waiting
666 def __init__(self
, interval
, function
, args
=[], kwargs
={}):
667 Thread
.__init
__(self
)
668 self
.interval
= interval
669 self
.function
= function
672 self
.finished
= Event()
675 """Stop the timer if it hasn't finished yet"""
679 self
.finished
.wait(self
.interval
)
680 if not self
.finished
.isSet():
681 self
.function(*self
.args
, **self
.kwargs
)
684 # Special thread class to represent the main thread
685 # This is garbage collected through an exit handler
687 class _MainThread(Thread
):
690 Thread
.__init
__(self
, name
="MainThread")
691 self
._Thread
__started
.set()
692 _active_limbo_lock
.acquire()
693 _active
[_get_ident()] = self
694 _active_limbo_lock
.release()
696 def _set_daemon(self
):
701 t
= _pickSomeNonDaemonThread()
704 self
._note
("%s: waiting for other threads", self
)
707 t
= _pickSomeNonDaemonThread()
709 self
._note
("%s: exiting", self
)
710 self
._Thread
__delete
()
712 def _pickSomeNonDaemonThread():
713 for t
in enumerate():
714 if not t
.isDaemon() and t
.isAlive():
719 # Dummy thread class to represent threads not started here.
720 # These aren't garbage collected when they die, nor can they be waited for.
721 # If they invoke anything in threading.py that calls currentThread(), they
722 # leave an entry in the _active dict forever after.
723 # Their purpose is to return *something* from currentThread().
724 # They are marked as daemon threads so we won't wait for them
725 # when we exit (conform previous semantics).
727 class _DummyThread(Thread
):
730 Thread
.__init
__(self
, name
=_newname("Dummy-%d"))
732 # Thread.__block consumes an OS-level locking primitive, which
733 # can never be used by a _DummyThread. Since a _DummyThread
734 # instance is immortal, that's bad, so release this resource.
735 del self
._Thread
__block
737 self
._Thread
__started
.set()
738 _active_limbo_lock
.acquire()
739 _active
[_get_ident()] = self
740 _active_limbo_lock
.release()
742 def _set_daemon(self
):
745 def join(self
, timeout
=None):
746 assert False, "cannot join a dummy thread"
749 # Global API functions
753 return _active
[_get_ident()]
755 ##print "currentThread(): no current thread for", _get_ident()
756 return _DummyThread()
759 _active_limbo_lock
.acquire()
760 count
= len(_active
) + len(_limbo
)
761 _active_limbo_lock
.release()
765 _active_limbo_lock
.acquire()
766 active
= _active
.values() + _limbo
.values()
767 _active_limbo_lock
.release()
770 from thread
import stack_size
772 # Create the main thread object,
773 # and make it available for the interpreter
774 # (Py_Main) as threading._shutdown.
776 _shutdown
= _MainThread()._exitfunc
778 # get thread-local implementation, either from the thread
779 # module, or from the python fallback
782 from thread
import _local
as local
784 from _threading_local
import local
791 class BoundedQueue(_Verbose
):
793 def __init__(self
, limit
):
794 _Verbose
.__init
__(self
)
796 self
.rc
= Condition(self
.mon
)
797 self
.wc
= Condition(self
.mon
)
803 while len(self
.queue
) >= self
.limit
:
804 self
._note
("put(%s): queue full", item
)
806 self
.queue
.append(item
)
807 self
._note
("put(%s): appended, length now %d",
808 item
, len(self
.queue
))
814 while not self
.queue
:
815 self
._note
("get(): queue empty")
817 item
= self
.queue
.popleft()
818 self
._note
("get(): got %s, %d left", item
, len(self
.queue
))
823 class ProducerThread(Thread
):
825 def __init__(self
, queue
, quota
):
826 Thread
.__init
__(self
, name
="Producer")
831 from random
import random
833 while counter
< self
.quota
:
834 counter
= counter
+ 1
835 self
.queue
.put("%s.%d" % (self
.getName(), counter
))
836 _sleep(random() * 0.00001)
839 class ConsumerThread(Thread
):
841 def __init__(self
, queue
, count
):
842 Thread
.__init
__(self
, name
="Consumer")
847 while self
.count
> 0:
848 item
= self
.queue
.get()
850 self
.count
= self
.count
- 1
859 t
= ProducerThread(Q
, NI
)
860 t
.setName("Producer-%d" % (i
+1))
862 C
= ConsumerThread(Q
, NI
*NP
)
871 if __name__
== '__main__':