3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """Manage the lifecycle of runtime processes and dispatch requests to them."""
35 import wsgiref
.headers
37 from concurrent
import futures
39 from google
.appengine
.api
import api_base_pb
40 from google
.appengine
.api
import apiproxy_stub_map
41 from google
.appengine
.api
import appinfo
42 from google
.appengine
.api
import request_info
43 from google
.appengine
.api
.logservice
import log_service_pb
44 from google
.appengine
.tools
.devappserver2
import application_configuration
45 from google
.appengine
.tools
.devappserver2
import blob_image
46 from google
.appengine
.tools
.devappserver2
import blob_upload
47 from google
.appengine
.tools
.devappserver2
import channel
48 from google
.appengine
.tools
.devappserver2
import constants
49 from google
.appengine
.tools
.devappserver2
import endpoints
50 from google
.appengine
.tools
.devappserver2
import errors
51 from google
.appengine
.tools
.devappserver2
import file_watcher
52 from google
.appengine
.tools
.devappserver2
import gcs_server
53 from google
.appengine
.tools
.devappserver2
import go_runtime
54 from google
.appengine
.tools
.devappserver2
import health_check_service
55 from google
.appengine
.tools
.devappserver2
import http_runtime_constants
56 from google
.appengine
.tools
.devappserver2
import instance
58 from google
.appengine
.tools
.devappserver2
import java_runtime
61 from google
.appengine
.tools
.devappserver2
import login
62 from google
.appengine
.tools
.devappserver2
import php_runtime
63 from google
.appengine
.tools
.devappserver2
import python_runtime
64 from google
.appengine
.tools
.devappserver2
import request_rewriter
65 from google
.appengine
.tools
.devappserver2
import runtime_config_pb2
66 from google
.appengine
.tools
.devappserver2
import start_response_utils
67 from google
.appengine
.tools
.devappserver2
import static_files_handler
68 from google
.appengine
.tools
.devappserver2
import thread_executor
69 from google
.appengine
.tools
.devappserver2
import url_handler
70 from google
.appengine
.tools
.devappserver2
import util
71 from google
.appengine
.tools
.devappserver2
import vm_runtime_factory
72 from google
.appengine
.tools
.devappserver2
import wsgi_handler
73 from google
.appengine
.tools
.devappserver2
import wsgi_server
76 _LOWER_HEX_DIGITS
= string
.hexdigits
.lower()
77 _UPPER_HEX_DIGITS
= string
.hexdigits
.upper()
78 _REQUEST_ID_HASH_LENGTH
= 8
80 _THREAD_POOL
= thread_executor
.ThreadExecutor()
81 _RESTART_INSTANCES_CONFIG_CHANGES
= frozenset(
82 [application_configuration
.NORMALIZED_LIBRARIES_CHANGED
,
83 application_configuration
.SKIP_FILES_CHANGED
,
84 application_configuration
.NOBUILD_FILES_CHANGED
,
85 # The server must be restarted when the handlers change because files
86 # appearing in static content handlers make them unavailable to the
88 application_configuration
.HANDLERS_CHANGED
,
89 application_configuration
.ENV_VARIABLES_CHANGED
])
91 _REQUEST_LOGGING_BLACKLIST_RE
= re
.compile(
92 r
'^/_ah/(?:channel/(?:dev|jsapi)|img|login|upload)')
94 # Fake arguments for _handle_script_request for request types that don't use
95 # user-specified handlers.
96 _EMPTY_MATCH
= re
.match('', '')
97 _DUMMY_URLMAP
= appinfo
.URLMap(script
='/')
98 _SHUTDOWN_TIMEOUT
= 30
100 _MAX_UPLOAD_MEGABYTES
= 32
101 _MAX_UPLOAD_BYTES
= _MAX_UPLOAD_MEGABYTES
* 1024 * 1024
102 _MAX_UPLOAD_NO_TRIGGER_BAD_CLIENT_BYTES
= 64 * 1024 * 1024
104 _REDIRECT_HTML
= '''\
105 <HTML><HEAD><meta http-equiv="content-type" content="%(content-type)s">
106 <TITLE>%(status)d Moved</TITLE></HEAD>
107 <BODY><H1>%(status)d Moved</H1>
108 The document has moved'
109 <A HREF="%(correct-url)s">here</A>.
112 _TIMEOUT_HTML
= '<HTML><BODY>503 - This request has timed out.</BODY></HTML>'
114 # Factor applied to the request timeouts to compensate for the
115 # long vmengines reloads. TODO eventually remove that once we have
116 # optimized the vm_engine reload.
117 _VMENGINE_SLOWDOWN_FACTOR
= 2
120 def _static_files_regex_from_handlers(handlers
):
122 for url_map
in handlers
:
123 handler_type
= url_map
.GetHandlerType()
124 if url_map
.application_readable
:
126 if handler_type
== appinfo
.STATIC_FILES
:
127 patterns
.append(r
'(%s)' % url_map
.upload
)
128 elif handler_type
== appinfo
.STATIC_DIR
:
129 patterns
.append('(%s%s%s)' % (url_map
.static_dir
.rstrip(os
.path
.sep
),
130 re
.escape(os
.path
.sep
), r
'.*'))
131 return r
'^%s$' % '|'.join(patterns
)
class InteractiveCommandError(errors.Error):
  """Error type for interactive commands.

  NOTE(review): presumably raised when an interactive command sent to a module
  fails; confirm against the code that raises it.
  """
class _ScriptHandler(url_handler.UserConfiguredURLHandler):
  """A URL handler that will cause the request to be dispatched to an instance.

  This handler is special in that it does not have a working handle() method
  since the Module's dispatch logic is used to select the appropriate Instance.
  """

  def __init__(self, url_map):
    """Initializer for _ScriptHandler.

    Args:
      url_map: An appinfo.URLMap instance containing the configuration for this
          handler.

    Raises:
      errors.InvalidAppConfigError: if url_map.url is not a valid regular
          expression.
    """
    try:
      # The configured URL is anchored at the end so it must match the whole
      # remainder of the path.
      url_pattern = re.compile('%s$' % url_map.url)
    except re.error as e:
      raise errors.InvalidAppConfigError(
          'invalid url %r in script handler: %s' % (url_map.url, e))

    super(_ScriptHandler, self).__init__(url_map, url_pattern)
    self.url_map = url_map

  def handle(self, match, environ, start_response):
    """This is a dummy method that should never be called."""
    raise NotImplementedError()
166 class Module(object):
167 """The abstract base for all instance pool implementations."""
169 _RUNTIME_INSTANCE_FACTORIES
= {
170 'go': go_runtime
.GoRuntimeInstanceFactory
,
171 'php': php_runtime
.PHPRuntimeInstanceFactory
,
172 'python': python_runtime
.PythonRuntimeInstanceFactory
,
173 'python27': python_runtime
.PythonRuntimeInstanceFactory
,
174 # TODO: uncomment for GA.
175 # 'vm': vm_runtime_factory.VMRuntimeInstanceFactory,
178 _RUNTIME_INSTANCE_FACTORIES
.update({
179 'java': java_runtime
.JavaRuntimeInstanceFactory
,
180 'java7': java_runtime
.JavaRuntimeInstanceFactory
,
183 _MAX_REQUEST_WAIT_TIME
= 10
185 def _get_wait_time(self
):
186 """Gets the wait time before timing out a request.
189 The timeout value in seconds.
191 if self
.vm_enabled():
192 return self
._MAX
_REQUEST
_WAIT
_TIME
* _VMENGINE_SLOWDOWN_FACTOR
193 return self
._MAX
_REQUEST
_WAIT
_TIME
195 def _create_instance_factory(self
,
196 module_configuration
):
197 """Create an instance.InstanceFactory.
200 module_configuration: An application_configuration.ModuleConfiguration
201 instance storing the configuration data for a module.
204 A instance.InstanceFactory subclass that can be used to create instances
205 with the provided configuration.
208 RuntimeError: if the configuration specifies an unknown runtime.
210 # TODO: a bad runtime should be caught before we get here.
211 if module_configuration
.runtime
not in self
._RUNTIME
_INSTANCE
_FACTORIES
:
213 'Unknown runtime %r; supported runtimes are %s.' %
214 (module_configuration
.runtime
,
216 sorted(repr(k
) for k
in self
._RUNTIME
_INSTANCE
_FACTORIES
))))
217 instance_factory
= self
._RUNTIME
_INSTANCE
_FACTORIES
[
218 module_configuration
.runtime
]
219 return instance_factory(
220 request_data
=self
._request
_data
,
221 runtime_config_getter
=self
._get
_runtime
_config
,
222 module_configuration
=module_configuration
)
224 def _create_url_handlers(self
):
225 """Constructs URLHandlers based on the module configuration.
228 A list of url_handler.URLHandlers corresponding that can react as
229 described in the given configuration.
232 # Add special URL handlers (taking precedence over user-defined handlers)
233 url_pattern
= '/%s$' % login
.LOGIN_URL_RELATIVE
234 handlers
.append(wsgi_handler
.WSGIHandler(login
.application
,
236 url_pattern
= '/%s' % blob_upload
.UPLOAD_URL_PATH
237 # The blobstore upload handler forwards successful requests back to self
239 wsgi_handler
.WSGIHandler(blob_upload
.Application(self
), url_pattern
))
241 url_pattern
= '/%s' % blob_image
.BLOBIMAGE_URL_PATTERN
243 wsgi_handler
.WSGIHandler(blob_image
.Application(), url_pattern
))
245 url_pattern
= '/%s' % channel
.CHANNEL_URL_PATTERN
247 wsgi_handler
.WSGIHandler(channel
.application
, url_pattern
))
249 url_pattern
= '/%s' % gcs_server
.GCS_URL_PATTERN
251 wsgi_handler
.WSGIHandler(gcs_server
.Application(), url_pattern
))
253 url_pattern
= '/%s' % endpoints
.API_SERVING_PATTERN
255 wsgi_handler
.WSGIHandler(
256 endpoints
.EndpointsDispatcher(self
._dispatcher
), url_pattern
))
258 found_start_handler
= False
259 found_warmup_handler
= False
260 # Add user-defined URL handlers
261 for url_map
in self
._module
_configuration
.handlers
:
262 handler_type
= url_map
.GetHandlerType()
263 if handler_type
== appinfo
.HANDLER_SCRIPT
:
264 handlers
.append(_ScriptHandler(url_map
))
265 if not found_start_handler
and re
.match('%s$' % url_map
.url
,
267 found_start_handler
= True
268 if not found_warmup_handler
and re
.match('%s$' % url_map
.url
,
270 found_warmup_handler
= True
271 elif handler_type
== appinfo
.STATIC_FILES
:
273 static_files_handler
.StaticFilesHandler(
274 self
._module
_configuration
.application_root
,
276 elif handler_type
== appinfo
.STATIC_DIR
:
278 static_files_handler
.StaticDirHandler(
279 self
._module
_configuration
.application_root
,
282 assert 0, 'unexpected handler %r for %r' % (handler_type
, url_map
)
283 # Add a handler for /_ah/start if no script handler matches.
284 if not found_start_handler
:
285 handlers
.insert(0, _ScriptHandler(self
._instance
_factory
.START_URL_MAP
))
286 # Add a handler for /_ah/warmup if no script handler matches and warmup is
288 if (not found_warmup_handler
and
289 'warmup' in (self
._module
_configuration
.inbound_services
or [])):
290 handlers
.insert(0, _ScriptHandler(self
._instance
_factory
.WARMUP_URL_MAP
))
def _get_runtime_config(self):
  """Returns the configuration for the runtime.

  Returns:
    A runtime_config_pb2.Config instance representing the configuration to be
    passed to an instance. NOTE: This does *not* include the instance_id
    field, which must be populated elsewhere.
  """
  module_config = self._module_configuration

  runtime_config = runtime_config_pb2.Config()
  runtime_config.app_id = module_config.application
  runtime_config.version_id = module_config.version_id
  # An explicit override wins over the module configuration's threadsafe
  # value; a missing configuration value is treated as False.
  if self._threadsafe_override is None:
    runtime_config.threadsafe = module_config.threadsafe or False
  else:
    runtime_config.threadsafe = self._threadsafe_override
  runtime_config.application_root = module_config.application_root
  # NOTE(review): both file filters are taken to be guarded by
  # allow_skipped_files, which the initializer documents as making files in
  # static handlers and "skip_files" readable - confirm the nesting.
  if not self._allow_skipped_files:
    runtime_config.skip_files = str(module_config.skip_files)
    runtime_config.static_files = _static_files_regex_from_handlers(
        module_config.handlers)
  runtime_config.api_host = self._api_host
  runtime_config.api_port = self._api_port
  runtime_config.server_port = self._balanced_port
  runtime_config.stderr_log_level = self._runtime_stderr_loglevel
  runtime_config.datacenter = 'us1'
  runtime_config.auth_domain = self._auth_domain
  if self._max_instances is not None:
    runtime_config.max_instances = self._max_instances

  for library in module_config.normalized_libraries:
    runtime_config.libraries.add(name=library.name, version=library.version)

  for key, value in (module_config.env_variables or {}).items():
    runtime_config.environ.add(key=str(key), value=str(value))

  if self._cloud_sql_config:
    runtime_config.cloud_sql_config.CopyFrom(self._cloud_sql_config)

  # Runtime-specific sections are only copied for the matching runtime.
  if self._php_config and module_config.runtime == 'php':
    runtime_config.php_config.CopyFrom(self._php_config)
  if self._python_config and module_config.runtime.startswith('python'):
    runtime_config.python_config.CopyFrom(self._python_config)
  if self._java_config and module_config.runtime.startswith('java'):
    runtime_config.java_config.CopyFrom(self._java_config)

  if self._vm_config:
    runtime_config.vm_config.CopyFrom(self._vm_config)

  return runtime_config
346 def _maybe_restart_instances(self
, config_changed
, file_changed
):
347 """Restarts instances. May avoid some restarts depending on policy.
349 One of config_changed or file_changed must be True.
352 config_changed: True if the configuration for the application has changed.
353 file_changed: True if any file relevant to the application has changed.
355 if not config_changed
and not file_changed
:
358 logging
.debug('Restarting instances.')
359 policy
= self
._instance
_factory
.FILE_CHANGE_INSTANCE_RESTART_POLICY
360 assert policy
is not None, 'FILE_CHANGE_INSTANCE_RESTART_POLICY not set'
362 with self
._condition
:
363 instances_to_quit
= set()
364 for inst
in self
._instances
:
365 if (config_changed
or
366 (policy
== instance
.ALWAYS
) or
367 (policy
== instance
.AFTER_FIRST_REQUEST
and inst
.total_requests
)):
368 instances_to_quit
.add(inst
)
369 self
._instances
-= instances_to_quit
371 for inst
in instances_to_quit
:
372 inst
.quit(allow_async
=True)
374 def _handle_changes(self
):
375 """Handle file or configuration changes."""
376 # Always check for config and file changes because checking also clears
378 config_changes
= self
._module
_configuration
.check_for_updates()
379 file_changes
= self
._watcher
.changes()
380 if application_configuration
.HANDLERS_CHANGED
in config_changes
:
381 handlers
= self
._create
_url
_handlers
()
382 with self
._handler
_lock
:
383 self
._handlers
= handlers
387 'Detected file changes:\n %s', '\n '.join(sorted(file_changes
)))
388 self
._instance
_factory
.files_changed()
390 if config_changes
& _RESTART_INSTANCES_CONFIG_CHANGES
:
391 self
._instance
_factory
.configuration_changed(config_changes
)
393 self
._maybe
_restart
_instances
(
394 config_changed
=bool(config_changes
& _RESTART_INSTANCES_CONFIG_CHANGES
),
395 file_changed
=bool(file_changes
))
398 module_configuration
,
404 runtime_stderr_loglevel
,
410 default_version_port
,
415 use_mtime_file_watcher
,
418 threadsafe_override
):
419 """Initializer for Module.
421 module_configuration: An application_configuration.ModuleConfiguration
422 instance storing the configuration data for a module.
423 host: A string containing the host that any HTTP servers should bind to
425 balanced_port: An int specifying the port where the balanced module for
426 the pool should listen.
427 api_host: The host that APIModule listens for RPC requests on.
428 api_port: The port that APIModule listens for RPC requests on.
429 auth_domain: A string containing the auth domain to set in the environment
431 runtime_stderr_loglevel: An int reprenting the minimum logging level at
432 which runtime log messages should be written to stderr. See
433 devappserver2.py for possible values.
434 php_config: A runtime_config_pb2.PhpConfig instances containing PHP
435 runtime-specific configuration. If None then defaults are used.
436 python_config: A runtime_config_pb2.PythonConfig instance containing
437 Python runtime-specific configuration. If None then defaults are used.
438 java_config: A runtime_config_pb2.JavaConfig instance containing
439 Java runtime-specific configuration. If None then defaults are used.
440 cloud_sql_config: A runtime_config_pb2.CloudSQL instance containing the
441 required configuration for local Google Cloud SQL development. If None
442 then Cloud SQL will not be available.
443 vm_config: A runtime_config_pb2.VMConfig instance containing
444 VM runtime-specific configuration. If None all docker-related stuff
446 default_version_port: An int containing the port of the default version.
447 port_registry: A dispatcher.PortRegistry used to provide the Dispatcher
448 with a mapping of port to Module and Instance.
449 request_data: A wsgi_request_info.WSGIRequestInfo that will be provided
450 with request information for use by API stubs.
451 dispatcher: A Dispatcher instance that can be used to make HTTP requests.
452 max_instances: The maximum number of instances to create for this module.
453 If None then there is no limit on the number of created instances.
454 use_mtime_file_watcher: A bool containing whether to use mtime polling to
455 monitor file changes even if other options are available on the
457 automatic_restarts: If True then instances will be restarted when a
458 file or configuration change that effects them is detected.
459 allow_skipped_files: If True then all files in the application's directory
460 are readable, even if they appear in a static handler or "skip_files"
462 threadsafe_override: If not None, ignore the YAML file value of threadsafe
463 and use this value instead.
465 self
._module
_configuration
= module_configuration
466 self
._name
= module_configuration
.module_name
467 self
._version
= module_configuration
.major_version
468 self
._app
_name
_external
= module_configuration
.application_external_name
470 self
._api
_host
= api_host
471 self
._api
_port
= api_port
472 self
._auth
_domain
= auth_domain
473 self
._runtime
_stderr
_loglevel
= runtime_stderr_loglevel
474 self
._balanced
_port
= balanced_port
475 self
._php
_config
= php_config
476 self
._python
_config
= python_config
477 self
._java
_config
= java_config
478 self
._cloud
_sql
_config
= cloud_sql_config
479 self
._vm
_config
= vm_config
480 self
._request
_data
= request_data
481 self
._allow
_skipped
_files
= allow_skipped_files
482 self
._threadsafe
_override
= threadsafe_override
483 self
._dispatcher
= dispatcher
484 self
._max
_instances
= max_instances
485 self
._automatic
_restarts
= automatic_restarts
486 self
._use
_mtime
_file
_watcher
= use_mtime_file_watcher
487 self
._default
_version
_port
= default_version_port
488 self
._port
_registry
= port_registry
490 if self
.vm_enabled():
491 self
._RUNTIME
_INSTANCE
_FACTORIES
['vm'] = (
492 vm_runtime_factory
.VMRuntimeInstanceFactory
)
494 self
._instance
_factory
= self
._create
_instance
_factory
(
495 self
._module
_configuration
)
496 if self
._automatic
_restarts
:
497 self
._watcher
= file_watcher
.get_file_watcher(
498 [self
._module
_configuration
.application_root
] +
499 self
._instance
_factory
.get_restart_directories(),
500 self
._use
_mtime
_file
_watcher
)
503 self
._handler
_lock
= threading
.Lock()
504 self
._handlers
= self
._create
_url
_handlers
()
505 self
._balanced
_module
= wsgi_server
.WsgiServer(
506 (self
._host
, self
._balanced
_port
), self
)
507 self
._quit
_event
= threading
.Event() # Set when quit() has been called.
def vm_enabled(self):
  """Returns the module's VM configuration, used as a "VM enabled" flag.

  Returns:
    self._vm_config as given to the initializer. Callers in this class test
    the result for truthiness rather than comparing it to True or False.
  """
  # TODO: change when GA
  return self._vm_config
515 """The name of the module, as defined in app.yaml.
517 This value will be constant for the lifetime of the module even in the
518 module configuration changes.
524 """The version of the module, as defined in app.yaml.
526 This value will be constant for the lifetime of the module even in the
527 module configuration changes.
def app_name_external(self):
  """The external application name of the module, as defined in app.yaml.

  This value will be constant for the lifetime of the module even if the
  module configuration changes.
  """
  # Captured once in the initializer from the module configuration.
  return self._app_name_external
542 """The module is ready to handle HTTP requests."""
543 return self
._balanced
_module
.ready
def balanced_port(self):
  """The port that the balanced HTTP server for the Module is listening on.

  The port is read from the running balanced server, so this asserts that the
  server is ready before reading it.
  """
  assert self._balanced_module.ready, 'balanced module not running'
  return self._balanced_module.port
553 """The host that the HTTP server(s) for this Module is listening on."""
def balanced_address(self):
  """The address of the balanced HTTP server e.g. "localhost:8080".

  Returns:
    '<host>:<port>', or just '<host>' when the balanced port is 80.
  """
  if self.balanced_port != 80:
    return '%s:%s' % (self.host, self.balanced_port)
  else:
    # Port 80 is the default HTTP port, so it is left out of the address.
    return self.host
def max_instance_concurrent_requests(self):
  """The number of concurrent requests that each Instance can handle.

  The value is read from the instance factory created for this module.
  """
  return self._instance_factory.max_concurrent_requests
def module_configuration(self):
  """The application_configuration.ModuleConfiguration for this module.

  This is the same object that was passed to the initializer.
  """
  return self._module_configuration
576 """Runtime property for this module."""
577 return self
._module
_configuration
.runtime
def effective_runtime(self):
  """The effective_runtime value of this module's configuration."""
  return self._module_configuration.effective_runtime
def supports_interactive_commands(self):
  """True if the module can evaluate arbitrary code and return the result.

  The answer comes from the instance factory's SUPPORTS_INTERACTIVE_REQUESTS
  attribute.
  """
  return self._instance_factory.SUPPORTS_INTERACTIVE_REQUESTS
589 def _handle_script_request(self
,
595 """Handles a HTTP request that has matched a script handler.
598 environ: An environ dict for the request as defined in PEP-333.
599 start_response: A function with semantics defined in PEP-333.
600 url_map: An appinfo.URLMap instance containing the configuration for the
601 handler that matched.
602 match: A re.MatchObject containing the result of the matched URL pattern.
603 inst: The Instance to send the request to. If None then an appropriate
604 Instance will be chosen.
607 An iterable over strings containing the body of the HTTP response.
609 raise NotImplementedError()
611 def _no_handler_for_request(self
, environ
, start_response
, request_id
):
612 """Handle a HTTP request that does not match any user-defined handlers."""
613 self
._insert
_log
_message
('No handlers matched this URL.', 2, request_id
)
614 start_response('404 Not Found', [('Content-Type', 'text/plain')])
615 return ['The url "%s" does not match any handlers.' % environ
['PATH_INFO']]
617 def _error_response(self
, environ
, start_response
, status
, body
=None):
620 '%d %s' % (status
, httplib
.responses
[status
]),
621 [('Content-Type', 'text/html'),
622 ('Content-Length', str(len(body
)))])
624 start_response('%d %s' % (status
, httplib
.responses
[status
]), [])
627 def _handle_request(self
, environ
, start_response
, inst
=None,
628 request_type
=instance
.NORMAL_REQUEST
):
629 """Handles a HTTP request.
632 environ: An environ dict for the request as defined in PEP-333.
633 start_response: A function with semantics defined in PEP-333.
634 inst: The Instance to send the request to. If None then an appropriate
635 Instance will be chosen. Setting inst is not meaningful if the
636 request does not match a "script" handler.
637 request_type: The type of the request. See instance.*_REQUEST module
641 An iterable over strings containing the body of the HTTP response.
645 environ
['SERVER_PORT'] = str(self
.get_instance_port(inst
.instance_id
))
646 except request_info
.NotSupportedWithAutoScalingError
:
647 environ
['SERVER_PORT'] = str(self
.balanced_port
)
649 environ
['SERVER_PORT'] = str(self
.balanced_port
)
650 if 'HTTP_HOST' in environ
:
651 environ
['SERVER_NAME'] = environ
['HTTP_HOST'].split(':', 1)[0]
652 environ
['DEFAULT_VERSION_HOSTNAME'] = '%s:%s' % (
653 environ
['SERVER_NAME'], self
._default
_version
_port
)
654 with self
._request
_data
.request(
656 self
._module
_configuration
) as request_id
:
657 should_log_request
= not _REQUEST_LOGGING_BLACKLIST_RE
.match(
658 environ
['PATH_INFO'])
659 environ
['REQUEST_ID_HASH'] = self
.generate_request_id_hash()
660 if should_log_request
:
661 environ
['REQUEST_LOG_ID'] = self
.generate_request_log_id()
662 if 'HTTP_HOST' in environ
:
663 hostname
= environ
['HTTP_HOST']
664 elif environ
['SERVER_PORT'] == '80':
665 hostname
= environ
['SERVER_NAME']
667 hostname
= '%s:%s' % (environ
['SERVER_NAME'], environ
['SERVER_PORT'])
669 if environ
.get('QUERY_STRING'):
670 resource
= '%s?%s' % (urllib
.quote(environ
['PATH_INFO']),
671 environ
['QUERY_STRING'])
673 resource
= urllib
.quote(environ
['PATH_INFO'])
674 email
, _
, _
= login
.get_user_info(environ
.get('HTTP_COOKIE', ''))
675 method
= environ
.get('REQUEST_METHOD', 'GET')
676 http_version
= environ
.get('SERVER_PROTOCOL', 'HTTP/1.0')
678 logservice
= apiproxy_stub_map
.apiproxy
.GetStub('logservice')
679 logservice
.start_request(
680 request_id
=request_id
,
681 user_request_id
=environ
['REQUEST_LOG_ID'],
682 ip
=environ
.get('REMOTE_ADDR', ''),
683 app_id
=self
._module
_configuration
.application
,
684 version_id
=self
._module
_configuration
.major_version
,
685 nickname
=email
.split('@', 1)[0],
686 user_agent
=environ
.get('HTTP_USER_AGENT', ''),
690 http_version
=http_version
,
691 module
=self
._module
_configuration
.module_name
)
693 def wrapped_start_response(status
, response_headers
, exc_info
=None):
694 response_headers
.append(('Server',
695 http_runtime_constants
.SERVER_SOFTWARE
))
696 if should_log_request
:
697 headers
= wsgiref
.headers
.Headers(response_headers
)
698 status_code
= int(status
.split(' ', 1)[0])
699 content_length
= int(headers
.get('Content-Length', 0))
700 logservice
.end_request(request_id
, status_code
, content_length
)
701 logging
.info('%(module_name)s: '
702 '"%(method)s %(resource)s %(http_version)s" '
703 '%(status)d %(content_length)s',
704 {'module_name': self
.name
,
706 'resource': resource
,
707 'http_version': http_version
,
708 'status': status_code
,
709 'content_length': content_length
or '-'})
710 return start_response(status
, response_headers
, exc_info
)
712 content_length
= int(environ
.get('CONTENT_LENGTH', '0'))
714 if (environ
['REQUEST_METHOD'] in ('GET', 'HEAD', 'DELETE', 'TRACE') and
715 content_length
!= 0):
716 # CONTENT_LENGTH may be empty or absent.
717 wrapped_start_response('400 Bad Request', [])
718 return ['"%s" requests may not contain bodies.' %
719 environ
['REQUEST_METHOD']]
721 # Do not apply request limits to internal _ah handlers (known to break
723 # TODO: research if _ah handlers need limits.
724 if (not environ
.get('REQUEST_URI', '/').startswith('/_ah/') and
725 content_length
> _MAX_UPLOAD_BYTES
):
726 # As allowed by the RFC, cherrypy closes the connection for 413 errors.
727 # Most clients do not handle this correctly and treat the page as
728 # unavailable if the connection is closed before the client can send
729 # all the data. To match the behavior of production, for large files
730 # < 64M read the data to prevent the client bug from being triggered.
738 if content_length
<= _MAX_UPLOAD_NO_TRIGGER_BAD_CLIENT_BYTES
:
739 environ
['wsgi.input'].read(content_length
)
740 status
= '%d %s' % (httplib
.REQUEST_ENTITY_TOO_LARGE
,
741 httplib
.responses
[httplib
.REQUEST_ENTITY_TOO_LARGE
])
742 wrapped_start_response(status
, [])
743 return ['Upload limited to %d megabytes.' % _MAX_UPLOAD_MEGABYTES
]
745 with self
._handler
_lock
:
746 handlers
= self
._handlers
749 path_info
= environ
['PATH_INFO']
750 path_info_normal
= self
._normpath
(path_info
)
751 if path_info_normal
!= path_info
:
752 # While a 301 Moved Permanently makes more sense for non-normal
753 # paths, prod issues a 302 so we do the same.
754 return self
._redirect
_302_path
_info
(path_info_normal
,
756 wrapped_start_response
)
757 if request_type
in (instance
.BACKGROUND_REQUEST
,
758 instance
.INTERACTIVE_REQUEST
,
759 instance
.SHUTDOWN_REQUEST
):
760 app
= functools
.partial(self
._handle
_script
_request
,
761 url_map
=_DUMMY_URLMAP
,
763 request_id
=request_id
,
765 request_type
=request_type
)
766 return request_rewriter
.frontend_rewriter_middleware(app
)(
767 environ
, wrapped_start_response
)
768 for handler
in handlers
:
769 match
= handler
.match(path_info
)
771 auth_failure
= handler
.handle_authorization(environ
,
772 wrapped_start_response
)
773 if auth_failure
is not None:
776 if isinstance(handler
, _ScriptHandler
):
777 app
= functools
.partial(self
._handle
_script
_request
,
778 url_map
=handler
.url_map
,
780 request_id
=request_id
,
782 request_type
=request_type
)
783 return request_rewriter
.frontend_rewriter_middleware(app
)(
784 environ
, wrapped_start_response
)
786 return handler
.handle(match
, environ
, wrapped_start_response
)
787 return self
._no
_handler
_for
_request
(environ
, wrapped_start_response
,
789 except StandardError, e
:
790 logging
.exception('Request to %r failed', path_info
)
791 wrapped_start_response('500 Internal Server Error', [], e
)
def _async_shutdown_instance(self, inst, port):
  """Schedules _shutdown_instance(inst, port) on the module-level thread pool.

  Args:
    inst: The instance to shut down.
    port: The port passed through to _shutdown_instance.

  Returns:
    Whatever _THREAD_POOL.submit returns for the scheduled call.
  """
  return _THREAD_POOL.submit(self._shutdown_instance, inst, port)
def _shutdown_instance(self, inst, port):
  """Sends a shutdown request to an instance, then forces it to quit.

  A GET for /_ah/stop is dispatched to the instance. After that, whether or
  not the request succeeded, this waits until the module's quit event is set
  or _SHUTDOWN_TIMEOUT seconds have passed since entry, and then force-quits
  the instance.

  Args:
    inst: The instance to shut down.
    port: The port used to build the shutdown request's environ.
  """
  force_shutdown_time = time.time() + _SHUTDOWN_TIMEOUT
  try:
    environ = self.build_request_environ(
        'GET', '/_ah/stop', [], '', '0.1.0.3', port, fake_login=True)
    self._handle_request(environ,
                         start_response_utils.null_start_response,
                         inst=inst,
                         request_type=instance.SHUTDOWN_REQUEST)
    logging.debug('Sent shutdown request: %s', inst)
  except Exception:
    # Best effort: a failed shutdown request must not prevent the forced
    # quit below.
    logging.exception('Internal error while handling shutdown request.')
  finally:
    # The deadline is measured from entry, so time spent handling the
    # request counts against it.
    time_to_wait = force_shutdown_time - time.time()
    self._quit_event.wait(time_to_wait)
    inst.quit(force=True)
815 def _quote_querystring(qs
):
816 """Quote a query string to protect against XSS."""
818 parsed_qs
= urlparse
.parse_qs(qs
, keep_blank_values
=True)
819 # urlparse.parse returns a dictionary with values as lists while
820 # urllib.urlencode does not handle those. Expand to a list of
823 for key
, multivalue
in parsed_qs
.items():
824 for value
in multivalue
:
825 expanded_qs
.append((key
, value
))
826 return urllib
.urlencode(expanded_qs
)
828 def _redirect_302_path_info(self
, updated_path_info
, environ
, start_response
):
829 """Redirect to an updated path.
831 Respond to the current request with a 302 Found status with an updated path
832 but preserving the rest of the request.
835 - WSGI does not make the fragment available so we are not able to preserve
836 it. Luckily prod does not preserve the fragment so it works out.
839 updated_path_info: the new HTTP path to redirect to.
840 environ: WSGI environ object.
841 start_response: WSGI start response callable.
844 WSGI-compatible iterable object representing the body of the response.
846 correct_url
= urlparse
.urlunsplit(
847 (environ
['wsgi.url_scheme'],
848 environ
['HTTP_HOST'],
849 urllib
.quote(updated_path_info
),
850 self
._quote
_querystring
(environ
['QUERY_STRING']),
853 content_type
= 'text/html; charset=utf-8'
854 output
= _REDIRECT_HTML
% {
855 'content-type': content_type
,
856 'status': httplib
.FOUND
,
857 'correct-url': correct_url
860 start_response('%d %s' % (httplib
.FOUND
, httplib
.responses
[httplib
.FOUND
]),
861 [('Content-Type', content_type
),
862 ('Location', correct_url
),
863 ('Content-Length', str(len(output
)))])
868 """Normalize the path by handling . and .. directory entries.
870 Normalizes the path. A directory entry of . is just dropped while a
871 directory entry of .. removes the previous entry. Note that unlike
872 os.path.normpath, redundant separators remain in place to match prod.
878 A normalized HTTP path.
880 normalized_path_entries
= []
881 for entry
in path
.split('/'):
883 if normalized_path_entries
:
884 normalized_path_entries
.pop()
886 normalized_path_entries
.append(entry
)
887 return '/'.join(normalized_path_entries
)
def _insert_log_message(self, message, level, request_id):
  """Flushes a single log line for the given request to the logservice stub.

  Args:
    message: The text of the log line.
    level: The level to record on the log line.
    request_id: The id of the request the log line belongs to.
  """
  # Build a log group holding exactly one line, stamped with the current time
  # in microseconds.
  group = log_service_pb.UserAppLogGroup()
  line = group.add_log_line()
  line.set_timestamp_usec(int(time.time() * 1e6))
  line.set_level(level)
  line.set_message(message)

  # Wrap the encoded group in a flush request and hand it to the stub.
  flush_request = log_service_pb.FlushRequest()
  flush_request.set_logs(group.Encode())
  flush_response = api_base_pb.VoidProto()
  stub = apiproxy_stub_map.apiproxy.GetStub('logservice')
  stub._Dynamic_Flush(flush_request, flush_response, request_id)
def generate_request_log_id():
    """Build a random value to use as a REQUEST_LOG_ID.

    Returns:
      A string of characters drawn at random from _LOWER_HEX_DIGITS. The
      length is itself random, in the range [30, 100), to emulate the
      production values, which encapsulate the application id, version and
      some log state.
    """
    length = random.randrange(30, 100)
    digits = [random.choice(_LOWER_HEX_DIGITS) for _ in range(length)]
    return ''.join(digits)
def generate_request_id_hash():
    """Build a random value to use as a REQUEST_ID_HASH.

    Returns:
      A string of _REQUEST_ID_HASH_LENGTH characters, each drawn at random
      from _UPPER_HEX_DIGITS.
    """
    picked = []
    for _ in range(_REQUEST_ID_HASH_LENGTH):
        picked.append(random.choice(_UPPER_HEX_DIGITS))
    return ''.join(picked)
def set_num_instances(self, instances):
    """Request a new number of instances for this module.

    This implementation never changes anything; it only raises.

    Args:
      instances: An int containing the number of instances to run.

    Raises:
      request_info.NotSupportedWithAutoScalingError: Always.
    """
    not_supported = request_info.NotSupportedWithAutoScalingError
    raise not_supported()
def get_num_instances(self):
    """Report the number of instances for this module to run.

    Raises:
      request_info.NotSupportedWithAutoScalingError: Always.
    """
    not_supported = request_info.NotSupportedWithAutoScalingError
    raise not_supported()
934 """Stops the module from serving requests."""
935 raise request_info
.NotSupportedWithAutoScalingError()
938 """Restarts the module."""
939 raise request_info
.NotSupportedWithAutoScalingError()
def get_instance_address(self, instance_id):
    """Build the address of the HTTP server for an instance.

    Args:
      instance_id: the id passed on to self.get_instance_port.

    Returns:
      A string of the form "<self.host>:<port of the instance>".
    """
    host = self.host
    port = self.get_instance_port(instance_id)
    return '%s:%s' % (host, port)
def get_instance_port(self, instance_id):
    """Look up the port of the HTTP server for an instance.

    Args:
      instance_id: the id of the instance; unused by this implementation.

    Raises:
      request_info.NotSupportedWithAutoScalingError: Always.
    """
    not_supported = request_info.NotSupportedWithAutoScalingError
    raise not_supported()
def get_instance(self, instance_id):
    """Look up the instance with the provided instance ID.

    Args:
      instance_id: the id of the instance; unused by this implementation.

    Raises:
      request_info.NotSupportedWithAutoScalingError: Always.
    """
    not_supported = request_info.NotSupportedWithAutoScalingError
    raise not_supported()
954 def supports_individually_addressable_instances(self
):
957 def create_interactive_command_module(self
):
958 """Returns an InteractiveCommandModule that can be sent user commands."""
959 if self
._instance
_factory
.SUPPORTS_INTERACTIVE_REQUESTS
:
960 return InteractiveCommandModule(self
._module
_configuration
,
966 self
._runtime
_stderr
_loglevel
,
970 self
._cloud
_sql
_config
,
972 self
._default
_version
_port
,
976 self
._use
_mtime
_file
_watcher
,
977 self
._allow
_skipped
_files
,
978 self
._threadsafe
_override
)
980 raise NotImplementedError('runtime does not support interactive commands')
982 def build_request_environ(self
, method
, relative_url
, headers
, body
,
983 source_ip
, port
, fake_login
=False):
984 if isinstance(body
, unicode):
985 body
= body
.encode('ascii')
987 url
= urlparse
.urlsplit(relative_url
)
989 host
= '%s:%s' % (self
.host
, port
)
992 environ
= {constants
.FAKE_IS_ADMIN_HEADER
: '1',
993 'CONTENT_LENGTH': str(len(body
)),
994 'PATH_INFO': url
.path
,
995 'QUERY_STRING': url
.query
,
996 'REQUEST_METHOD': method
,
997 'REMOTE_ADDR': source_ip
,
998 'SERVER_NAME': self
.host
,
999 'SERVER_PORT': str(port
),
1000 'SERVER_PROTOCOL': 'HTTP/1.1',
1001 'wsgi.version': (1, 0),
1002 'wsgi.url_scheme': 'http',
1003 'wsgi.errors': cStringIO
.StringIO(),
1004 'wsgi.multithread': True,
1005 'wsgi.multiprocess': True,
1006 'wsgi.input': cStringIO
.StringIO(body
)}
1008 environ
[constants
.FAKE_LOGGED_IN_HEADER
] = '1'
1009 util
.put_headers_in_environ(headers
, environ
)
1010 environ
['HTTP_HOST'] = host
1014 class AutoScalingModule(Module
):
1015 """A pool of instances that is autoscaled based on traffic."""
1017 # The minimum number of seconds to wait, after quitting an idle instance,
1018 # before quitting another idle instance.
1019 _MIN_SECONDS_BETWEEN_QUITS
= 60
1020 # The time horizon to use when calculating the number of instances required
1021 # to serve the current level of traffic.
1022 _REQUIRED_INSTANCE_WINDOW_SECONDS
= 60
1024 _DEFAULT_AUTOMATIC_SCALING
= appinfo
.AutomaticScaling(
1025 min_pending_latency
='0.1s',
1026 max_pending_latency
='0.5s',
1027 min_idle_instances
=1,
1028 max_idle_instances
=1000)
1031 def _parse_pending_latency(timing
):
1032 """Parse a pending latency string into a float of the value in seconds.
1035 timing: A str of the form 1.0s or 1000ms.
1038 A float representation of the value in seconds.
1040 if timing
.endswith('ms'):
1041 return float(timing
[:-2]) / 1000
1043 return float(timing
[:-1])
1046 def _populate_default_automatic_scaling(cls
, automatic_scaling
):
1047 for attribute
in automatic_scaling
.ATTRIBUTES
:
1048 if getattr(automatic_scaling
, attribute
) in ('automatic', None):
1049 setattr(automatic_scaling
, attribute
,
1050 getattr(cls
._DEFAULT
_AUTOMATIC
_SCALING
, attribute
))
1052 def _process_automatic_scaling(self
, automatic_scaling
):
1053 if automatic_scaling
:
1054 self
._populate
_default
_automatic
_scaling
(automatic_scaling
)
1056 automatic_scaling
= self
._DEFAULT
_AUTOMATIC
_SCALING
1057 self
._min
_pending
_latency
= self
._parse
_pending
_latency
(
1058 automatic_scaling
.min_pending_latency
)
1059 self
._max
_pending
_latency
= self
._parse
_pending
_latency
(
1060 automatic_scaling
.max_pending_latency
)
1061 self
._min
_idle
_instances
= int(automatic_scaling
.min_idle_instances
)
1062 self
._max
_idle
_instances
= int(automatic_scaling
.max_idle_instances
)
1065 module_configuration
,
1071 runtime_stderr_loglevel
,
1077 default_version_port
,
1082 use_mtime_file_watcher
,
1084 allow_skipped_files
,
1085 threadsafe_override
):
1086 """Initializer for AutoScalingModule.
1089 module_configuration: An application_configuration.ModuleConfiguration
1090 instance storing the configuration data for a module.
1091 host: A string containing the host that any HTTP servers should bind to
1093 balanced_port: An int specifying the port where the balanced module for
1094 the pool should listen.
1095 api_host: The host that APIServer listens for RPC requests on.
1096 api_port: The port that APIServer listens for RPC requests on.
1097 auth_domain: A string containing the auth domain to set in the environment
1099 runtime_stderr_loglevel: An int representing the minimum logging level at
1100 which runtime log messages should be written to stderr. See
1101 devappserver2.py for possible values.
1102 php_config: A runtime_config_pb2.PhpConfig instance containing PHP
1103 runtime-specific configuration. If None then defaults are used.
1104 python_config: A runtime_config_pb2.PythonConfig instance containing
1105 Python runtime-specific configuration. If None then defaults are used.
1106 java_config: A runtime_config_pb2.JavaConfig instance containing
1107 Java runtime-specific configuration. If None then defaults are used.
1108 cloud_sql_config: A runtime_config_pb2.CloudSQL instance containing the
1109 required configuration for local Google Cloud SQL development. If None
1110 then Cloud SQL will not be available.
1111 unused_vm_config: A runtime_config_pb2.VMConfig instance containing
1112 VM runtime-specific configuration. Ignored by AutoScalingModule as
1113 autoscaling is not yet supported by VM runtimes.
1114 default_version_port: An int containing the port of the default version.
1115 port_registry: A dispatcher.PortRegistry used to provide the Dispatcher
1116 with a mapping of port to Module and Instance.
1117 request_data: A wsgi_request_info.WSGIRequestInfo that will be provided
1118 with request information for use by API stubs.
1119 dispatcher: A Dispatcher instance that can be used to make HTTP requests.
1120 max_instances: The maximum number of instances to create for this module.
1121 If None then there is no limit on the number of created instances.
1122 use_mtime_file_watcher: A bool containing whether to use mtime polling to
1123 monitor file changes even if other options are available on the
1125 automatic_restarts: If True then instances will be restarted when a
1126 file or configuration change that affects them is detected.
1127 allow_skipped_files: If True then all files in the application's directory
1128 are readable, even if they appear in a static handler or "skip_files"
1130 threadsafe_override: If not None, ignore the YAML file value of threadsafe
1131 and use this value instead.
1133 super(AutoScalingModule
, self
).__init
__(module_configuration
,
1139 runtime_stderr_loglevel
,
1144 # VM runtimes do not support
1147 default_version_port
,
1152 use_mtime_file_watcher
,
1154 allow_skipped_files
,
1155 threadsafe_override
)
1157 self
._process
_automatic
_scaling
(
1158 self
._module
_configuration
.automatic_scaling
)
1160 self
._instances
= set() # Protected by self._condition.
1161 # A deque containing (time, num_outstanding_instance_requests) 2-tuples.
1162 # This is used to track the maximum number of outstanding requests in a time
1163 # period. Protected by self._condition.
1164 self
._outstanding
_request
_history
= collections
.deque()
1165 self
._num
_outstanding
_instance
_requests
= 0 # Protected by self._condition.
1166 # The time when the last instance was quit in seconds since the epoch.
1167 self
._last
_instance
_quit
_time
= 0 # Protected by self._condition.
1169 self
._condition
= threading
.Condition() # Protects instance state.
1170 self
._instance
_adjustment
_thread
= threading
.Thread(
1171 target
=self
._loop
_adjusting
_instances
)
1174 """Start background management of the Module."""
1175 self
._balanced
_module
.start()
1176 self
._port
_registry
.add(self
.balanced_port
, self
, None)
1178 self
._watcher
.start()
1179 self
._instance
_adjustment
_thread
.start()
1182 """Stops the Module."""
1183 self
._quit
_event
.set()
1184 self
._instance
_adjustment
_thread
.join()
1185 # The instance adjustment thread depends on the balanced module and the
1186 # watcher so wait for it to exit before quitting them.
1188 self
._watcher
.quit()
1189 self
._balanced
_module
.quit()
1190 with self
._condition
:
1191 instances
= self
._instances
1192 self
._instances
= set()
1193 self
._condition
.notify_all()
1194 for inst
in instances
:
1195 inst
.quit(force
=True)
def instances(self):
    """Take a snapshot of the instances currently in the Module.

    Returns:
      A new set with the members of self._instances, copied while holding
      self._condition.
    """
    with self._condition:
        snapshot = set(self._instances)
    return snapshot
def num_outstanding_instance_requests(self):
    """Read the number of requests that instances are currently handling.

    Returns:
      The value of self._num_outstanding_instance_requests, read while
      holding self._condition.
    """
    with self._condition:
        outstanding = self._num_outstanding_instance_requests
    return outstanding
1209 def _handle_instance_request(self
,
1217 """Handles a request routed to a particular Instance.
1220 environ: An environ dict for the request as defined in PEP-333.
1221 start_response: A function with semantics defined in PEP-333.
1222 url_map: An appinfo.URLMap instance containing the configuration for the
1223 handler that matched.
1224 match: A re.MatchObject containing the result of the matched URL pattern.
1225 request_id: A unique string id associated with the request.
1226 inst: The instance.Instance to send the request to.
1227 request_type: The type of the request. See instance.*_REQUEST module
1231 An iterable over strings containing the body of the HTTP response.
1233 if request_type
!= instance
.READY_REQUEST
:
1234 with self
._condition
:
1235 self
._num
_outstanding
_instance
_requests
+= 1
1236 self
._outstanding
_request
_history
.append(
1237 (time
.time(), self
.num_outstanding_instance_requests
))
1239 logging
.debug('Dispatching request to %s', inst
)
1240 return inst
.handle(environ
, start_response
, url_map
, match
, request_id
,
1243 with self
._condition
:
1244 if request_type
!= instance
.READY_REQUEST
:
1245 self
._num
_outstanding
_instance
_requests
-= 1
1246 self
._condition
.notify()
1248 def _handle_script_request(self
,
1255 request_type
=instance
.NORMAL_REQUEST
):
1256 """Handles a HTTP request that has matched a script handler.
1259 environ: An environ dict for the request as defined in PEP-333.
1260 start_response: A function with semantics defined in PEP-333.
1261 url_map: An appinfo.URLMap instance containing the configuration for the
1262 handler that matched.
1263 match: A re.MatchObject containing the result of the matched URL pattern.
1264 request_id: A unique string id associated with the request.
1265 inst: The instance.Instance to send the request to. If None then an
1266 appropriate instance.Instance will be chosen.
1267 request_type: The type of the request. See instance.*_REQUEST module
1271 An iterable over strings containing the body of the HTTP response.
1273 if inst
is not None:
1274 return self
._handle
_instance
_request
(
1275 environ
, start_response
, url_map
, match
, request_id
, inst
,
1278 with self
._condition
:
1279 self
._num
_outstanding
_instance
_requests
+= 1
1280 self
._outstanding
_request
_history
.append(
1281 (time
.time(), self
.num_outstanding_instance_requests
))
1284 start_time
= time
.time()
1285 timeout_time
= start_time
+ self
._min
_pending
_latency
1286 # Loop until an instance is available to handle the request.
1288 if self
._quit
_event
.is_set():
1289 return self
._error
_response
(environ
, start_response
, 404)
1290 inst
= self
._choose
_instance
(timeout_time
)
1292 inst
= self
._add
_instance
(permit_warmup
=False)
1294 # No instance is available nor can a new one be created, so loop
1295 # waiting for one to be free.
1296 timeout_time
= time
.time() + 0.2
1300 logging
.debug('Dispatching request to %s after %0.4fs pending',
1301 inst
, time
.time() - start_time
)
1302 return inst
.handle(environ
,
1308 except instance
.CannotAcceptRequests
:
1311 with self
._condition
:
1312 self
._num
_outstanding
_instance
_requests
-= 1
1313 self
._condition
.notify()
1315 def _add_instance(self
, permit_warmup
):
1316 """Creates and adds a new instance.Instance to the Module.
1319 permit_warmup: If True then the new instance.Instance will be sent a new
1320 warmup request if it is configured to receive them.
1323 The newly created instance.Instance. Returns None if no new instance
1324 could be created because the maximum number of instances have already
1327 if self
._max
_instances
is not None:
1328 with self
._condition
:
1329 if len(self
._instances
) >= self
._max
_instances
:
1332 perform_warmup
= permit_warmup
and (
1333 'warmup' in (self
._module
_configuration
.inbound_services
or []))
1335 inst
= self
._instance
_factory
.new_instance(
1336 self
.generate_instance_id(),
1337 expect_ready_request
=perform_warmup
)
1339 with self
._condition
:
1340 if self
._quit
_event
.is_set():
1342 self
._instances
.add(inst
)
1344 if not inst
.start():
1348 self
._async
_warmup
(inst
)
1350 with self
._condition
:
1351 self
._condition
.notify(self
.max_instance_concurrent_requests
)
1352 logging
.debug('Created instance: %s', inst
)
def generate_instance_id():
    """Build a random instance id.

    Returns:
      A string of 36 characters, each drawn at random from
      _LOWER_HEX_DIGITS.
    """
    chars = [random.choice(_LOWER_HEX_DIGITS) for _ in range(36)]
    return ''.join(chars)
1359 def _warmup(self
, inst
):
1360 """Send a warmup request to the given instance."""
1363 environ
= self
.build_request_environ(
1364 'GET', '/_ah/warmup', [], '', '0.1.0.3', self
.balanced_port
,
1366 self
._handle
_request
(environ
,
1367 start_response_utils
.null_start_response
,
1369 request_type
=instance
.READY_REQUEST
)
1370 with self
._condition
:
1371 self
._condition
.notify(self
.max_instance_concurrent_requests
)
1373 logging
.exception('Internal error while handling warmup request.')
def _async_warmup(self, inst):
    """Asynchronously send a warmup request to the given Instance.

    Args:
      inst: the instance handed to self._warmup.

    Returns:
      Whatever _THREAD_POOL.submit returns for the scheduled
      self._warmup(inst) call.
    """
    pending = _THREAD_POOL.submit(self._warmup, inst)
    return pending
1379 def _trim_outstanding_request_history(self
):
1380 """Removes obsolete entries from _outstanding_request_history."""
1381 window_start
= time
.time() - self
._REQUIRED
_INSTANCE
_WINDOW
_SECONDS
1382 with self
._condition
:
1383 while self
._outstanding
_request
_history
:
1384 t
, _
= self
._outstanding
_request
_history
[0]
1385 if t
< window_start
:
1386 self
._outstanding
_request
_history
.popleft()
1390 def _get_num_required_instances(self
):
1391 """Returns the number of Instances required to handle the request load."""
1392 with self
._condition
:
1393 self
._trim
_outstanding
_request
_history
()
1394 if not self
._outstanding
_request
_history
:
1397 peak_concurrent_requests
= max(
1399 for (t
, current_requests
)
1400 in self
._outstanding
_request
_history
)
1401 return int(math
.ceil(peak_concurrent_requests
/
1402 self
.max_instance_concurrent_requests
))
1404 def _split_instances(self
):
1405 """Returns a 2-tuple representing the required and extra Instances.
1408 A 2-tuple of (required_instances, not_required_instances):
1409 required_instances: The set of the instance.Instances, in a state that
1410 can handle requests, required to handle the current
1412 not_required_instances: The set of the Instances contained in this
1413 Module that are not required.
1415 with self
._condition
:
1416 num_required_instances
= self
._get
_num
_required
_instances
()
1418 available
= [inst
for inst
in self
._instances
1419 if inst
.can_accept_requests
]
1420 available
.sort(key
=lambda inst
: -inst
.num_outstanding_requests
)
1422 required
= set(available
[:num_required_instances
])
1423 return required
, self
._instances
- required
1425 def _choose_instance(self
, timeout_time
):
1426 """Returns the best Instance to handle a request or None if all are busy."""
1427 with self
._condition
:
1428 while time
.time() < timeout_time
:
1429 required_instances
, not_required_instances
= self
._split
_instances
()
1430 if required_instances
:
1431 # Pick the instance with the most remaining capacity to handle
1433 required_instances
= sorted(
1435 key
=lambda inst
: inst
.remaining_request_capacity
)
1436 if required_instances
[-1].remaining_request_capacity
:
1437 return required_instances
[-1]
1439 available_instances
= [inst
for inst
in not_required_instances
1440 if inst
.remaining_request_capacity
> 0 and
1441 inst
.can_accept_requests
]
1442 if available_instances
:
1443 # Pick the instance with the *least* capacity to handle requests
1444 # to avoid using unnecessary idle instances.
1445 available_instances
.sort(
1446 key
=lambda instance
: instance
.num_outstanding_requests
)
1447 return available_instances
[-1]
1449 self
._condition
.wait(timeout_time
- time
.time())
1452 def _adjust_instances(self
):
1453 """Creates new Instances or deletes idle Instances based on current load."""
1455 with self
._condition
:
1456 _
, not_required_instances
= self
._split
_instances
()
1458 if len(not_required_instances
) < self
._min
_idle
_instances
:
1459 self
._add
_instance
(permit_warmup
=True)
1460 elif (len(not_required_instances
) > self
._max
_idle
_instances
and
1462 (self
._last
_instance
_quit
_time
+ self
._MIN
_SECONDS
_BETWEEN
_QUITS
)):
1463 for inst
in not_required_instances
:
1464 if not inst
.num_outstanding_requests
:
1467 except instance
.CannotQuitServingInstance
:
1470 self
._last
_instance
_quit
_time
= now
1471 logging
.debug('Quit instance: %s', inst
)
1472 with self
._condition
:
1473 self
._instances
.discard(inst
)
1476 def _loop_adjusting_instances(self
):
1477 """Loops until the Module exits, reloading, adding or removing Instances."""
1478 while not self
._quit
_event
.is_set():
1480 if self
._automatic
_restarts
:
1481 self
._handle
_changes
()
1482 self
._adjust
_instances
()
1483 self
._quit
_event
.wait(timeout
=1)
1485 def __call__(self
, environ
, start_response
):
1486 return self
._handle
_request
(environ
, start_response
)
1489 class ManualScalingModule(Module
):
1490 """A pool of instances that is manually-scaled."""
1492 _DEFAULT_MANUAL_SCALING
= appinfo
.ManualScaling(instances
='1')
1495 def _populate_default_manual_scaling(cls
, manual_scaling
):
1496 for attribute
in manual_scaling
.ATTRIBUTES
:
1497 if getattr(manual_scaling
, attribute
) in ('manual', None):
1498 setattr(manual_scaling
, attribute
,
1499 getattr(cls
._DEFAULT
_MANUAL
_SCALING
, attribute
))
1501 def _process_manual_scaling(self
, manual_scaling
):
1503 self
._populate
_default
_manual
_scaling
(manual_scaling
)
1505 manual_scaling
= self
._DEFAULT
_MANUAL
_SCALING
1506 self
._initial
_num
_instances
= int(manual_scaling
.instances
)
1509 module_configuration
,
1515 runtime_stderr_loglevel
,
1521 default_version_port
,
1526 use_mtime_file_watcher
,
1528 allow_skipped_files
,
1529 threadsafe_override
):
1530 """Initializer for ManualScalingModule.
1533 module_configuration: An application_configuration.ModuleConfiguration
1534 instance storing the configuration data for a module.
1535 host: A string containing the host that any HTTP servers should bind to
1537 balanced_port: An int specifying the port where the balanced module for
1538 the pool should listen.
1539 api_host: The host that APIServer listens for RPC requests on.
1540 api_port: The port that APIServer listens for RPC requests on.
1541 auth_domain: A string containing the auth domain to set in the environment
1543 runtime_stderr_loglevel: An int representing the minimum logging level at
1544 which runtime log messages should be written to stderr. See
1545 devappserver2.py for possible values.
1546 php_config: A runtime_config_pb2.PhpConfig instance containing PHP
1547 runtime-specific configuration. If None then defaults are used.
1548 python_config: A runtime_config_pb2.PythonConfig instance containing
1549 Python runtime-specific configuration. If None then defaults are used.
1550 java_config: A runtime_config_pb2.JavaConfig instance containing
1551 Java runtime-specific configuration. If None then defaults are used.
1552 cloud_sql_config: A runtime_config_pb2.CloudSQL instance containing the
1553 required configuration for local Google Cloud SQL development. If None
1554 then Cloud SQL will not be available.
1555 vm_config: A runtime_config_pb2.VMConfig instance containing
1556 VM runtime-specific configuration. If None all docker-related stuff
1558 default_version_port: An int containing the port of the default version.
1559 port_registry: A dispatcher.PortRegistry used to provide the Dispatcher
1560 with a mapping of port to Module and Instance.
1561 request_data: A wsgi_request_info.WSGIRequestInfo that will be provided
1562 with request information for use by API stubs.
1563 dispatcher: A Dispatcher instance that can be used to make HTTP requests.
1564 max_instances: The maximum number of instances to create for this module.
1565 If None then there is no limit on the number of created instances.
1566 use_mtime_file_watcher: A bool containing whether to use mtime polling to
1567 monitor file changes even if other options are available on the
1569 automatic_restarts: If True then instances will be restarted when a
1570 file or configuration change that affects them is detected.
1571 allow_skipped_files: If True then all files in the application's directory
1572 are readable, even if they appear in a static handler or "skip_files"
1574 threadsafe_override: If not None, ignore the YAML file value of threadsafe
1575 and use this value instead.
1577 super(ManualScalingModule
, self
).__init
__(module_configuration
,
1583 runtime_stderr_loglevel
,
1589 default_version_port
,
1594 use_mtime_file_watcher
,
1596 allow_skipped_files
,
1597 threadsafe_override
)
1599 self
._process
_manual
_scaling
(module_configuration
.manual_scaling
)
1601 self
._instances
= [] # Protected by self._condition.
1602 self
._wsgi
_servers
= [] # Protected by self._condition.
1603 # Whether the module has been stopped. Protected by self._condition.
1604 self
._suspended
= False
1606 self
._condition
= threading
.Condition() # Protects instance state.
1608 # Serializes operations that modify the serving state of or number of
1610 self
._instances
_change
_lock
= threading
.RLock()
1612 self
._change
_watcher
_thread
= threading
.Thread(
1613 target
=self
._loop
_watching
_for
_changes
)
1616 """Start background management of the Module."""
1617 self
._balanced
_module
.start()
1618 self
._port
_registry
.add(self
.balanced_port
, self
, None)
1620 self
._watcher
.start()
1621 self
._change
_watcher
_thread
.start()
1622 with self
._instances
_change
_lock
:
1623 if self
._max
_instances
is not None:
1624 initial_num_instances
= min(self
._max
_instances
,
1625 self
._initial
_num
_instances
)
1627 initial_num_instances
= self
._initial
_num
_instances
1628 for _
in xrange(initial_num_instances
):
1629 self
._add
_instance
()
1632 """Stops the Module."""
1633 self
._quit
_event
.set()
1634 self
._change
_watcher
_thread
.join()
1635 # The change watcher thread depends on the balanced module and the
1636 # watcher so wait for it to exit before quitting them.
1638 self
._watcher
.quit()
1639 self
._balanced
_module
.quit()
1640 for wsgi_servr
in self
._wsgi
_servers
:
1642 with self
._condition
:
1643 instances
= self
._instances
1644 self
._instances
= []
1645 self
._condition
.notify_all()
1646 for inst
in instances
:
1647 inst
.quit(force
=True)
1649 def get_instance_port(self
, instance_id
):
1650 """Returns the port of the HTTP server for an instance."""
1652 instance_id
= int(instance_id
)
1654 raise request_info
.InvalidInstanceIdError()
1655 with self
._condition
:
1656 if 0 <= instance_id
< len(self
._instances
):
1657 wsgi_servr
= self
._wsgi
_servers
[instance_id
]
1659 raise request_info
.InvalidInstanceIdError()
1660 return wsgi_servr
.port
def instances(self):
    """Take a snapshot of the instances currently in the Module.

    Returns:
      A new set built from self._instances while holding self._condition.
    """
    with self._condition:
        snapshot = set(self._instances)
    return snapshot
1668 def _handle_instance_request(self
,
1676 """Handles a request routed to a particular Instance.
1679 environ: An environ dict for the request as defined in PEP-333.
1680 start_response: A function with semantics defined in PEP-333.
1681 url_map: An appinfo.URLMap instance containing the configuration for the
1682 handler that matched.
1683 match: A re.MatchObject containing the result of the matched URL pattern.
1684 request_id: A unique string id associated with the request.
1685 inst: The instance.Instance to send the request to.
1686 request_type: The type of the request. See instance.*_REQUEST module
1690 An iterable over strings containing the body of the HTTP response.
1692 start_time
= time
.time()
1693 timeout_time
= start_time
+ self
._get
_wait
_time
()
1695 while time
.time() < timeout_time
:
1696 logging
.debug('Dispatching request to %s after %0.4fs pending',
1697 inst
, time
.time() - start_time
)
1699 return inst
.handle(environ
, start_response
, url_map
, match
,
1700 request_id
, request_type
)
1701 except instance
.CannotAcceptRequests
:
1703 inst
.wait(timeout_time
)
1705 return self
._error
_response
(environ
, start_response
, 503)
1707 return self
._error
_response
(environ
, start_response
, 503)
1709 with self
._condition
:
1710 self
._condition
.notify()
1712 def _handle_script_request(self
,
1719 request_type
=instance
.NORMAL_REQUEST
):
1720 """Handles a HTTP request that has matched a script handler.
1723 environ: An environ dict for the request as defined in PEP-333.
1724 start_response: A function with semantics defined in PEP-333.
1725 url_map: An appinfo.URLMap instance containing the configuration for the
1726 handler that matched.
1727 match: A re.MatchObject containing the result of the matched URL pattern.
1728 request_id: A unique string id associated with the request.
1729 inst: The instance.Instance to send the request to. If None then an
1730 appropriate instance.Instance will be chosen.
1731 request_type: The type of the request. See instance.*_REQUEST module
1735 An iterable over strings containing the body of the HTTP response.
1737 if ((request_type
in (instance
.NORMAL_REQUEST
, instance
.READY_REQUEST
) and
1738 self
._suspended
) or self
._quit
_event
.is_set()):
1739 return self
._error
_response
(environ
, start_response
, 404)
1740 if self
._module
_configuration
.is_backend
:
1741 environ
['BACKEND_ID'] = self
._module
_configuration
.module_name
1743 environ
['BACKEND_ID'] = (
1744 self
._module
_configuration
.version_id
.split('.', 1)[0])
1745 if inst
is not None:
1746 return self
._handle
_instance
_request
(
1747 environ
, start_response
, url_map
, match
, request_id
, inst
,
1750 start_time
= time
.time()
1751 timeout_time
= start_time
+ self
._get
_wait
_time
()
1753 while time
.time() < timeout_time
:
1754 if ((request_type
in (instance
.NORMAL_REQUEST
, instance
.READY_REQUEST
) and
1755 self
._suspended
) or self
._quit
_event
.is_set()):
1756 return self
._error
_response
(environ
, start_response
, 404)
1757 inst
= self
._choose
_instance
(timeout_time
)
1760 logging
.debug('Dispatching request to %s after %0.4fs pending',
1761 inst
, time
.time() - start_time
)
1762 return inst
.handle(environ
, start_response
, url_map
, match
,
1763 request_id
, request_type
)
1764 except instance
.CannotAcceptRequests
:
1767 with self
._condition
:
1768 self
._condition
.notify()
1770 return self
._error
_response
(environ
, start_response
, 503, _TIMEOUT_HTML
)
1772 def _add_instance(self
):
1773 """Creates and adds a new instance.Instance to the Module.
1775 This must be called with _instances_change_lock held.
1777 instance_id
= self
.get_num_instances()
1778 assert self
._max
_instances
is None or instance_id
< self
._max
_instances
1779 inst
= self
._instance
_factory
.new_instance(instance_id
,
1780 expect_ready_request
=True)
1781 wsgi_servr
= wsgi_server
.WsgiServer(
1782 (self
._host
, 0), functools
.partial(self
._handle
_request
, inst
=inst
))
1784 self
._port
_registry
.add(wsgi_servr
.port
, self
, inst
)
1786 health_check_config
= self
.module_configuration
.vm_health_check
1787 if (self
.module_configuration
.runtime
== 'vm' and
1788 health_check_config
.enable_health_check
):
1789 self
._add
_health
_checks
(inst
, wsgi_servr
, health_check_config
)
1791 with self
._condition
:
1792 if self
._quit
_event
.is_set():
1794 self
._wsgi
_servers
.append(wsgi_servr
)
1795 self
._instances
.append(inst
)
1796 suspended
= self
._suspended
1798 self
._async
_start
_instance
(wsgi_servr
, inst
)
def _add_health_checks(self, inst, wsgi_servr, config):
    """Create and start a health checker for one instance.

    Args:
      inst: the instance to check; also bound into both callbacks.
      wsgi_servr: the server bound into the health check callback.
      config: the health check configuration given to the HealthChecker.
    """
    # Bind the per-instance arguments now so the checker can invoke the
    # callbacks without knowing about this module.
    check_callback = functools.partial(self._do_health_check, wsgi_servr, inst)
    restart_callback = functools.partial(self._restart_instance, inst)
    checker = health_check_service.HealthChecker(
        inst, config, check_callback, restart_callback)
    checker.start()
def _async_start_instance(self, wsgi_servr, inst):
    """Schedule self._start_instance on the shared thread pool.

    Args:
      wsgi_servr: passed through to self._start_instance.
      inst: passed through to self._start_instance.

    Returns:
      Whatever _THREAD_POOL.submit returns for the scheduled call.
    """
    pending = _THREAD_POOL.submit(self._start_instance, wsgi_servr, inst)
    return pending
1812 def _start_instance(self
, wsgi_servr
, inst
):
1814 if not inst
.start():
1817 logging
.exception('Internal error while starting instance.')
1820 logging
.debug('Started instance: %s at http://%s:%s', inst
, self
.host
,
1823 environ
= self
.build_request_environ(
1824 'GET', '/_ah/start', [], '', '0.1.0.3', wsgi_servr
.port
,
1826 self
._handle
_request
(environ
,
1827 start_response_utils
.null_start_response
,
1829 request_type
=instance
.READY_REQUEST
)
1830 logging
.debug('Sent start request: %s', inst
)
1831 with self
._condition
:
1832 self
._condition
.notify(self
.max_instance_concurrent_requests
)
1833 except Exception, e
: # pylint: disable=broad-except
1834 logging
.exception('Internal error while handling start request: %s', e
)
1836 def _do_health_check(self
, wsgi_servr
, inst
, start_response
,
1837 is_last_successful
):
1838 is_last_successful
= 'yes' if is_last_successful
else 'no'
1839 url
= '/_ah/health?%s' % urllib
.urlencode(
1840 [('IsLastSuccessful', is_last_successful
)])
1841 environ
= self
.build_request_environ(
1842 'GET', url
, [], '', '', wsgi_servr
.port
,
1844 return self
._handle
_request
(
1848 request_type
=instance
.NORMAL_REQUEST
)
1850 def _choose_instance(self
, timeout_time
):
1851 """Returns an Instance to handle a request or None if all are busy."""
1852 with self
._condition
:
1853 while time
.time() < timeout_time
:
1854 for inst
in self
._instances
:
1855 if inst
.can_accept_requests
:
1857 self
._condition
.wait(timeout_time
- time
.time())
1860 def _handle_changes(self
):
1861 """Handle file or configuration changes."""
1862 # Always check for config and file changes because checking also clears
1864 config_changes
= self
._module
_configuration
.check_for_updates()
1865 file_changes
= self
._watcher
.changes()
1866 if application_configuration
.HANDLERS_CHANGED
in config_changes
:
1867 handlers
= self
._create
_url
_handlers
()
1868 with self
._handler
_lock
:
1869 self
._handlers
= handlers
1873 'Detected file changes:\n %s', '\n '.join(sorted(file_changes
)))
1874 self
._instance
_factory
.files_changed()
1876 if config_changes
& _RESTART_INSTANCES_CONFIG_CHANGES
:
1877 self
._instance
_factory
.configuration_changed(config_changes
)
1879 if config_changes
& _RESTART_INSTANCES_CONFIG_CHANGES
or file_changes
:
1880 with self
._instances
_change
_lock
:
1881 if not self
._suspended
:
1884 def _loop_watching_for_changes(self
):
1885 """Loops until the InstancePool is done watching for file changes."""
1886 while not self
._quit
_event
.is_set():
1888 if self
._automatic
_restarts
:
1889 self
._handle
_changes
()
1890 self
._quit
_event
.wait(timeout
=1)
def get_num_instances(self):
  """Returns the current number of entries in self._instances.

  self._instances_change_lock and then self._condition are both held while
  the list is measured.
  """
  with self._instances_change_lock:
    with self._condition:
      instance_count = len(self._instances)
  return instance_count
def set_num_instances(self, instances):
  """Grows or shrinks the pool to the requested number of instances.

  Args:
    instances: The desired number of instances. When self._max_instances is
      not None the request is capped at that value.
  """
  # NOTE(review): the nesting of the lock scopes below was reconstructed from
  # statement order; confirm it against the canonical file.
  limit = self._max_instances
  if limit is not None:
    instances = min(instances, limit)

  with self._instances_change_lock:
    with self._condition:
      running_instances = self.get_num_instances()
      if running_instances > instances:
        # Detach the surplus entries now; they are quit further down, after
        # both lock scopes have been left.
        wsgi_servers_to_quit = self._wsgi_servers[instances:]
        del self._wsgi_servers[instances:]
        instances_to_quit = self._instances[instances:]
        del self._instances[instances:]
    if running_instances < instances:
      missing = instances - running_instances
      for _ in xrange(missing):
        self._add_instance()
  if running_instances > instances:
    surplus = zip(instances_to_quit, wsgi_servers_to_quit)
    for inst, wsgi_servr in surplus:
      self._async_quit_instance(inst, wsgi_servr)
def _async_quit_instance(self, inst, wsgi_servr):
  """Schedules self._quit_instance(inst, wsgi_servr) on _THREAD_POOL.

  Args:
    inst: Passed through unchanged to self._quit_instance.
    wsgi_servr: Passed through unchanged to self._quit_instance.

  Returns:
    Whatever _THREAD_POOL.submit returns for the scheduled call.
  """
  quit_task = self._quit_instance
  return _THREAD_POOL.submit(quit_task, inst, wsgi_servr)
1919 def _quit_instance(self
, inst
, wsgi_servr
):
1920 port
= wsgi_servr
.port
1922 inst
.quit(expect_shutdown
=True)
1923 self
._shutdown
_instance
(inst
, port
)
1926 """Suspends serving for this module, quitting all running instances."""
1927 with self
._instances
_change
_lock
:
1929 raise request_info
.VersionAlreadyStoppedError()
1930 self
._suspended
= True
1931 with self
._condition
:
1932 instances_to_stop
= zip(self
._instances
, self
._wsgi
_servers
)
1933 for wsgi_servr
in self
._wsgi
_servers
:
1934 wsgi_servr
.set_error(404)
1935 for inst
, wsgi_servr
in instances_to_stop
:
1936 self
._async
_suspend
_instance
(inst
, wsgi_servr
.port
)
def _async_suspend_instance(self, inst, port):
  """Schedules self._suspend_instance(inst, port) on _THREAD_POOL.

  Args:
    inst: Passed through unchanged to self._suspend_instance.
    port: Passed through unchanged to self._suspend_instance.

  Returns:
    Whatever _THREAD_POOL.submit returns for the scheduled call.
  """
  suspend_task = self._suspend_instance
  return _THREAD_POOL.submit(suspend_task, inst, port)
1941 def _suspend_instance(self
, inst
, port
):
1942 inst
.quit(expect_shutdown
=True)
1943 self
._shutdown
_instance
(inst
, port
)
1946 """Resumes serving for this module."""
1947 with self
._instances
_change
_lock
:
1948 if not self
._suspended
:
1949 raise request_info
.VersionAlreadyStartedError()
1950 self
._suspended
= False
1951 with self
._condition
:
1952 if self
._quit
_event
.is_set():
1954 wsgi_servers
= self
._wsgi
_servers
1955 instances_to_start
= []
1956 for instance_id
, wsgi_servr
in enumerate(wsgi_servers
):
1957 inst
= self
._instance
_factory
.new_instance(instance_id
,
1958 expect_ready_request
=True)
1959 wsgi_servr
.set_app(functools
.partial(self
._handle
_request
, inst
=inst
))
1960 self
._port
_registry
.add(wsgi_servr
.port
, self
, inst
)
1961 with self
._condition
:
1962 if self
._quit
_event
.is_set():
1964 self
._instances
[instance_id
] = inst
1966 instances_to_start
.append((wsgi_servr
, inst
))
1967 for wsgi_servr
, inst
in instances_to_start
:
1968 self
._async
_start
_instance
(wsgi_servr
, inst
)
1971 """Restarts the module, replacing all running instances."""
1972 with self
._instances
_change
_lock
:
1973 with self
._condition
:
1974 if self
._quit
_event
.is_set():
1976 instances_to_stop
= self
._instances
[:]
1977 wsgi_servers
= self
._wsgi
_servers
[:]
1978 instances_to_start
= []
1979 for instance_id
, wsgi_servr
in enumerate(wsgi_servers
):
1980 inst
= self
._instance
_factory
.new_instance(instance_id
,
1981 expect_ready_request
=True)
1982 wsgi_servr
.set_app(functools
.partial(self
._handle
_request
, inst
=inst
))
1983 self
._port
_registry
.add(wsgi_servr
.port
, self
, inst
)
1984 instances_to_start
.append(inst
)
1985 with self
._condition
:
1986 if self
._quit
_event
.is_set():
1988 self
._instances
[:] = instances_to_start
1990 # Just force instances to stop for a faster restart.
1991 for inst
in instances_to_stop
:
1992 inst
.quit(force
=True)
1995 self
._async
_start
_instance
(wsgi_servr
, inst
)
1996 for wsgi_servr
, inst
in zip(wsgi_servers
, instances_to_start
)]
1997 logging
.info('Waiting for instances to restart')
1999 health_check_config
= self
.module_configuration
.vm_health_check
2000 for (inst
, wsgi_servr
) in zip(instances_to_start
, wsgi_servers
):
2001 if (self
.module_configuration
.runtime
== 'vm'
2002 and health_check_config
.enable_health_check
):
2003 self
._add
_health
_checks
(inst
, wsgi_servr
, health_check_config
)
2005 _
, not_done
= futures
.wait(start_futures
, timeout
=_SHUTDOWN_TIMEOUT
)
2007 logging
.warning('All instances may not have restarted')
2009 logging
.info('Instances restarted')
2011 def _restart_instance(self
, inst
):
2012 """Restarts the specified instance."""
2013 with self
._instances
_change
_lock
:
2014 # Quit the old instance.
2015 inst
.quit(force
=True)
2016 # Create the new instance.
2017 new_instance
= self
._instance
_factory
.new_instance(inst
.instance_id
)
2018 wsgi_servr
= self
._wsgi
_servers
[inst
.instance_id
]
2020 functools
.partial(self
._handle
_request
, inst
=new_instance
))
2021 self
._port
_registry
.add(wsgi_servr
.port
, self
, new_instance
)
2022 # Start the new instance.
2023 self
._start
_instance
(wsgi_servr
, new_instance
)
2024 health_check_config
= self
.module_configuration
.vm_health_check
2025 if (self
.module_configuration
.runtime
== 'vm'
2026 and health_check_config
.enable_health_check
):
2027 self
._add
_health
_checks
(new_instance
, wsgi_servr
, health_check_config
)
2028 # Replace it in the module registry.
2029 with self
._instances
_change
_lock
:
2030 with self
._condition
:
2031 self
._instances
[new_instance
.instance_id
] = new_instance
def get_instance(self, instance_id):
  """Returns the instance with the provided instance ID.

  Args:
    instance_id: The position of the wanted instance in self._instances,
      given as an int or as anything int() can convert.

  Returns:
    The entry of self._instances stored at that position.

  Raises:
    request_info.InvalidInstanceIdError: if instance_id cannot be converted
      by int() or does not lie in [0, len(self._instances)).
  """
  try:
    index = int(instance_id)
  except ValueError:
    raise request_info.InvalidInstanceIdError()
  with self._condition:
    # An explicit range check (the same one get_instance_port performs) keeps
    # a negative ID from silently selecting an entry counted from the end of
    # the list.
    if 0 <= index < len(self._instances):
      return self._instances[index]
  raise request_info.InvalidInstanceIdError()
2041 def __call__(self
, environ
, start_response
, inst
=None):
2042 return self
._handle
_request
(environ
, start_response
, inst
)
2045 def supports_individually_addressable_instances(self
):
2049 class BasicScalingModule(Module
):
2050 """A pool of instances that is basic-scaled."""
2052 _DEFAULT_BASIC_SCALING
= appinfo
.BasicScaling(max_instances
='1',
2056 def _parse_idle_timeout(timing
):
2057 """Parse a idle timeout string into an int of the value in seconds.
2060 timing: A str of the form 1m or 10s.
2063 An int representation of the value in seconds.
2065 if timing
.endswith('m'):
2066 return int(timing
[:-1]) * 60
2068 return int(timing
[:-1])
2071 def _populate_default_basic_scaling(cls
, basic_scaling
):
2072 for attribute
in basic_scaling
.ATTRIBUTES
:
2073 if getattr(basic_scaling
, attribute
) in ('basic', None):
2074 setattr(basic_scaling
, attribute
,
2075 getattr(cls
._DEFAULT
_BASIC
_SCALING
, attribute
))
2077 def _process_basic_scaling(self
, basic_scaling
):
2079 self
._populate
_default
_basic
_scaling
(basic_scaling
)
2081 basic_scaling
= self
._DEFAULT
_BASIC
_SCALING
2082 if self
._max
_instances
is not None:
2083 self
._max
_instances
= min(self
._max
_instances
,
2084 int(basic_scaling
.max_instances
))
2086 self
._max
_instances
= int(basic_scaling
.max_instances
)
2087 self
._instance
_idle
_timeout
= self
._parse
_idle
_timeout
(
2088 basic_scaling
.idle_timeout
)
2091 module_configuration
,
2097 runtime_stderr_loglevel
,
2103 default_version_port
,
2108 use_mtime_file_watcher
,
2110 allow_skipped_files
,
2111 threadsafe_override
):
2112 """Initializer for BasicScalingModule.
2115 module_configuration: An application_configuration.ModuleConfiguration
2116 instance storing the configuration data for a module.
2117 host: A string containing the host that any HTTP servers should bind to
2119 balanced_port: An int specifying the port where the balanced module for
2120 the pool should listen.
2121 api_host: The host that APIServer listens for RPC requests on.
2122 api_port: The port that APIServer listens for RPC requests on.
2123 auth_domain: A string containing the auth domain to set in the environment
2125 runtime_stderr_loglevel: An int representing the minimum logging level at
2126 which runtime log messages should be written to stderr. See
2127 devappserver2.py for possible values.
2128 php_config: A runtime_config_pb2.PhpConfig instances containing PHP
2129 runtime-specific configuration. If None then defaults are used.
2130 python_config: A runtime_config_pb2.PythonConfig instance containing
2131 Python runtime-specific configuration. If None then defaults are used.
2132 java_config: A runtime_config_pb2.JavaConfig instance containing
2133 Java runtime-specific configuration. If None then defaults are used.
2134 cloud_sql_config: A runtime_config_pb2.CloudSQL instance containing the
2135 required configuration for local Google Cloud SQL development. If None
2136 then Cloud SQL will not be available.
2137 vm_config: A runtime_config_pb2.VMConfig instance containing
2138 VM runtime-specific configuration. If None all docker-related stuff
2140 default_version_port: An int containing the port of the default version.
2141 port_registry: A dispatcher.PortRegistry used to provide the Dispatcher
2142 with a mapping of port to Module and Instance.
2143 request_data: A wsgi_request_info.WSGIRequestInfo that will be provided
2144 with request information for use by API stubs.
2145 dispatcher: A Dispatcher instance that can be used to make HTTP requests.
2146 max_instances: The maximum number of instances to create for this module.
2147 If None then there is no limit on the number of created instances.
2148 use_mtime_file_watcher: A bool containing whether to use mtime polling to
2149 monitor file changes even if other options are available on the
2151 automatic_restarts: If True then instances will be restarted when a
2152 file or configuration change that affects them is detected.
2153 allow_skipped_files: If True then all files in the application's directory
2154 are readable, even if they appear in a static handler or "skip_files"
2156 threadsafe_override: If not None, ignore the YAML file value of threadsafe
2157 and use this value instead.
2159 super(BasicScalingModule
, self
).__init
__(module_configuration
,
2165 runtime_stderr_loglevel
,
2171 default_version_port
,
2176 use_mtime_file_watcher
,
2178 allow_skipped_files
,
2179 threadsafe_override
)
2181 self
._process
_basic
_scaling
(module_configuration
.basic_scaling
)
2183 self
._instances
= [] # Protected by self._condition.
2184 self
._wsgi
_servers
= [] # Protected by self._condition.
2185 # A list of booleans signifying whether the corresponding instance in
2186 # self._instances has been or is being started.
2187 self
._instance
_running
= [] # Protected by self._condition.
2189 for instance_id
in xrange(self
._max
_instances
):
2190 inst
= self
._instance
_factory
.new_instance(instance_id
,
2191 expect_ready_request
=True)
2192 self
._instances
.append(inst
)
2193 self
._wsgi
_servers
.append(wsgi_server
.WsgiServer(
2194 (self
._host
, 0), functools
.partial(self
._handle
_request
, inst
=inst
)))
2195 self
._instance
_running
.append(False)
2197 self
._condition
= threading
.Condition() # Protects instance state.
2199 self
._change
_watcher
_thread
= threading
.Thread(
2200 target
=self
._loop
_watching
_for
_changes
_and
_idle
_instances
)
2203 """Start background management of the Module."""
2204 self
._balanced
_module
.start()
2205 self
._port
_registry
.add(self
.balanced_port
, self
, None)
2207 self
._watcher
.start()
2208 self
._change
_watcher
_thread
.start()
2209 for wsgi_servr
, inst
in zip(self
._wsgi
_servers
, self
._instances
):
2211 self
._port
_registry
.add(wsgi_servr
.port
, self
, inst
)
2214 """Stops the Module."""
2215 self
._quit
_event
.set()
2216 self
._change
_watcher
_thread
.join()
2217 # The instance adjustment thread depends on the balanced module and the
2218 # watcher so wait for it exit before quitting them.
2220 self
._watcher
.quit()
2221 self
._balanced
_module
.quit()
2222 for wsgi_servr
in self
._wsgi
_servers
:
2224 with self
._condition
:
2225 instances
= self
._instances
2226 self
._instances
= []
2227 self
._condition
.notify_all()
2228 for inst
in instances
:
2229 inst
.quit(force
=True)
2231 def get_instance_port(self
, instance_id
):
2232 """Returns the port of the HTTP server for an instance."""
2234 instance_id
= int(instance_id
)
2236 raise request_info
.InvalidInstanceIdError()
2237 with self
._condition
:
2238 if 0 <= instance_id
< len(self
._instances
):
2239 wsgi_servr
= self
._wsgi
_servers
[instance_id
]
2241 raise request_info
.InvalidInstanceIdError()
2242 return wsgi_servr
.port
def instances(self):
  """Returns a new set built from self._instances.

  The copy is made while self._condition is held.
  """
  with self._condition:
    snapshot = set(self._instances)
  return snapshot
2250 def _handle_instance_request(self
,
2258 """Handles a request routed to a particular Instance.
2261 environ: An environ dict for the request as defined in PEP-333.
2262 start_response: A function with semantics defined in PEP-333.
2263 url_map: An appinfo.URLMap instance containing the configuration for the
2264 handler that matched.
2265 match: A re.MatchObject containing the result of the matched URL pattern.
2266 request_id: A unique string id associated with the request.
2267 inst: The instance.Instance to send the request to.
2268 request_type: The type of the request. See instance.*_REQUEST module
2272 An iterable over strings containing the body of the HTTP response.
2274 instance_id
= inst
.instance_id
2275 start_time
= time
.time()
2276 timeout_time
= start_time
+ self
._get
_wait
_time
()
2278 while time
.time() < timeout_time
:
2279 logging
.debug('Dispatching request to %s after %0.4fs pending',
2280 inst
, time
.time() - start_time
)
2282 return inst
.handle(environ
, start_response
, url_map
, match
,
2283 request_id
, request_type
)
2284 except instance
.CannotAcceptRequests
:
2287 return self
._error
_response
(environ
, start_response
, 503)
2288 with self
._condition
:
2289 if self
._instance
_running
[instance_id
]:
2290 should_start
= False
2292 self
._instance
_running
[instance_id
] = True
2295 self
._start
_instance
(instance_id
)
2297 inst
.wait(timeout_time
)
2299 return self
._error
_response
(environ
, start_response
, 503)
2301 with self
._condition
:
2302 self
._condition
.notify()
2304 def _handle_script_request(self
,
2311 request_type
=instance
.NORMAL_REQUEST
):
2312 """Handles a HTTP request that has matched a script handler.
2315 environ: An environ dict for the request as defined in PEP-333.
2316 start_response: A function with semantics defined in PEP-333.
2317 url_map: An appinfo.URLMap instance containing the configuration for the
2318 handler that matched.
2319 match: A re.MatchObject containing the result of the matched URL pattern.
2320 request_id: A unique string id associated with the request.
2321 inst: The instance.Instance to send the request to. If None then an
2322 appropriate instance.Instance will be chosen.
2323 request_type: The type of the request. See instance.*_REQUEST module
2327 An iterable over strings containing the body of the HTTP response.
2329 if self
._quit
_event
.is_set():
2330 return self
._error
_response
(environ
, start_response
, 404)
2331 if self
._module
_configuration
.is_backend
:
2332 environ
['BACKEND_ID'] = self
._module
_configuration
.module_name
2334 environ
['BACKEND_ID'] = (
2335 self
._module
_configuration
.version_id
.split('.', 1)[0])
2336 if inst
is not None:
2337 return self
._handle
_instance
_request
(
2338 environ
, start_response
, url_map
, match
, request_id
, inst
,
2341 start_time
= time
.time()
2342 timeout_time
= start_time
+ self
._get
_wait
_time
()
2343 while time
.time() < timeout_time
:
2344 if self
._quit
_event
.is_set():
2345 return self
._error
_response
(environ
, start_response
, 404)
2346 inst
= self
._choose
_instance
(timeout_time
)
2349 logging
.debug('Dispatching request to %s after %0.4fs pending',
2350 inst
, time
.time() - start_time
)
2351 return inst
.handle(environ
, start_response
, url_map
, match
,
2352 request_id
, request_type
)
2353 except instance
.CannotAcceptRequests
:
2356 with self
._condition
:
2357 self
._condition
.notify()
2359 return self
._error
_response
(environ
, start_response
, 503, _TIMEOUT_HTML
)
2361 def _start_any_instance(self
):
2362 """Choose an inactive instance and start it asynchronously.
2365 An instance.Instance that will be started asynchronously or None if all
2366 instances are already running.
2368 with self
._condition
:
2369 for instance_id
, running
in enumerate(self
._instance
_running
):
2371 self
._instance
_running
[instance_id
] = True
2372 inst
= self
._instances
[instance_id
]
2376 self
._async
_start
_instance
(instance_id
)
def _async_start_instance(self, instance_id):
  """Schedules self._start_instance(instance_id) on _THREAD_POOL.

  Args:
    instance_id: Passed through unchanged to self._start_instance.

  Returns:
    Whatever _THREAD_POOL.submit returns for the scheduled call.
  """
  start_task = self._start_instance
  return _THREAD_POOL.submit(start_task, instance_id)
2382 def _start_instance(self
, instance_id
):
2383 with self
._condition
:
2384 if self
._quit
_event
.is_set():
2386 wsgi_servr
= self
._wsgi
_servers
[instance_id
]
2387 inst
= self
._instances
[instance_id
]
2389 logging
.debug('Started instance: %s at http://%s:%s', inst
, self
.host
,
2392 environ
= self
.build_request_environ(
2393 'GET', '/_ah/start', [], '', '0.1.0.3', wsgi_servr
.port
,
2395 self
._handle
_request
(environ
,
2396 start_response_utils
.null_start_response
,
2398 request_type
=instance
.READY_REQUEST
)
2399 logging
.debug('Sent start request: %s', inst
)
2400 with self
._condition
:
2401 self
._condition
.notify(self
.max_instance_concurrent_requests
)
2403 logging
.exception('Internal error while handling start request.')
2405 def _choose_instance(self
, timeout_time
):
2406 """Returns an Instance to handle a request or None if all are busy."""
2407 with self
._condition
:
2408 while time
.time() < timeout_time
and not self
._quit
_event
.is_set():
2409 for inst
in self
._instances
:
2410 if inst
.can_accept_requests
:
2413 inst
= self
._start
_any
_instance
()
2416 self
._condition
.wait(timeout_time
- time
.time())
2420 inst
.wait(timeout_time
)
2423 def _handle_changes(self
):
2424 """Handle file or configuration changes."""
2425 # Always check for config and file changes because checking also clears
2427 config_changes
= self
._module
_configuration
.check_for_updates()
2428 file_changes
= self
._watcher
.changes()
2430 if application_configuration
.HANDLERS_CHANGED
in config_changes
:
2431 handlers
= self
._create
_url
_handlers
()
2432 with self
._handler
_lock
:
2433 self
._handlers
= handlers
2436 self
._instance
_factory
.files_changed()
2438 if config_changes
& _RESTART_INSTANCES_CONFIG_CHANGES
:
2439 self
._instance
_factory
.configuration_changed(config_changes
)
2441 if config_changes
& _RESTART_INSTANCES_CONFIG_CHANGES
or file_changes
:
2444 def _loop_watching_for_changes_and_idle_instances(self
):
2445 """Loops until the InstancePool is done watching for file changes."""
2446 while not self
._quit
_event
.is_set():
2448 self
._shutdown
_idle
_instances
()
2449 if self
._automatic
_restarts
:
2450 self
._handle
_changes
()
2451 self
._quit
_event
.wait(timeout
=1)
2453 def _shutdown_idle_instances(self
):
2454 instances_to_stop
= []
2455 with self
._condition
:
2456 for instance_id
, inst
in enumerate(self
._instances
):
2457 if (self
._instance
_running
[instance_id
] and
2458 inst
.idle_seconds
> self
._instance
_idle
_timeout
):
2459 instances_to_stop
.append((self
._instances
[instance_id
],
2460 self
._wsgi
_servers
[instance_id
]))
2461 self
._instance
_running
[instance_id
] = False
2462 new_instance
= self
._instance
_factory
.new_instance(
2463 instance_id
, expect_ready_request
=True)
2464 self
._instances
[instance_id
] = new_instance
2465 wsgi_servr
= self
._wsgi
_servers
[instance_id
]
2467 functools
.partial(self
._handle
_request
, inst
=new_instance
))
2468 self
._port
_registry
.add(wsgi_servr
.port
, self
, new_instance
)
2469 for inst
, wsgi_servr
in instances_to_stop
:
2470 logging
.debug('Shutting down %r', inst
)
2471 self
._stop
_instance
(inst
, wsgi_servr
)
2473 def _stop_instance(self
, inst
, wsgi_servr
):
2474 inst
.quit(expect_shutdown
=True)
2475 self
._async
_shutdown
_instance
(inst
, wsgi_servr
.port
)
2478 """Restarts the module, replacing all running instances."""
2479 instances_to_stop
= []
2480 instances_to_start
= []
2481 with self
._condition
:
2482 if self
._quit
_event
.is_set():
2484 for instance_id
, inst
in enumerate(self
._instances
):
2485 if self
._instance
_running
[instance_id
]:
2486 instances_to_stop
.append((inst
, self
._wsgi
_servers
[instance_id
]))
2487 new_instance
= self
._instance
_factory
.new_instance(
2488 instance_id
, expect_ready_request
=True)
2489 self
._instances
[instance_id
] = new_instance
2490 instances_to_start
.append(instance_id
)
2491 wsgi_servr
= self
._wsgi
_servers
[instance_id
]
2493 functools
.partial(self
._handle
_request
, inst
=new_instance
))
2494 self
._port
_registry
.add(wsgi_servr
.port
, self
, new_instance
)
2495 for instance_id
in instances_to_start
:
2496 self
._async
_start
_instance
(instance_id
)
2497 for inst
, wsgi_servr
in instances_to_stop
:
2498 self
._stop
_instance
(inst
, wsgi_servr
)
def get_instance(self, instance_id):
  """Returns the instance with the provided instance ID.

  Args:
    instance_id: The position of the wanted instance in self._instances,
      given as an int or as anything int() can convert.

  Returns:
    The entry of self._instances stored at that position.

  Raises:
    request_info.InvalidInstanceIdError: if instance_id cannot be converted
      by int() or does not lie in [0, len(self._instances)).
  """
  try:
    index = int(instance_id)
  except ValueError:
    raise request_info.InvalidInstanceIdError()
  with self._condition:
    # An explicit range check (the same one get_instance_port performs) keeps
    # a negative ID from silently selecting an entry counted from the end of
    # the list.
    if 0 <= index < len(self._instances):
      return self._instances[index]
  raise request_info.InvalidInstanceIdError()
2508 def __call__(self
, environ
, start_response
, inst
=None):
2509 return self
._handle
_request
(environ
, start_response
, inst
)
2512 def supports_individually_addressable_instances(self
):
2516 class InteractiveCommandModule(Module
):
2517 """A Module that can evaluate user commands.
2519 This module manages a single Instance which is started lazily.
2522 _MAX_REQUEST_WAIT_TIME
= 15
2525 module_configuration
,
2531 runtime_stderr_loglevel
,
2537 default_version_port
,
2541 use_mtime_file_watcher
,
2542 allow_skipped_files
,
2543 threadsafe_override
):
2544 """Initializer for InteractiveCommandModule.
2547 module_configuration: An application_configuration.ModuleConfiguration
2548 instance storing the configuration data for this module.
2549 host: A string containing the host that will be used when constructing
2550 HTTP headers sent to the Instance executing the interactive command
2552 balanced_port: An int specifying the port that will be used when
2553 constructing HTTP headers sent to the Instance executing the
2554 interactive command e.g. "localhost".
2555 api_host: The host that APIServer listens for RPC requests on.
2556 api_port: The port that APIServer listens for RPC requests on.
2557 auth_domain: A string containing the auth domain to set in the environment
2559 runtime_stderr_loglevel: An int representing the minimum logging level at
2560 which runtime log messages should be written to stderr. See
2561 devappserver2.py for possible values.
2562 php_config: A runtime_config_pb2.PhpConfig instances containing PHP
2563 runtime-specific configuration. If None then defaults are used.
2564 python_config: A runtime_config_pb2.PythonConfig instance containing
2565 Python runtime-specific configuration. If None then defaults are used.
2566 java_config: A runtime_config_pb2.JavaConfig instance containing
2567 Java runtime-specific configuration. If None then defaults are used.
2568 cloud_sql_config: A runtime_config_pb2.CloudSQL instance containing the
2569 required configuration for local Google Cloud SQL development. If None
2570 then Cloud SQL will not be available.
2571 vm_config: A runtime_config_pb2.VMConfig instance containing
2572 VM runtime-specific configuration. If None all docker-related stuff
2574 default_version_port: An int containing the port of the default version.
2575 port_registry: A dispatcher.PortRegistry used to provide the Dispatcher
2576 with a mapping of port to Module and Instance.
2577 request_data: A wsgi_request_info.WSGIRequestInfo that will be provided
2578 with request information for use by API stubs.
2579 dispatcher: A Dispatcher instance that can be used to make HTTP requests.
2580 use_mtime_file_watcher: A bool containing whether to use mtime polling to
2581 monitor file changes even if other options are available on the
2583 allow_skipped_files: If True then all files in the application's directory
2584 are readable, even if they appear in a static handler or "skip_files"
2586 threadsafe_override: If not None, ignore the YAML file value of threadsafe
2587 and use this value instead.
2589 super(InteractiveCommandModule
, self
).__init
__(
2590 module_configuration
,
2596 runtime_stderr_loglevel
,
2602 default_version_port
,
2607 use_mtime_file_watcher
=use_mtime_file_watcher
,
2608 automatic_restarts
=True,
2609 allow_skipped_files
=allow_skipped_files
,
2610 threadsafe_override
=threadsafe_override
)
2611 # Use a single instance so that state is consistent across requests.
2612 self
._inst
_lock
= threading
.Lock()
def balanced_port(self):
  """The port that the balanced HTTP server for the Module is listening on.

  The InteractiveCommandModule does not actually listen on this port; the
  value is only used when constructing the "SERVER_PORT" entry of the
  WSGI environment.
  """
  port = self._balanced_port
  return port
2625 """Stops the InteractiveCommandModule."""
2627 self
._inst
.quit(force
=True)
2630 def _handle_script_request(self
,
2637 request_type
=instance
.INTERACTIVE_REQUEST
):
2638 """Handles an interactive request by forwarding it to the managed Instance.
2641 environ: An environ dict for the request as defined in PEP-333.
2642 start_response: A function with semantics defined in PEP-333.
2643 url_map: An appinfo.URLMap instance containing the configuration for the
2644 handler that matched.
2645 match: A re.MatchObject containing the result of the matched URL pattern.
2646 request_id: A unique string id associated with the request.
2647 inst: The instance.Instance to send the request to.
2648 request_type: The type of the request. See instance.*_REQUEST module
2649 constants. This must be instance.INTERACTIVE_REQUEST.
2652 An iterable over strings containing the body of the HTTP response.
2655 assert request_type
== instance
.INTERACTIVE_REQUEST
2657 start_time
= time
.time()
2658 timeout_time
= start_time
+ self
._get
_wait
_time
()
2660 while time
.time() < timeout_time
:
2661 new_instance
= False
2662 with self
._inst
_lock
:
2664 self
._inst
= self
._instance
_factory
.new_instance(
2665 AutoScalingModule
.generate_instance_id(),
2666 expect_ready_request
=False)
2674 return inst
.handle(environ
, start_response
, url_map
, match
,
2675 request_id
, request_type
)
2676 except instance
.CannotAcceptRequests
:
2677 inst
.wait(timeout_time
)
2679 # If the instance is restarted while handling a request then the
2680 # exception raised is unpredictable.
2681 if inst
!= self
._inst
:
2682 start_response('503 Service Unavailable', [])
2683 return ['Instance was restarted while executing command']
2684 logging
.exception('Unexpected exception handling command: %r', environ
)
2687 start_response('503 Service Unavailable', [])
2688 return ['The command timed-out while waiting for another one to complete']
2691 """Restarts the module."""
2692 with self
._inst
_lock
:
2694 self
._inst
.quit(force
=True)
def send_interactive_command(self, command):
  """Sends an interactive command to the module.

  Args:
    command: The command to send e.g. "print 5+5".

  Returns:
    A string representing the result of the command e.g. "10\n".

  Raises:
    InteractiveCommandError: if the command failed for any reason.
  """
  start_response = start_response_utils.CapturingStartResponse()

  # 192.0.2.0 is an example address defined in RFC 5737.
  environ = self.build_request_environ(
      'POST', '/', [], command, '192.0.2.0', self.balanced_port)

  try:
    # NOTE(review): the two positional arguments are reconstructed from the
    # other self._handle_request call sites in this file; confirm.
    response = self._handle_request(
        environ,
        start_response,
        request_type=instance.INTERACTIVE_REQUEST)
  except Exception as e:
    # Build a single message string. Passing the prefix and the detail as
    # two separate arguments makes the exception render as a tuple.
    raise InteractiveCommandError('Unexpected command failure: ' + str(e))

  merged = start_response.merged_response(response)
  if start_response.status != '200 OK':
    raise InteractiveCommandError(merged)

  return merged