1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """A utility to run functions with timeouts and retries."""
6 # pylint: disable=W0702
12 from devil
.utils
import reraiser_thread
13 from devil
.utils
import watchdog_timer
15 logger
= logging
.getLogger(__name__
)
18 class TimeoutRetryThreadGroup(reraiser_thread
.ReraiserThreadGroup
):
20 def __init__(self
, timeout
, threads
=None):
21 super(TimeoutRetryThreadGroup
, self
).__init
__(threads
)
22 self
._watcher
= watchdog_timer
.WatchdogTimer(timeout
)
25 """Returns the watchdog keeping track of this thread's time."""
28 def GetElapsedTime(self
):
29 return self
._watcher
.GetElapsed()
31 def GetRemainingTime(self
, required
=0, msg
=None):
32 """Get the remaining time before the thread times out.
34 Useful to send as the |timeout| parameter of async IO operations.
37 required: minimum amount of time that will be required to complete, e.g.,
38 some sleep or IO operation.
39 msg: error message to show if timing out.
42 The number of seconds remaining before the thread times out, or None
43 if the thread never times out.
46 reraiser_thread.TimeoutError if the remaining time is less than the
49 remaining
= self
._watcher
.GetRemaining()
50 if remaining
is not None and remaining
< required
:
52 msg
= 'Timeout expired'
54 msg
+= (', wait of %.1f secs required but only %.1f secs left'
55 % (required
, remaining
))
56 raise reraiser_thread
.TimeoutError(msg
)
60 def CurrentTimeoutThreadGroup():
61 """Returns the thread group that owns or is blocked on the active thread.
64 Returns None if no TimeoutRetryThreadGroup is tracking the current thread.
66 thread_group
= reraiser_thread
.CurrentThreadGroup()
68 if isinstance(thread_group
, TimeoutRetryThreadGroup
):
70 thread_group
= thread_group
.blocked_parent_thread_group
74 def WaitFor(condition
, wait_period
=5, max_tries
=None):
75 """Wait for a condition to become true.
77 Repeatedly call the function condition(), with no arguments, until it returns
80 If called within a TimeoutRetryThreadGroup, it cooperates nicely with it.
83 condition: function with the condition to check
84 wait_period: number of seconds to wait before retrying to check the
86 max_tries: maximum number of checks to make, the default tries forever
87 or until the TimeoutRetryThreadGroup expires.
90 The true value returned by the condition, or None if the condition was
91 not met after max_tries.
94 reraiser_thread.TimeoutError: if the current thread is a
95 TimeoutRetryThreadGroup and the timeout expires.
97 condition_name
= condition
.__name
__
98 timeout_thread_group
= CurrentTimeoutThreadGroup()
99 while max_tries
is None or max_tries
> 0:
101 if max_tries
is not None:
103 msg
= ['condition', repr(condition_name
), 'met' if result
else 'not met']
104 if timeout_thread_group
:
105 # pylint: disable=no-member
106 msg
.append('(%.1fs)' % timeout_thread_group
.GetElapsedTime())
107 logger
.info(' '.join(msg
))
110 if timeout_thread_group
:
111 # pylint: disable=no-member
112 timeout_thread_group
.GetRemainingTime(wait_period
,
113 msg
='Timed out waiting for %r' % condition_name
)
114 time
.sleep(wait_period
)
118 def AlwaysRetry(_exception
):
122 def Run(func
, timeout
, retries
, args
=None, kwargs
=None, desc
=None,
123 error_log_func
=logging
.critical
, retry_if_func
=AlwaysRetry
):
124 """Runs the passed function in a separate thread with timeouts and retries.
127 func: the function to be wrapped.
128 timeout: the timeout in seconds for each try.
129 retries: the number of retries.
130 args: list of positional args to pass to |func|.
131 kwargs: dictionary of keyword args to pass to |func|.
132 desc: An optional description of |func| used in logging. If omitted,
133 |func.__name__| will be used.
134 error_log_func: Logging function when logging errors.
135 retry_if_func: Unary callable that takes an exception and returns
136 whether |func| should be retried. Defaults to always retrying.
139 The return value of func(*args, **kwargs).
150 thread_name
= 'TimeoutThread-%d-for-%s' % (num_try
,
151 threading
.current_thread().name
)
152 child_thread
= reraiser_thread
.ReraiserThread(lambda: func(*args
, **kwargs
),
155 thread_group
= TimeoutRetryThreadGroup(timeout
, threads
=[child_thread
])
156 thread_group
.StartAll(will_block
=True)
158 thread_group
.JoinAll(watcher
=thread_group
.GetWatcher(), timeout
=60,
159 error_log_func
=error_log_func
)
160 if thread_group
.IsAlive():
161 logger
.info('Still working on %s', desc
)
163 return thread_group
.GetAllReturnValues()[0]
164 except reraiser_thread
.TimeoutError
as e
:
165 # Timeouts already get their stacks logged.
166 if num_try
> retries
or not retry_if_func(e
):
168 # Do not catch KeyboardInterrupt.
169 except Exception as e
: # pylint: disable=broad-except
170 if num_try
> retries
or not retry_if_func(e
):
173 '(%s) Exception on %s, attempt %d of %d: %r',
174 thread_name
, desc
, num_try
, retries
+ 1, e
)