# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Manage the lifecycle of runtime processes and dispatch requests to them."""
import collections
import logging
import threading
import time

from google.appengine.tools.devappserver2 import errors
# Request types understood by Instance.handle() and RuntimeProxy.handle().
#
# NOTE(review): NORMAL_REQUEST is compared against in Instance.handle() but was
# not defined alongside the other request types; 0 is the only value not
# already taken by them - confirm against callers.
NORMAL_REQUEST = 0  # An ordinary request (none of the special types below).
READY_REQUEST = 1  # A warmup request i.e. /_ah/warmup.
BACKGROUND_REQUEST = 2  # A request to create a background thread.
SHUTDOWN_REQUEST = 3  # A request to stop the module i.e. /_ah/stop.
# A request to send a command to the module for evaluation e.g. for use by
# interactive shells.
INTERACTIVE_REQUEST = 4

# Constants for use with FILE_CHANGE_INSTANCE_RESTART_POLICY. These constants
# determine whether an instance will be restarted if a file is changed in
# the application_root or any directory returned by
# InstanceFactory.get_restart_directories.
ALWAYS = 0  # Always restart instances.
AFTER_FIRST_REQUEST = 1  # Restart instances that have received >= 1 request.
NEVER = 2  # Never restart instances.
class CannotAcceptRequests(errors.Error):
  """Raised when an Instance cannot take a request.

  For example, the Instance has quit, is quitting, has not started yet or has
  no remaining capacity.
  """
class CannotQuitServingInstance(errors.Error):
  """Raised when an Instance cannot be quit.

  For example, the Instance is still handling a request and neither an
  asynchronous nor a forced quit was requested.
  """
class InvalidInstanceId(errors.Error):
  """Raised when the requested instance id is not serving."""
class RuntimeProxy(object):
  """Abstract base class for a subclass that manages a runtime process.

  Every method raises NotImplementedError here; concrete proxies are expected
  to override all three.
  """

  def handle(self, environ, start_response, url_map, match, request_id,
             request_type):
    """Serves this request by forwarding it to the runtime process.

    Args:
      environ: An environ dict for the request as defined in PEP-333.
      start_response: A function with semantics defined in PEP-333.
      url_map: An appinfo.URLMap instance containing the configuration for the
          handler matching this request.
      match: A re.MatchObject containing the result of the matched URL pattern.
      request_id: A unique string id associated with the request.
      request_type: The type of the request. See instance.*_REQUEST module
          constants.

    Yields:
      A sequence of strings containing the body of the HTTP response.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError()

  def start(self):
    """Starts the runtime process and waits until it is ready to serve.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError()

  def quit(self):
    """Terminates the runtime process.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError()
class Instance(object):
  """Handle requests through a RuntimeProxy.

  All mutable state is guarded by self._condition. threading.Condition() wraps
  a re-entrant lock by default, which is what allows methods that already hold
  the condition to read the locking properties below (and to call quit())
  without deadlocking.
  """

  def __init__(self,
               request_data,
               instance_id,
               runtime_proxy,
               max_concurrent_requests,
               max_background_threads=0,
               expect_ready_request=False):
    """Initializer for Instance.

    Args:
      request_data: A wsgi_request_info.WSGIRequestInfo that will be provided
          with request information for use by API stubs.
      instance_id: A string or integer representing the unique (per module) id
          for the instance.
      runtime_proxy: A RuntimeProxy instance that will be used to handle
          requests.
      max_concurrent_requests: The maximum number of concurrent requests that
          the instance can handle. If the instance does not support concurrent
          requests then the value should be 1.
      max_background_threads: The maximum number of background threads that
          the instance can handle. If the instance does not support background
          threads then the value should be 0.
      expect_ready_request: If True then the instance will be sent a special
          request (i.e. /_ah/warmup or /_ah/start) before it can handle external
          requests.
    """
    self._request_data = request_data
    self._instance_id = instance_id
    self._max_concurrent_requests = max_concurrent_requests
    self._max_background_threads = max_background_threads
    self._runtime_proxy = runtime_proxy

    self._condition = threading.Condition()

    self._num_outstanding_requests = 0  # Protected by self._condition.
    self._num_running_background_threads = 0  # Protected by self._condition.
    self._total_requests = 0  # Protected by self._condition.
    self._started = False  # Protected by self._condition.
    self._quitting = False  # Protected by self._condition.
    self._quit = False  # Protected by self._condition.
    self._last_request_end_time = time.time()  # Protected by self._condition.
    self._expecting_ready_request = expect_ready_request
    self._expecting_shutdown_request = False
    # Set here so that the attribute exists even if set_health() is never
    # called; an instance is presumed healthy until told otherwise.
    self._healthy = True

    # A deque containing (start_time, end_time) 2-tuples representing completed
    # requests. This is used to compute latency and qps statistics.
    self._request_history = collections.deque()  # Protected by self._condition.

  def __str__(self):
    """Returns a one-line summary of the instance's load and lifecycle state."""
    statuses = []
    if not self._started:
      statuses.append('not started')
    if self._quitting:
      statuses.append('quitting')
    if self._quit:
      statuses.append('quit')
    if self._expecting_ready_request:
      statuses.append('handling ready request')

    if statuses:
      status = ' [%s]' % ' '.join(statuses)
    else:
      status = ''

    return '<Instance %s: %d/%d, total: %d%s>' % (
        self._instance_id,
        self._num_outstanding_requests,
        self._max_concurrent_requests,
        self._total_requests,
        status)

  @property
  def instance_id(self):
    """The unique string or integer id for the Instance."""
    return self._instance_id

  @property
  def total_requests(self):
    """The total number requests that the Instance has handled."""
    with self._condition:
      return self._total_requests

  @property
  def remaining_request_capacity(self):
    """The number of extra requests that the Instance can currently handle."""
    with self._condition:
      return self._max_concurrent_requests - self._num_outstanding_requests

  @property
  def remaining_background_thread_capacity(self):
    """The number of extra background threads the Instance can handle."""
    with self._condition:
      return self._max_background_threads - self._num_running_background_threads

  @property
  def num_outstanding_requests(self):
    """The number of requests that the Instance is currently handling."""
    with self._condition:
      return self._num_outstanding_requests

  @property
  def idle_seconds(self):
    """The number of seconds that the Instance has been idle.

    Will be 0.0 if the Instance has not started or is currently handling a
    request.
    """
    with self._condition:
      if self._num_outstanding_requests:
        return 0.0
      elif not self._started:
        return 0.0
      else:
        return time.time() - self._last_request_end_time

  @property
  def handling_ready_request(self):
    """True if the Instance is handling or will be sent a ready request."""
    return self._expecting_ready_request

  def get_latency_60s(self):
    """Returns the average request latency over the last 60s in seconds.

    Returns 0.0 when no request is recorded in that window.
    """
    with self._condition:
      self._trim_request_history_to_60s()
      if not self._request_history:
        return 0.0
      else:
        total_latency = sum(
            end - start for (start, end) in self._request_history)
        return total_latency / len(self._request_history)

  def get_qps_60s(self):
    """Returns the average queries-per-second over the last 60 seconds.

    Returns 0.0 when no request is recorded in that window.
    """
    with self._condition:
      self._trim_request_history_to_60s()
      if not self._request_history:
        return 0.0
      else:
        return len(self._request_history) / 60.0

  @property
  def has_quit(self):
    """True if the Instance has quit, is quitting or awaits a shutdown request."""
    with self._condition:
      return self._quit or self._quitting or self._expecting_shutdown_request

  @property
  def can_accept_requests(self):
    """True if .handle() will accept requests.

    Does not consider outstanding request volume.
    """
    with self._condition:
      # NOTE(review): the trailing _started/_healthy terms were reconstructed;
      # handle() rejects requests before start and set_health() has no other
      # consumer in this class - confirm against callers.
      return (not self._quit and
              not self._quitting and
              not self._expecting_ready_request and
              not self._expecting_shutdown_request and
              self._started and
              self._healthy)

  def _trim_request_history_to_60s(self):
    """Removes entries from _request_history that started over 60s ago."""
    window_start = time.time() - 60
    with self._condition:
      # Entries are appended in completion order, so stop at the first entry
      # that is still inside the window.
      while self._request_history:
        t, _ = self._request_history[0]
        if t < window_start:
          self._request_history.popleft()
        else:
          break

  def start(self):
    """Start the instance and the RuntimeProxy.

    Returns:
      True if the Instance was started or False, if the Instance has already
      been quit.
    """
    with self._condition:
      if self._quit:
        return False
    # The proxy is started without holding the condition so that other threads
    # can still query or quit the instance while the runtime boots.
    self._runtime_proxy.start()
    with self._condition:
      if self._quit:
        # quit() was called while the runtime was starting; undo the start.
        self._runtime_proxy.quit()
        return False
      self._last_request_end_time = time.time()
      self._started = True
    logging.debug('Started instance: %s', self)
    return True

  def quit(self, allow_async=False, force=False, expect_shutdown=False):
    """Quits the instance and the RuntimeProxy.

    Args:
      allow_async: Whether to enqueue the quit after all requests have completed
          if the instance cannot be quit immediately.
      force: Whether to force the instance to quit even if the instance is
          currently handling a request. This overrides allow_async if True.
      expect_shutdown: Whether the instance will be sent a shutdown request.

    Raises:
      CannotQuitServingInstance: if the Instance is currently handling a
          request and allow_async is False.
    """
    with self._condition:
      if self._quit:
        return
      if not self._started:
        # Nothing is running yet; start() will observe _quit and bail out.
        self._quit = True
        return
      if expect_shutdown:
        self._expecting_shutdown_request = True

      if (self._num_outstanding_requests or
          self._num_running_background_threads or
          self._expecting_shutdown_request):
        if not force:
          if allow_async or expect_shutdown:
            # handle() completes the quit once the instance becomes idle.
            self._quitting = True
            return
          raise CannotQuitServingInstance()
      self._quit = True
      self._runtime_proxy.quit()
      self._condition.notify_all()
      logging.debug('Quit instance: %s', self)

  def reserve_background_thread(self):
    """Reserves a background thread slot.

    Raises:
      CannotAcceptRequests: if the Instance is already handling the maximum
          permissible number of background threads or is not in a state where it
          can handle background threads.
    """
    with self._condition:
      if self._quit:
        raise CannotAcceptRequests('Instance has been quit')
      if not self._started:
        raise CannotAcceptRequests('Instance has not started')
      if not self.remaining_background_thread_capacity:
        raise CannotAcceptRequests(
            'Instance has no additional background thread capacity')
      self._num_running_background_threads += 1

  def handle(self, environ, start_response, url_map, match, request_id,
             request_type):
    """Handles an HTTP request by forwarding it to the RuntimeProxy.

    Args:
      environ: An environ dict for the request as defined in PEP-333.
      start_response: A function with semantics defined in PEP-333.
      url_map: An appinfo.URLMap instance containing the configuration for the
          handler matching this request.
      match: A re.MatchObject containing the result of the matched URL pattern.
      request_id: A unique string id associated with the request.
      request_type: The type of the request. See *_REQUEST module constants.

    Returns:
      An iterable over strings containing the body of the HTTP response.

    Raises:
      CannotAcceptRequests: if the Instance has quit or is already handling the
          maximum permissible number of concurrent requests.
    """
    start_time = time.time()
    with self._condition:
      if self._quit:
        raise CannotAcceptRequests('Instance has been quit')
      if not self._started:
        raise CannotAcceptRequests('Instance has not started')

      # Background and shutdown requests bypass the request-capacity checks;
      # background slots are accounted for by reserve_background_thread().
      if request_type not in (BACKGROUND_REQUEST, SHUTDOWN_REQUEST):
        if self._quitting:
          raise CannotAcceptRequests('Instance is shutting down')
        if self._expecting_ready_request and request_type != READY_REQUEST:
          raise CannotAcceptRequests('Instance is waiting for ready request')
        if not self.remaining_request_capacity:
          raise CannotAcceptRequests('Instance has no additional capacity')
        self._num_outstanding_requests += 1

      self._request_data.set_request_instance(request_id, self)
      self._total_requests += 1

    try:
      # Force the generator to complete so the code in the finally block runs
      # at a predictable time.
      return list(self._runtime_proxy.handle(environ,
                                             start_response,
                                             url_map,
                                             match,
                                             request_id,
                                             request_type))
    finally:
      logging.debug('Request handled by %s in %0.4fs',
                    self, time.time() - start_time)
      with self._condition:
        if request_type == READY_REQUEST:
          self._expecting_ready_request = False
        if request_type == BACKGROUND_REQUEST:
          self._num_running_background_threads -= 1
        elif request_type != SHUTDOWN_REQUEST:
          self._num_outstanding_requests -= 1
        self._last_request_end_time = time.time()
        self._trim_request_history_to_60s()
        self._request_history.append((start_time, self._last_request_end_time))
        if request_type == READY_REQUEST:
          # The instance has just become available: wake up to one waiter per
          # request slot.
          self._condition.notify(self._max_concurrent_requests)
        elif request_type == SHUTDOWN_REQUEST:
          self._expecting_shutdown_request = False
          self.quit(allow_async=True)
        elif request_type == NORMAL_REQUEST:
          # One slot was freed, so one waiter can proceed.
          self._condition.notify()
        if (not self._num_outstanding_requests and
            not self._num_running_background_threads):
          # Complete a quit that was deferred by quit(allow_async=True).
          if self._quitting:
            self.quit()

  def wait(self, timeout_time):
    """Wait for this instance to have capacity to serve a request.

    Args:
      timeout_time: A float containing a time in seconds since the epoch to wait
          until before timing out.

    Returns:
      True if the instance has request capacity or False if the timeout time was
      reached or the instance has been quit.
    """
    with self._condition:
      while (time.time() < timeout_time and not
             (self.remaining_request_capacity and self.can_accept_requests)
             and not self.has_quit):
        self._condition.wait(timeout_time - time.time())
      return bool(self.remaining_request_capacity and self.can_accept_requests)

  def set_health(self, health):
    """Records whether the instance should be considered healthy.

    Args:
      health: A truthy value if the instance is healthy, falsy otherwise.
    """
    self._healthy = health

  @property
  def healthy(self):
    """The value most recently passed to set_health() (True initially)."""
    return self._healthy
class InstanceFactory(object):
  """An abstract factory that creates instances for an InstancePool.

  Attributes:
    max_concurrent_requests: The maximum number of concurrent requests that
        Instances created by this factory can handle. If the Instances do not
        support concurrent requests then the value should be 1.
    START_URL_MAP: An appinfo.URLMap that should be used as the default
        /_ah/start handler if no user-specified script handler matches.
    WARMUP_URL_MAP: An appinfo.URLMap that should be used as the default
        /_ah/warmup handler if no user-specified script handler matches.
  """

  START_URL_MAP = None
  WARMUP_URL_MAP = None
  # If True then the runtime supports interactive command evaluation e.g. for
  # use in interactive shells.
  SUPPORTS_INTERACTIVE_REQUESTS = False
  # Controls how instances are restarted when a file relevant to the application
  # is changed. Possible values: NEVER, AFTER_FIRST_REQUEST, ALWAYS.
  FILE_CHANGE_INSTANCE_RESTART_POLICY = None

  def __init__(self, request_data, max_concurrent_requests,
               max_background_threads=0):
    """Initializer for InstanceFactory.

    Args:
      request_data: A wsgi_request_info.WSGIRequestInfo instance that will be
          populated with Instance data for use by the API stubs.
      max_concurrent_requests: The maximum number of concurrent requests that
          Instances created by this factory can handle. If the Instances do not
          support concurrent requests then the value should be 1.
      max_background_threads: The maximum number of background threads that
          the instance can handle. If the instance does not support background
          threads then the value should be 0.
    """
    self.request_data = request_data
    self.max_concurrent_requests = max_concurrent_requests
    self.max_background_threads = max_background_threads

  def get_restart_directories(self):
    """Returns a list of directories changes in which should trigger a restart.

    Returns:
      A list of directory paths. Changes (i.e. files added, deleted or modified)
      in these directories will trigger the restart of all instances created
      with this factory. This base implementation returns an empty list.
    """
    return []

  def files_changed(self):
    """Called when a file relevant to the factory *might* have changed.

    This base implementation does nothing.
    """

  def configuration_changed(self, config_changes):
    """Called when the configuration of the module has changed.

    This base implementation does nothing.

    Args:
      config_changes: A set containing the changes that occurred. See the
          *_CHANGED constants in the application_configuration module.
    """

  def new_instance(self, instance_id, expect_ready_request=False):
    """Create and return a new Instance.

    Args:
      instance_id: A string or integer representing the unique (per module) id
          of the instance.
      expect_ready_request: If True then the instance will be sent a special
          request (i.e. /_ah/warmup or /_ah/start) before it can handle external
          requests.

    Returns:
      The newly created instance.Instance.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError()