Tweak the comments and formatting.
[python.git] / Lib / threading.py
blobd497a46dc7b8773f6e387a1a05f43244e9d996ae
1 """Thread module emulating a subset of Java's threading model."""
3 import sys as _sys
5 try:
6 import thread
7 except ImportError:
8 del _sys.modules[__name__]
9 raise
11 import warnings
12 from time import time as _time, sleep as _sleep
13 from traceback import format_exc as _format_exc
14 from collections import deque
16 # Rename some stuff so "from threading import *" is safe
17 __all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event',
18 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
19 'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
21 _start_new_thread = thread.start_new_thread
22 _allocate_lock = thread.allocate_lock
23 _get_ident = thread.get_ident
24 ThreadError = thread.error
25 del thread
28 # sys.exc_clear is used to work around the fact that except blocks
29 # don't fully clear the exception until 3.0.
30 warnings.filterwarnings('ignore', category=DeprecationWarning,
31 module='threading', message='sys.exc_clear')
34 # Debug support (adapted from ihooks.py).
35 # All the major classes here derive from _Verbose. We force that to
36 # be a new-style class so that all the major classes here are new-style.
37 # This helps debugging (type(instance) is more revealing for instances
38 # of new-style classes).
40 _VERBOSE = False
42 if __debug__:
44 class _Verbose(object):
46 def __init__(self, verbose=None):
47 if verbose is None:
48 verbose = _VERBOSE
49 self.__verbose = verbose
51 def _note(self, format, *args):
52 if self.__verbose:
53 format = format % args
54 format = "%s: %s\n" % (
55 currentThread().getName(), format)
56 _sys.stderr.write(format)
58 else:
59 # Disable this when using "python -O"
60 class _Verbose(object):
61 def __init__(self, verbose=None):
62 pass
63 def _note(self, *args):
64 pass
66 # Support for profile and trace hooks
68 _profile_hook = None
69 _trace_hook = None
71 def setprofile(func):
72 global _profile_hook
73 _profile_hook = func
75 def settrace(func):
76 global _trace_hook
77 _trace_hook = func
79 # Synchronization classes
81 Lock = _allocate_lock
83 def RLock(*args, **kwargs):
84 return _RLock(*args, **kwargs)
86 class _RLock(_Verbose):
88 def __init__(self, verbose=None):
89 _Verbose.__init__(self, verbose)
90 self.__block = _allocate_lock()
91 self.__owner = None
92 self.__count = 0
94 def __repr__(self):
95 owner = self.__owner
96 return "<%s(%s, %d)>" % (
97 self.__class__.__name__,
98 owner and owner.getName(),
99 self.__count)
101 def acquire(self, blocking=1):
102 me = currentThread()
103 if self.__owner is me:
104 self.__count = self.__count + 1
105 if __debug__:
106 self._note("%s.acquire(%s): recursive success", self, blocking)
107 return 1
108 rc = self.__block.acquire(blocking)
109 if rc:
110 self.__owner = me
111 self.__count = 1
112 if __debug__:
113 self._note("%s.acquire(%s): initial success", self, blocking)
114 else:
115 if __debug__:
116 self._note("%s.acquire(%s): failure", self, blocking)
117 return rc
119 __enter__ = acquire
121 def release(self):
122 if self.__owner is not currentThread():
123 raise RuntimeError("cannot release un-aquired lock")
124 self.__count = count = self.__count - 1
125 if not count:
126 self.__owner = None
127 self.__block.release()
128 if __debug__:
129 self._note("%s.release(): final release", self)
130 else:
131 if __debug__:
132 self._note("%s.release(): non-final release", self)
134 def __exit__(self, t, v, tb):
135 self.release()
137 # Internal methods used by condition variables
139 def _acquire_restore(self, (count, owner)):
140 self.__block.acquire()
141 self.__count = count
142 self.__owner = owner
143 if __debug__:
144 self._note("%s._acquire_restore()", self)
146 def _release_save(self):
147 if __debug__:
148 self._note("%s._release_save()", self)
149 count = self.__count
150 self.__count = 0
151 owner = self.__owner
152 self.__owner = None
153 self.__block.release()
154 return (count, owner)
156 def _is_owned(self):
157 return self.__owner is currentThread()
160 def Condition(*args, **kwargs):
161 return _Condition(*args, **kwargs)
163 class _Condition(_Verbose):
165 def __init__(self, lock=None, verbose=None):
166 _Verbose.__init__(self, verbose)
167 if lock is None:
168 lock = RLock()
169 self.__lock = lock
170 # Export the lock's acquire() and release() methods
171 self.acquire = lock.acquire
172 self.release = lock.release
173 # If the lock defines _release_save() and/or _acquire_restore(),
174 # these override the default implementations (which just call
175 # release() and acquire() on the lock). Ditto for _is_owned().
176 try:
177 self._release_save = lock._release_save
178 except AttributeError:
179 pass
180 try:
181 self._acquire_restore = lock._acquire_restore
182 except AttributeError:
183 pass
184 try:
185 self._is_owned = lock._is_owned
186 except AttributeError:
187 pass
188 self.__waiters = []
190 def __enter__(self):
191 return self.__lock.__enter__()
193 def __exit__(self, *args):
194 return self.__lock.__exit__(*args)
196 def __repr__(self):
197 return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
199 def _release_save(self):
200 self.__lock.release() # No state to save
202 def _acquire_restore(self, x):
203 self.__lock.acquire() # Ignore saved state
205 def _is_owned(self):
206 # Return True if lock is owned by currentThread.
207 # This method is called only if __lock doesn't have _is_owned().
208 if self.__lock.acquire(0):
209 self.__lock.release()
210 return False
211 else:
212 return True
214 def wait(self, timeout=None):
215 if not self._is_owned():
216 raise RuntimeError("cannot wait on un-aquired lock")
217 waiter = _allocate_lock()
218 waiter.acquire()
219 self.__waiters.append(waiter)
220 saved_state = self._release_save()
221 try: # restore state no matter what (e.g., KeyboardInterrupt)
222 if timeout is None:
223 waiter.acquire()
224 if __debug__:
225 self._note("%s.wait(): got it", self)
226 else:
227 # Balancing act: We can't afford a pure busy loop, so we
228 # have to sleep; but if we sleep the whole timeout time,
229 # we'll be unresponsive. The scheme here sleeps very
230 # little at first, longer as time goes on, but never longer
231 # than 20 times per second (or the timeout time remaining).
232 endtime = _time() + timeout
233 delay = 0.0005 # 500 us -> initial delay of 1 ms
234 while True:
235 gotit = waiter.acquire(0)
236 if gotit:
237 break
238 remaining = endtime - _time()
239 if remaining <= 0:
240 break
241 delay = min(delay * 2, remaining, .05)
242 _sleep(delay)
243 if not gotit:
244 if __debug__:
245 self._note("%s.wait(%s): timed out", self, timeout)
246 try:
247 self.__waiters.remove(waiter)
248 except ValueError:
249 pass
250 else:
251 if __debug__:
252 self._note("%s.wait(%s): got it", self, timeout)
253 finally:
254 self._acquire_restore(saved_state)
256 def notify(self, n=1):
257 if not self._is_owned():
258 raise RuntimeError("cannot notify on un-aquired lock")
259 __waiters = self.__waiters
260 waiters = __waiters[:n]
261 if not waiters:
262 if __debug__:
263 self._note("%s.notify(): no waiters", self)
264 return
265 self._note("%s.notify(): notifying %d waiter%s", self, n,
266 n!=1 and "s" or "")
267 for waiter in waiters:
268 waiter.release()
269 try:
270 __waiters.remove(waiter)
271 except ValueError:
272 pass
274 def notifyAll(self):
275 self.notify(len(self.__waiters))
278 def Semaphore(*args, **kwargs):
279 return _Semaphore(*args, **kwargs)
281 class _Semaphore(_Verbose):
283 # After Tim Peters' semaphore class, but not quite the same (no maximum)
285 def __init__(self, value=1, verbose=None):
286 if value < 0:
287 raise ValueError("semaphore initial value must be >= 0")
288 _Verbose.__init__(self, verbose)
289 self.__cond = Condition(Lock())
290 self.__value = value
292 def acquire(self, blocking=1):
293 rc = False
294 self.__cond.acquire()
295 while self.__value == 0:
296 if not blocking:
297 break
298 if __debug__:
299 self._note("%s.acquire(%s): blocked waiting, value=%s",
300 self, blocking, self.__value)
301 self.__cond.wait()
302 else:
303 self.__value = self.__value - 1
304 if __debug__:
305 self._note("%s.acquire: success, value=%s",
306 self, self.__value)
307 rc = True
308 self.__cond.release()
309 return rc
311 __enter__ = acquire
313 def release(self):
314 self.__cond.acquire()
315 self.__value = self.__value + 1
316 if __debug__:
317 self._note("%s.release: success, value=%s",
318 self, self.__value)
319 self.__cond.notify()
320 self.__cond.release()
322 def __exit__(self, t, v, tb):
323 self.release()
326 def BoundedSemaphore(*args, **kwargs):
327 return _BoundedSemaphore(*args, **kwargs)
329 class _BoundedSemaphore(_Semaphore):
330 """Semaphore that checks that # releases is <= # acquires"""
331 def __init__(self, value=1, verbose=None):
332 _Semaphore.__init__(self, value, verbose)
333 self._initial_value = value
335 def release(self):
336 if self._Semaphore__value >= self._initial_value:
337 raise ValueError, "Semaphore released too many times"
338 return _Semaphore.release(self)
341 def Event(*args, **kwargs):
342 return _Event(*args, **kwargs)
344 class _Event(_Verbose):
346 # After Tim Peters' event class (without is_posted())
348 def __init__(self, verbose=None):
349 _Verbose.__init__(self, verbose)
350 self.__cond = Condition(Lock())
351 self.__flag = False
353 def isSet(self):
354 return self.__flag
356 def set(self):
357 self.__cond.acquire()
358 try:
359 self.__flag = True
360 self.__cond.notifyAll()
361 finally:
362 self.__cond.release()
364 def clear(self):
365 self.__cond.acquire()
366 try:
367 self.__flag = False
368 finally:
369 self.__cond.release()
371 def wait(self, timeout=None):
372 self.__cond.acquire()
373 try:
374 if not self.__flag:
375 self.__cond.wait(timeout)
376 finally:
377 self.__cond.release()
379 # Helper to generate new thread names
380 _counter = 0
381 def _newname(template="Thread-%d"):
382 global _counter
383 _counter = _counter + 1
384 return template % _counter
386 # Active thread administration
387 _active_limbo_lock = _allocate_lock()
388 _active = {} # maps thread id to Thread object
389 _limbo = {}
392 # Main class for threads
394 class Thread(_Verbose):
396 __initialized = False
397 # Need to store a reference to sys.exc_info for printing
398 # out exceptions when a thread tries to use a global var. during interp.
399 # shutdown and thus raises an exception about trying to perform some
400 # operation on/with a NoneType
401 __exc_info = _sys.exc_info
402 # Keep sys.exc_clear too to clear the exception just before
403 # allowing .join() to return.
404 __exc_clear = _sys.exc_clear
406 def __init__(self, group=None, target=None, name=None,
407 args=(), kwargs=None, verbose=None):
408 assert group is None, "group argument must be None for now"
409 _Verbose.__init__(self, verbose)
410 if kwargs is None:
411 kwargs = {}
412 self.__target = target
413 self.__name = str(name or _newname())
414 self.__args = args
415 self.__kwargs = kwargs
416 self.__daemonic = self._set_daemon()
417 self.__started = Event()
418 self.__stopped = False
419 self.__block = Condition(Lock())
420 self.__initialized = True
421 # sys.stderr is not stored in the class like
422 # sys.exc_info since it can be changed between instances
423 self.__stderr = _sys.stderr
425 def _set_daemon(self):
426 # Overridden in _MainThread and _DummyThread
427 return currentThread().isDaemon()
429 def __repr__(self):
430 assert self.__initialized, "Thread.__init__() was not called"
431 status = "initial"
432 if self.__started.isSet():
433 status = "started"
434 if self.__stopped:
435 status = "stopped"
436 if self.__daemonic:
437 status = status + " daemon"
438 return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
440 def start(self):
441 if not self.__initialized:
442 raise RuntimeError("thread.__init__() not called")
443 if self.__started.isSet():
444 raise RuntimeError("thread already started")
445 if __debug__:
446 self._note("%s.start(): starting thread", self)
447 _active_limbo_lock.acquire()
448 _limbo[self] = self
449 _active_limbo_lock.release()
450 _start_new_thread(self.__bootstrap, ())
451 self.__started.wait()
453 def run(self):
454 try:
455 if self.__target:
456 self.__target(*self.__args, **self.__kwargs)
457 finally:
458 # Avoid a refcycle if the thread is running a function with
459 # an argument that has a member that points to the thread.
460 del self.__target, self.__args, self.__kwargs
462 def __bootstrap(self):
463 # Wrapper around the real bootstrap code that ignores
464 # exceptions during interpreter cleanup. Those typically
465 # happen when a daemon thread wakes up at an unfortunate
466 # moment, finds the world around it destroyed, and raises some
467 # random exception *** while trying to report the exception in
468 # __bootstrap_inner() below ***. Those random exceptions
469 # don't help anybody, and they confuse users, so we suppress
470 # them. We suppress them only when it appears that the world
471 # indeed has already been destroyed, so that exceptions in
472 # __bootstrap_inner() during normal business hours are properly
473 # reported. Also, we only suppress them for daemonic threads;
474 # if a non-daemonic encounters this, something else is wrong.
475 try:
476 self.__bootstrap_inner()
477 except:
478 if self.__daemonic and _sys is None:
479 return
480 raise
482 def __bootstrap_inner(self):
483 try:
484 self.__started.set()
485 _active_limbo_lock.acquire()
486 _active[_get_ident()] = self
487 del _limbo[self]
488 _active_limbo_lock.release()
489 if __debug__:
490 self._note("%s.__bootstrap(): thread started", self)
492 if _trace_hook:
493 self._note("%s.__bootstrap(): registering trace hook", self)
494 _sys.settrace(_trace_hook)
495 if _profile_hook:
496 self._note("%s.__bootstrap(): registering profile hook", self)
497 _sys.setprofile(_profile_hook)
499 try:
500 self.run()
501 except SystemExit:
502 if __debug__:
503 self._note("%s.__bootstrap(): raised SystemExit", self)
504 except:
505 if __debug__:
506 self._note("%s.__bootstrap(): unhandled exception", self)
507 # If sys.stderr is no more (most likely from interpreter
508 # shutdown) use self.__stderr. Otherwise still use sys (as in
509 # _sys) in case sys.stderr was redefined since the creation of
510 # self.
511 if _sys:
512 _sys.stderr.write("Exception in thread %s:\n%s\n" %
513 (self.getName(), _format_exc()))
514 else:
515 # Do the best job possible w/o a huge amt. of code to
516 # approximate a traceback (code ideas from
517 # Lib/traceback.py)
518 exc_type, exc_value, exc_tb = self.__exc_info()
519 try:
520 print>>self.__stderr, (
521 "Exception in thread " + self.getName() +
522 " (most likely raised during interpreter shutdown):")
523 print>>self.__stderr, (
524 "Traceback (most recent call last):")
525 while exc_tb:
526 print>>self.__stderr, (
527 ' File "%s", line %s, in %s' %
528 (exc_tb.tb_frame.f_code.co_filename,
529 exc_tb.tb_lineno,
530 exc_tb.tb_frame.f_code.co_name))
531 exc_tb = exc_tb.tb_next
532 print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
533 # Make sure that exc_tb gets deleted since it is a memory
534 # hog; deleting everything else is just for thoroughness
535 finally:
536 del exc_type, exc_value, exc_tb
537 else:
538 if __debug__:
539 self._note("%s.__bootstrap(): normal return", self)
540 finally:
541 # Prevent a race in
542 # test_threading.test_no_refcycle_through_target when
543 # the exception keeps the target alive past when we
544 # assert that it's dead.
545 self.__exc_clear()
546 finally:
547 with _active_limbo_lock:
548 self.__stop()
549 try:
550 # We don't call self.__delete() because it also
551 # grabs _active_limbo_lock.
552 del _active[_get_ident()]
553 except:
554 pass
556 def __stop(self):
557 self.__block.acquire()
558 self.__stopped = True
559 self.__block.notifyAll()
560 self.__block.release()
562 def __delete(self):
563 "Remove current thread from the dict of currently running threads."
565 # Notes about running with dummy_thread:
567 # Must take care to not raise an exception if dummy_thread is being
568 # used (and thus this module is being used as an instance of
569 # dummy_threading). dummy_thread.get_ident() always returns -1 since
570 # there is only one thread if dummy_thread is being used. Thus
571 # len(_active) is always <= 1 here, and any Thread instance created
572 # overwrites the (if any) thread currently registered in _active.
574 # An instance of _MainThread is always created by 'threading'. This
575 # gets overwritten the instant an instance of Thread is created; both
576 # threads return -1 from dummy_thread.get_ident() and thus have the
577 # same key in the dict. So when the _MainThread instance created by
578 # 'threading' tries to clean itself up when atexit calls this method
579 # it gets a KeyError if another Thread instance was created.
581 # This all means that KeyError from trying to delete something from
582 # _active if dummy_threading is being used is a red herring. But
583 # since it isn't if dummy_threading is *not* being used then don't
584 # hide the exception.
586 try:
587 with _active_limbo_lock:
588 del _active[_get_ident()]
589 # There must not be any python code between the previous line
590 # and after the lock is released. Otherwise a tracing function
591 # could try to acquire the lock again in the same thread, (in
592 # currentThread()), and would block.
593 except KeyError:
594 if 'dummy_threading' not in _sys.modules:
595 raise
597 def join(self, timeout=None):
598 if not self.__initialized:
599 raise RuntimeError("Thread.__init__() not called")
600 if not self.__started.isSet():
601 raise RuntimeError("cannot join thread before it is started")
602 if self is currentThread():
603 raise RuntimeError("cannot join current thread")
605 if __debug__:
606 if not self.__stopped:
607 self._note("%s.join(): waiting until thread stops", self)
608 self.__block.acquire()
609 try:
610 if timeout is None:
611 while not self.__stopped:
612 self.__block.wait()
613 if __debug__:
614 self._note("%s.join(): thread stopped", self)
615 else:
616 deadline = _time() + timeout
617 while not self.__stopped:
618 delay = deadline - _time()
619 if delay <= 0:
620 if __debug__:
621 self._note("%s.join(): timed out", self)
622 break
623 self.__block.wait(delay)
624 else:
625 if __debug__:
626 self._note("%s.join(): thread stopped", self)
627 finally:
628 self.__block.release()
630 def getName(self):
631 assert self.__initialized, "Thread.__init__() not called"
632 return self.__name
634 def setName(self, name):
635 assert self.__initialized, "Thread.__init__() not called"
636 self.__name = str(name)
638 def isAlive(self):
639 assert self.__initialized, "Thread.__init__() not called"
640 return self.__started.isSet() and not self.__stopped
642 def isDaemon(self):
643 assert self.__initialized, "Thread.__init__() not called"
644 return self.__daemonic
646 def setDaemon(self, daemonic):
647 if not self.__initialized:
648 raise RuntimeError("Thread.__init__() not called")
649 if self.__started.isSet():
650 raise RuntimeError("cannot set daemon status of active thread");
651 self.__daemonic = daemonic
653 # The timer class was contributed by Itamar Shtull-Trauring
655 def Timer(*args, **kwargs):
656 return _Timer(*args, **kwargs)
658 class _Timer(Thread):
659 """Call a function after a specified number of seconds:
661 t = Timer(30.0, f, args=[], kwargs={})
662 t.start()
663 t.cancel() # stop the timer's action if it's still waiting
666 def __init__(self, interval, function, args=[], kwargs={}):
667 Thread.__init__(self)
668 self.interval = interval
669 self.function = function
670 self.args = args
671 self.kwargs = kwargs
672 self.finished = Event()
674 def cancel(self):
675 """Stop the timer if it hasn't finished yet"""
676 self.finished.set()
678 def run(self):
679 self.finished.wait(self.interval)
680 if not self.finished.isSet():
681 self.function(*self.args, **self.kwargs)
682 self.finished.set()
684 # Special thread class to represent the main thread
685 # This is garbage collected through an exit handler
687 class _MainThread(Thread):
689 def __init__(self):
690 Thread.__init__(self, name="MainThread")
691 self._Thread__started.set()
692 _active_limbo_lock.acquire()
693 _active[_get_ident()] = self
694 _active_limbo_lock.release()
696 def _set_daemon(self):
697 return False
699 def _exitfunc(self):
700 self._Thread__stop()
701 t = _pickSomeNonDaemonThread()
702 if t:
703 if __debug__:
704 self._note("%s: waiting for other threads", self)
705 while t:
706 t.join()
707 t = _pickSomeNonDaemonThread()
708 if __debug__:
709 self._note("%s: exiting", self)
710 self._Thread__delete()
712 def _pickSomeNonDaemonThread():
713 for t in enumerate():
714 if not t.isDaemon() and t.isAlive():
715 return t
716 return None
719 # Dummy thread class to represent threads not started here.
720 # These aren't garbage collected when they die, nor can they be waited for.
721 # If they invoke anything in threading.py that calls currentThread(), they
722 # leave an entry in the _active dict forever after.
723 # Their purpose is to return *something* from currentThread().
724 # They are marked as daemon threads so we won't wait for them
725 # when we exit (conform previous semantics).
727 class _DummyThread(Thread):
729 def __init__(self):
730 Thread.__init__(self, name=_newname("Dummy-%d"))
732 # Thread.__block consumes an OS-level locking primitive, which
733 # can never be used by a _DummyThread. Since a _DummyThread
734 # instance is immortal, that's bad, so release this resource.
735 del self._Thread__block
737 self._Thread__started.set()
738 _active_limbo_lock.acquire()
739 _active[_get_ident()] = self
740 _active_limbo_lock.release()
742 def _set_daemon(self):
743 return True
745 def join(self, timeout=None):
746 assert False, "cannot join a dummy thread"
749 # Global API functions
751 def currentThread():
752 try:
753 return _active[_get_ident()]
754 except KeyError:
755 ##print "currentThread(): no current thread for", _get_ident()
756 return _DummyThread()
758 def activeCount():
759 _active_limbo_lock.acquire()
760 count = len(_active) + len(_limbo)
761 _active_limbo_lock.release()
762 return count
764 def enumerate():
765 _active_limbo_lock.acquire()
766 active = _active.values() + _limbo.values()
767 _active_limbo_lock.release()
768 return active
770 from thread import stack_size
772 # Create the main thread object,
773 # and make it available for the interpreter
774 # (Py_Main) as threading._shutdown.
776 _shutdown = _MainThread()._exitfunc
778 # get thread-local implementation, either from the thread
779 # module, or from the python fallback
781 try:
782 from thread import _local as local
783 except ImportError:
784 from _threading_local import local
787 # Self-test code
789 def _test():
791 class BoundedQueue(_Verbose):
793 def __init__(self, limit):
794 _Verbose.__init__(self)
795 self.mon = RLock()
796 self.rc = Condition(self.mon)
797 self.wc = Condition(self.mon)
798 self.limit = limit
799 self.queue = deque()
801 def put(self, item):
802 self.mon.acquire()
803 while len(self.queue) >= self.limit:
804 self._note("put(%s): queue full", item)
805 self.wc.wait()
806 self.queue.append(item)
807 self._note("put(%s): appended, length now %d",
808 item, len(self.queue))
809 self.rc.notify()
810 self.mon.release()
812 def get(self):
813 self.mon.acquire()
814 while not self.queue:
815 self._note("get(): queue empty")
816 self.rc.wait()
817 item = self.queue.popleft()
818 self._note("get(): got %s, %d left", item, len(self.queue))
819 self.wc.notify()
820 self.mon.release()
821 return item
823 class ProducerThread(Thread):
825 def __init__(self, queue, quota):
826 Thread.__init__(self, name="Producer")
827 self.queue = queue
828 self.quota = quota
830 def run(self):
831 from random import random
832 counter = 0
833 while counter < self.quota:
834 counter = counter + 1
835 self.queue.put("%s.%d" % (self.getName(), counter))
836 _sleep(random() * 0.00001)
839 class ConsumerThread(Thread):
841 def __init__(self, queue, count):
842 Thread.__init__(self, name="Consumer")
843 self.queue = queue
844 self.count = count
846 def run(self):
847 while self.count > 0:
848 item = self.queue.get()
849 print item
850 self.count = self.count - 1
852 NP = 3
853 QL = 4
854 NI = 5
856 Q = BoundedQueue(QL)
857 P = []
858 for i in range(NP):
859 t = ProducerThread(Q, NI)
860 t.setName("Producer-%d" % (i+1))
861 P.append(t)
862 C = ConsumerThread(Q, NI*NP)
863 for t in P:
864 t.start()
865 _sleep(0.000001)
866 C.start()
867 for t in P:
868 t.join()
869 C.join()
871 if __name__ == '__main__':
872 _test()