1 """Thread module emulating a subset of Java's threading model."""
8 del _sys
.modules
[__name__
]
13 from functools
import wraps
14 from time
import time
as _time
, sleep
as _sleep
15 from traceback
import format_exc
as _format_exc
16 from collections
import deque
18 # Rename some stuff so "from threading import *" is safe
19 __all__
= ['activeCount', 'active_count', 'Condition', 'currentThread',
20 'current_thread', 'enumerate', 'Event',
21 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
22 'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
24 _start_new_thread
= thread
.start_new_thread
25 _allocate_lock
= thread
.allocate_lock
26 _get_ident
= thread
.get_ident
27 ThreadError
= thread
.error
31 # sys.exc_clear is used to work around the fact that except blocks
32 # don't fully clear the exception until 3.0.
33 warnings
.filterwarnings('ignore', category
=DeprecationWarning,
34 module
='threading', message
='sys.exc_clear')
37 def _old_api(callable, old_name
):
38 if not _sys
.py3kwarning
:
41 def old(*args
, **kwargs
):
42 warnings
.warnpy3k("In 3.x, {0} is renamed to {1}."
43 .format(old_name
, callable.__name
__),
45 return callable(*args
, **kwargs
)
46 old
.__name
__ = old_name
49 # Debug support (adapted from ihooks.py).
50 # All the major classes here derive from _Verbose. We force that to
51 # be a new-style class so that all the major classes here are new-style.
52 # This helps debugging (type(instance) is more revealing for instances
53 # of new-style classes).
59 class _Verbose(object):
61 def __init__(self
, verbose
=None):
64 self
.__verbose
= verbose
66 def _note(self
, format
, *args
):
68 format
= format
% args
69 format
= "%s: %s\n" % (
70 current_thread().get_name(), format
)
71 _sys
.stderr
.write(format
)
74 # Disable this when using "python -O"
75 class _Verbose(object):
76 def __init__(self
, verbose
=None):
78 def _note(self
, *args
):
81 # Support for profile and trace hooks
94 # Synchronization classes
98 def RLock(*args
, **kwargs
):
99 return _RLock(*args
, **kwargs
)
101 class _RLock(_Verbose
):
103 def __init__(self
, verbose
=None):
104 _Verbose
.__init
__(self
, verbose
)
105 self
.__block
= _allocate_lock()
111 return "<%s(%s, %d)>" % (
112 self
.__class
__.__name
__,
113 owner
and owner
.get_name(),
116 def acquire(self
, blocking
=1):
117 me
= current_thread()
118 if self
.__owner
is me
:
119 self
.__count
= self
.__count
+ 1
121 self
._note
("%s.acquire(%s): recursive success", self
, blocking
)
123 rc
= self
.__block
.acquire(blocking
)
128 self
._note
("%s.acquire(%s): initial success", self
, blocking
)
131 self
._note
("%s.acquire(%s): failure", self
, blocking
)
137 if self
.__owner
is not current_thread():
138 raise RuntimeError("cannot release un-aquired lock")
139 self
.__count
= count
= self
.__count
- 1
142 self
.__block
.release()
144 self
._note
("%s.release(): final release", self
)
147 self
._note
("%s.release(): non-final release", self
)
149 def __exit__(self
, t
, v
, tb
):
152 # Internal methods used by condition variables
154 def _acquire_restore(self
, (count
, owner
)):
155 self
.__block
.acquire()
159 self
._note
("%s._acquire_restore()", self
)
161 def _release_save(self
):
163 self
._note
("%s._release_save()", self
)
168 self
.__block
.release()
169 return (count
, owner
)
172 return self
.__owner
is current_thread()
175 def Condition(*args
, **kwargs
):
176 return _Condition(*args
, **kwargs
)
178 class _Condition(_Verbose
):
180 def __init__(self
, lock
=None, verbose
=None):
181 _Verbose
.__init
__(self
, verbose
)
185 # Export the lock's acquire() and release() methods
186 self
.acquire
= lock
.acquire
187 self
.release
= lock
.release
188 # If the lock defines _release_save() and/or _acquire_restore(),
189 # these override the default implementations (which just call
190 # release() and acquire() on the lock). Ditto for _is_owned().
192 self
._release
_save
= lock
._release
_save
193 except AttributeError:
196 self
._acquire
_restore
= lock
._acquire
_restore
197 except AttributeError:
200 self
._is
_owned
= lock
._is
_owned
201 except AttributeError:
206 return self
.__lock
.__enter
__()
208 def __exit__(self
, *args
):
209 return self
.__lock
.__exit
__(*args
)
212 return "<Condition(%s, %d)>" % (self
.__lock
, len(self
.__waiters
))
214 def _release_save(self
):
215 self
.__lock
.release() # No state to save
217 def _acquire_restore(self
, x
):
218 self
.__lock
.acquire() # Ignore saved state
221 # Return True if lock is owned by current_thread.
222 # This method is called only if __lock doesn't have _is_owned().
223 if self
.__lock
.acquire(0):
224 self
.__lock
.release()
229 def wait(self
, timeout
=None):
230 if not self
._is
_owned
():
231 raise RuntimeError("cannot wait on un-aquired lock")
232 waiter
= _allocate_lock()
234 self
.__waiters
.append(waiter
)
235 saved_state
= self
._release
_save
()
236 try: # restore state no matter what (e.g., KeyboardInterrupt)
240 self
._note
("%s.wait(): got it", self
)
242 # Balancing act: We can't afford a pure busy loop, so we
243 # have to sleep; but if we sleep the whole timeout time,
244 # we'll be unresponsive. The scheme here sleeps very
245 # little at first, longer as time goes on, but never longer
246 # than 20 times per second (or the timeout time remaining).
247 endtime
= _time() + timeout
248 delay
= 0.0005 # 500 us -> initial delay of 1 ms
250 gotit
= waiter
.acquire(0)
253 remaining
= endtime
- _time()
256 delay
= min(delay
* 2, remaining
, .05)
260 self
._note
("%s.wait(%s): timed out", self
, timeout
)
262 self
.__waiters
.remove(waiter
)
267 self
._note
("%s.wait(%s): got it", self
, timeout
)
269 self
._acquire
_restore
(saved_state
)
271 def notify(self
, n
=1):
272 if not self
._is
_owned
():
273 raise RuntimeError("cannot notify on un-aquired lock")
274 __waiters
= self
.__waiters
275 waiters
= __waiters
[:n
]
278 self
._note
("%s.notify(): no waiters", self
)
280 self
._note
("%s.notify(): notifying %d waiter%s", self
, n
,
282 for waiter
in waiters
:
285 __waiters
.remove(waiter
)
289 def notify_all(self
):
290 self
.notify(len(self
.__waiters
))
292 notifyAll
= _old_api(notify_all
, "notifyAll")
295 def Semaphore(*args
, **kwargs
):
296 return _Semaphore(*args
, **kwargs
)
298 class _Semaphore(_Verbose
):
300 # After Tim Peters' semaphore class, but not quite the same (no maximum)
302 def __init__(self
, value
=1, verbose
=None):
304 raise ValueError("semaphore initial value must be >= 0")
305 _Verbose
.__init
__(self
, verbose
)
306 self
.__cond
= Condition(Lock())
309 def acquire(self
, blocking
=1):
311 self
.__cond
.acquire()
312 while self
.__value
== 0:
316 self
._note
("%s.acquire(%s): blocked waiting, value=%s",
317 self
, blocking
, self
.__value
)
320 self
.__value
= self
.__value
- 1
322 self
._note
("%s.acquire: success, value=%s",
325 self
.__cond
.release()
331 self
.__cond
.acquire()
332 self
.__value
= self
.__value
+ 1
334 self
._note
("%s.release: success, value=%s",
337 self
.__cond
.release()
339 def __exit__(self
, t
, v
, tb
):
343 def BoundedSemaphore(*args
, **kwargs
):
344 return _BoundedSemaphore(*args
, **kwargs
)
346 class _BoundedSemaphore(_Semaphore
):
347 """Semaphore that checks that # releases is <= # acquires"""
348 def __init__(self
, value
=1, verbose
=None):
349 _Semaphore
.__init
__(self
, value
, verbose
)
350 self
._initial
_value
= value
353 if self
._Semaphore
__value
>= self
._initial
_value
:
354 raise ValueError, "Semaphore released too many times"
355 return _Semaphore
.release(self
)
358 def Event(*args
, **kwargs
):
359 return _Event(*args
, **kwargs
)
361 class _Event(_Verbose
):
363 # After Tim Peters' event class (without is_posted())
365 def __init__(self
, verbose
=None):
366 _Verbose
.__init
__(self
, verbose
)
367 self
.__cond
= Condition(Lock())
373 isSet
= _old_api(is_set
, "isSet")
376 self
.__cond
.acquire()
379 self
.__cond
.notify_all()
381 self
.__cond
.release()
384 self
.__cond
.acquire()
388 self
.__cond
.release()
390 def wait(self
, timeout
=None):
391 self
.__cond
.acquire()
394 self
.__cond
.wait(timeout
)
396 self
.__cond
.release()
398 # Helper to generate new thread names
400 def _newname(template
="Thread-%d"):
402 _counter
= _counter
+ 1
403 return template
% _counter
405 # Active thread administration
406 _active_limbo_lock
= _allocate_lock()
407 _active
= {} # maps thread id to Thread object
411 # Main class for threads
413 class Thread(_Verbose
):
415 __initialized
= False
416 # Need to store a reference to sys.exc_info for printing
417 # out exceptions when a thread tries to use a global var. during interp.
418 # shutdown and thus raises an exception about trying to perform some
419 # operation on/with a NoneType
420 __exc_info
= _sys
.exc_info
421 # Keep sys.exc_clear too to clear the exception just before
422 # allowing .join() to return.
423 __exc_clear
= _sys
.exc_clear
425 def __init__(self
, group
=None, target
=None, name
=None,
426 args
=(), kwargs
=None, verbose
=None):
427 assert group
is None, "group argument must be None for now"
428 _Verbose
.__init
__(self
, verbose
)
431 self
.__target
= target
432 self
.__name
= str(name
or _newname())
434 self
.__kwargs
= kwargs
435 self
.__daemonic
= self
._set
_daemon
()
437 self
.__started
= Event()
438 self
.__stopped
= False
439 self
.__block
= Condition(Lock())
440 self
.__initialized
= True
441 # sys.stderr is not stored in the class like
442 # sys.exc_info since it can be changed between instances
443 self
.__stderr
= _sys
.stderr
445 def _set_daemon(self
):
446 # Overridden in _MainThread and _DummyThread
447 return current_thread().is_daemon()
450 assert self
.__initialized
, "Thread.__init__() was not called"
452 if self
.__started
.is_set():
458 if self
.__ident
is not None:
459 status
+= " %s" % self
.__ident
460 return "<%s(%s, %s)>" % (self
.__class
__.__name
__, self
.__name
, status
)
463 if not self
.__initialized
:
464 raise RuntimeError("thread.__init__() not called")
465 if self
.__started
.is_set():
466 raise RuntimeError("thread already started")
468 self
._note
("%s.start(): starting thread", self
)
469 _active_limbo_lock
.acquire()
471 _active_limbo_lock
.release()
472 _start_new_thread(self
.__bootstrap
, ())
473 self
.__started
.wait()
478 self
.__target
(*self
.__args
, **self
.__kwargs
)
480 # Avoid a refcycle if the thread is running a function with
481 # an argument that has a member that points to the thread.
482 del self
.__target
, self
.__args
, self
.__kwargs
484 def __bootstrap(self
):
485 # Wrapper around the real bootstrap code that ignores
486 # exceptions during interpreter cleanup. Those typically
487 # happen when a daemon thread wakes up at an unfortunate
488 # moment, finds the world around it destroyed, and raises some
489 # random exception *** while trying to report the exception in
490 # __bootstrap_inner() below ***. Those random exceptions
491 # don't help anybody, and they confuse users, so we suppress
492 # them. We suppress them only when it appears that the world
493 # indeed has already been destroyed, so that exceptions in
494 # __bootstrap_inner() during normal business hours are properly
495 # reported. Also, we only suppress them for daemonic threads;
496 # if a non-daemonic encounters this, something else is wrong.
498 self
.__bootstrap
_inner
()
500 if self
.__daemonic
and _sys
is None:
504 def __bootstrap_inner(self
):
506 self
.__ident
= _get_ident()
508 _active_limbo_lock
.acquire()
509 _active
[self
.__ident
] = self
511 _active_limbo_lock
.release()
513 self
._note
("%s.__bootstrap(): thread started", self
)
516 self
._note
("%s.__bootstrap(): registering trace hook", self
)
517 _sys
.settrace(_trace_hook
)
519 self
._note
("%s.__bootstrap(): registering profile hook", self
)
520 _sys
.setprofile(_profile_hook
)
526 self
._note
("%s.__bootstrap(): raised SystemExit", self
)
529 self
._note
("%s.__bootstrap(): unhandled exception", self
)
530 # If sys.stderr is no more (most likely from interpreter
531 # shutdown) use self.__stderr. Otherwise still use sys (as in
532 # _sys) in case sys.stderr was redefined since the creation of
535 _sys
.stderr
.write("Exception in thread %s:\n%s\n" %
536 (self
.get_name(), _format_exc()))
538 # Do the best job possible w/o a huge amt. of code to
539 # approximate a traceback (code ideas from
541 exc_type
, exc_value
, exc_tb
= self
.__exc
_info
()
543 print>>self
.__stderr
, (
544 "Exception in thread " + self
.get_name() +
545 " (most likely raised during interpreter shutdown):")
546 print>>self
.__stderr
, (
547 "Traceback (most recent call last):")
549 print>>self
.__stderr
, (
550 ' File "%s", line %s, in %s' %
551 (exc_tb
.tb_frame
.f_code
.co_filename
,
553 exc_tb
.tb_frame
.f_code
.co_name
))
554 exc_tb
= exc_tb
.tb_next
555 print>>self
.__stderr
, ("%s: %s" % (exc_type
, exc_value
))
556 # Make sure that exc_tb gets deleted since it is a memory
557 # hog; deleting everything else is just for thoroughness
559 del exc_type
, exc_value
, exc_tb
562 self
._note
("%s.__bootstrap(): normal return", self
)
565 # test_threading.test_no_refcycle_through_target when
566 # the exception keeps the target alive past when we
567 # assert that it's dead.
570 with _active_limbo_lock
:
573 # We don't call self.__delete() because it also
574 # grabs _active_limbo_lock.
575 del _active
[_get_ident()]
580 self
.__block
.acquire()
581 self
.__stopped
= True
582 self
.__block
.notify_all()
583 self
.__block
.release()
586 "Remove current thread from the dict of currently running threads."
588 # Notes about running with dummy_thread:
590 # Must take care to not raise an exception if dummy_thread is being
591 # used (and thus this module is being used as an instance of
592 # dummy_threading). dummy_thread.get_ident() always returns -1 since
593 # there is only one thread if dummy_thread is being used. Thus
594 # len(_active) is always <= 1 here, and any Thread instance created
595 # overwrites the (if any) thread currently registered in _active.
597 # An instance of _MainThread is always created by 'threading'. This
598 # gets overwritten the instant an instance of Thread is created; both
599 # threads return -1 from dummy_thread.get_ident() and thus have the
600 # same key in the dict. So when the _MainThread instance created by
601 # 'threading' tries to clean itself up when atexit calls this method
602 # it gets a KeyError if another Thread instance was created.
604 # This all means that KeyError from trying to delete something from
605 # _active if dummy_threading is being used is a red herring. But
606 # since it isn't if dummy_threading is *not* being used then don't
607 # hide the exception.
610 with _active_limbo_lock
:
611 del _active
[_get_ident()]
612 # There must not be any python code between the previous line
613 # and after the lock is released. Otherwise a tracing function
614 # could try to acquire the lock again in the same thread, (in
615 # current_thread()), and would block.
617 if 'dummy_threading' not in _sys
.modules
:
620 def join(self
, timeout
=None):
621 if not self
.__initialized
:
622 raise RuntimeError("Thread.__init__() not called")
623 if not self
.__started
.is_set():
624 raise RuntimeError("cannot join thread before it is started")
625 if self
is current_thread():
626 raise RuntimeError("cannot join current thread")
629 if not self
.__stopped
:
630 self
._note
("%s.join(): waiting until thread stops", self
)
631 self
.__block
.acquire()
634 while not self
.__stopped
:
637 self
._note
("%s.join(): thread stopped", self
)
639 deadline
= _time() + timeout
640 while not self
.__stopped
:
641 delay
= deadline
- _time()
644 self
._note
("%s.join(): timed out", self
)
646 self
.__block
.wait(delay
)
649 self
._note
("%s.join(): thread stopped", self
)
651 self
.__block
.release()
654 assert self
.__initialized
, "Thread.__init__() not called"
657 getName
= _old_api(get_name
, "getName")
659 def set_name(self
, name
):
660 assert self
.__initialized
, "Thread.__init__() not called"
661 self
.__name
= str(name
)
663 setName
= _old_api(set_name
, "setName")
666 assert self
.__initialized
, "Thread.__init__() not called"
670 assert self
.__initialized
, "Thread.__init__() not called"
671 return self
.__started
.is_set() and not self
.__stopped
673 isAlive
= _old_api(is_alive
, "isAlive")
676 assert self
.__initialized
, "Thread.__init__() not called"
677 return self
.__daemonic
679 isDaemon
= _old_api(is_daemon
, "isDaemon")
681 def set_daemon(self
, daemonic
):
682 if not self
.__initialized
:
683 raise RuntimeError("Thread.__init__() not called")
684 if self
.__started
.is_set():
685 raise RuntimeError("cannot set daemon status of active thread");
686 self
.__daemonic
= daemonic
688 setDaemon
= _old_api(set_daemon
, "setDaemon")
690 # The timer class was contributed by Itamar Shtull-Trauring
692 def Timer(*args
, **kwargs
):
693 return _Timer(*args
, **kwargs
)
695 class _Timer(Thread
):
696 """Call a function after a specified number of seconds:
698 t = Timer(30.0, f, args=[], kwargs={})
700 t.cancel() # stop the timer's action if it's still waiting
703 def __init__(self
, interval
, function
, args
=[], kwargs
={}):
704 Thread
.__init
__(self
)
705 self
.interval
= interval
706 self
.function
= function
709 self
.finished
= Event()
712 """Stop the timer if it hasn't finished yet"""
716 self
.finished
.wait(self
.interval
)
717 if not self
.finished
.is_set():
718 self
.function(*self
.args
, **self
.kwargs
)
721 # Special thread class to represent the main thread
722 # This is garbage collected through an exit handler
724 class _MainThread(Thread
):
727 Thread
.__init
__(self
, name
="MainThread")
728 self
._Thread
__started
.set()
729 _active_limbo_lock
.acquire()
730 _active
[_get_ident()] = self
731 _active_limbo_lock
.release()
733 def _set_daemon(self
):
738 t
= _pickSomeNonDaemonThread()
741 self
._note
("%s: waiting for other threads", self
)
744 t
= _pickSomeNonDaemonThread()
746 self
._note
("%s: exiting", self
)
747 self
._Thread
__delete
()
749 def _pickSomeNonDaemonThread():
750 for t
in enumerate():
751 if not t
.is_daemon() and t
.is_alive():
756 # Dummy thread class to represent threads not started here.
757 # These aren't garbage collected when they die, nor can they be waited for.
758 # If they invoke anything in threading.py that calls current_thread(), they
759 # leave an entry in the _active dict forever after.
760 # Their purpose is to return *something* from current_thread().
761 # They are marked as daemon threads so we won't wait for them
762 # when we exit (conform previous semantics).
764 class _DummyThread(Thread
):
767 Thread
.__init
__(self
, name
=_newname("Dummy-%d"))
769 # Thread.__block consumes an OS-level locking primitive, which
770 # can never be used by a _DummyThread. Since a _DummyThread
771 # instance is immortal, that's bad, so release this resource.
772 del self
._Thread
__block
774 self
._Thread
__started
.set()
775 _active_limbo_lock
.acquire()
776 _active
[_get_ident()] = self
777 _active_limbo_lock
.release()
779 def _set_daemon(self
):
782 def join(self
, timeout
=None):
783 assert False, "cannot join a dummy thread"
786 # Global API functions
788 def current_thread():
790 return _active
[_get_ident()]
792 ##print "current_thread(): no current thread for", _get_ident()
793 return _DummyThread()
795 currentThread
= _old_api(current_thread
, "currentThread")
798 _active_limbo_lock
.acquire()
799 count
= len(_active
) + len(_limbo
)
800 _active_limbo_lock
.release()
803 activeCount
= _old_api(active_count
, "activeCount")
806 _active_limbo_lock
.acquire()
807 active
= _active
.values() + _limbo
.values()
808 _active_limbo_lock
.release()
811 from thread
import stack_size
813 # Create the main thread object,
814 # and make it available for the interpreter
815 # (Py_Main) as threading._shutdown.
817 _shutdown
= _MainThread()._exitfunc
819 # get thread-local implementation, either from the thread
820 # module, or from the python fallback
823 from thread
import _local
as local
825 from _threading_local
import local
829 # This function is called by Python/ceval.c:PyEval_ReInitThreads which
830 # is called from PyOS_AfterFork. Here we cleanup threading module state
831 # that should not exist after a fork.
833 # Reset _active_limbo_lock, in case we forked while the lock was held
834 # by another (non-forked) thread. http://bugs.python.org/issue874900
835 global _active_limbo_lock
836 _active_limbo_lock
= _allocate_lock()
838 # fork() only copied the current thread; clear references to others.
840 current
= current_thread()
841 with _active_limbo_lock
:
842 for ident
, thread
in _active
.iteritems():
843 if thread
is current
:
844 # There is only one active thread.
845 new_active
[ident
] = thread
847 # All the others are already stopped.
848 # We don't call _Thread__stop() because it tries to acquire
849 # thread._Thread__block which could also have been held while
851 thread
._Thread
__stopped
= True
855 _active
.update(new_active
)
856 assert len(_active
) == 1
863 class BoundedQueue(_Verbose
):
865 def __init__(self
, limit
):
866 _Verbose
.__init
__(self
)
868 self
.rc
= Condition(self
.mon
)
869 self
.wc
= Condition(self
.mon
)
875 while len(self
.queue
) >= self
.limit
:
876 self
._note
("put(%s): queue full", item
)
878 self
.queue
.append(item
)
879 self
._note
("put(%s): appended, length now %d",
880 item
, len(self
.queue
))
886 while not self
.queue
:
887 self
._note
("get(): queue empty")
889 item
= self
.queue
.popleft()
890 self
._note
("get(): got %s, %d left", item
, len(self
.queue
))
895 class ProducerThread(Thread
):
897 def __init__(self
, queue
, quota
):
898 Thread
.__init
__(self
, name
="Producer")
903 from random
import random
905 while counter
< self
.quota
:
906 counter
= counter
+ 1
907 self
.queue
.put("%s.%d" % (self
.get_name(), counter
))
908 _sleep(random() * 0.00001)
911 class ConsumerThread(Thread
):
913 def __init__(self
, queue
, count
):
914 Thread
.__init
__(self
, name
="Consumer")
919 while self
.count
> 0:
920 item
= self
.queue
.get()
922 self
.count
= self
.count
- 1
931 t
= ProducerThread(Q
, NI
)
932 t
.setName("Producer-%d" % (i
+1))
934 C
= ConsumerThread(Q
, NI
*NP
)
943 if __name__
== '__main__':