3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """Manage the lifecycle of runtime processes and dispatch requests to them."""
34 import wsgiref
.headers
36 from concurrent
import futures
38 from google
.appengine
.api
import api_base_pb
39 from google
.appengine
.api
import apiproxy_stub_map
40 from google
.appengine
.api
import appinfo
41 from google
.appengine
.api
import request_info
42 from google
.appengine
.api
.logservice
import log_service_pb
43 from google
.appengine
.tools
.devappserver2
import application_configuration
44 from google
.appengine
.tools
.devappserver2
import blob_image
45 from google
.appengine
.tools
.devappserver2
import blob_upload
46 from google
.appengine
.tools
.devappserver2
import channel
47 from google
.appengine
.tools
.devappserver2
import constants
48 from google
.appengine
.tools
.devappserver2
import endpoints
49 from google
.appengine
.tools
.devappserver2
import errors
50 from google
.appengine
.tools
.devappserver2
import file_watcher
51 from google
.appengine
.tools
.devappserver2
import gcs_server
52 from google
.appengine
.tools
.devappserver2
import go_runtime
53 from google
.appengine
.tools
.devappserver2
import health_check_service
54 from google
.appengine
.tools
.devappserver2
import http_runtime_constants
55 from google
.appengine
.tools
.devappserver2
import instance
57 from google
.appengine
.tools
.devappserver2
import java_runtime
60 from google
.appengine
.tools
.devappserver2
import login
61 from google
.appengine
.tools
.devappserver2
import php_runtime
62 from google
.appengine
.tools
.devappserver2
import python_runtime
63 from google
.appengine
.tools
.devappserver2
import request_rewriter
64 from google
.appengine
.tools
.devappserver2
import runtime_config_pb2
65 from google
.appengine
.tools
.devappserver2
import start_response_utils
66 from google
.appengine
.tools
.devappserver2
import static_files_handler
67 from google
.appengine
.tools
.devappserver2
import thread_executor
68 from google
.appengine
.tools
.devappserver2
import url_handler
69 from google
.appengine
.tools
.devappserver2
import util
70 from google
.appengine
.tools
.devappserver2
import vm_runtime_factory
71 from google
.appengine
.tools
.devappserver2
import wsgi_handler
72 from google
.appengine
.tools
.devappserver2
import wsgi_server
# Alphabets used when fabricating request identifiers. They are spelled out
# instead of being derived from string.hexdigits.lower()/.upper(): hexdigits
# already holds both cases ('0123456789abcdefABCDEF'), so case-folding it
# repeats the letters a-f and makes random.choice() pick a letter twice as
# often as a digit.
_LOWER_HEX_DIGITS = string.digits + 'abcdef'
_UPPER_HEX_DIGITS = string.digits + 'ABCDEF'
# Number of characters in a generated REQUEST_ID_HASH.
_REQUEST_ID_HASH_LENGTH = 8
# Shared executor; _async_shutdown_instance submits instance shutdowns to it.
_THREAD_POOL = thread_executor.ThreadExecutor()

# Configuration changes that force running instances to be restarted.
_RESTART_INSTANCES_CONFIG_CHANGES = frozenset((
    application_configuration.NORMALIZED_LIBRARIES_CHANGED,
    application_configuration.SKIP_FILES_CHANGED,
    application_configuration.NOBUILD_FILES_CHANGED,
    # Handler changes are included because files that appear in static
    # content handlers are made unavailable to the runtime.
    application_configuration.HANDLERS_CHANGED,
    application_configuration.ENV_VARIABLES_CHANGED,
))

# Requests whose path matches this pattern are excluded from request logging.
_REQUEST_LOGGING_BLACKLIST_RE = re.compile(
    r'^/_ah/(?:channel/(?:dev|jsapi)|img|login|upload)')

# Stand-in "match" and "url_map" arguments passed to _handle_script_request
# for request types that are not routed through user-specified handlers.
_EMPTY_MATCH = re.match('', '')
_DUMMY_URLMAP = appinfo.URLMap(script='/')
# Seconds an instance is given after a shutdown request is sent before it is
# forcibly quit.
_SHUTDOWN_TIMEOUT = 30

# Largest request body accepted for non-/_ah/ URLs, in megabytes and in bytes.
_MAX_UPLOAD_MEGABYTES = 32
_MAX_UPLOAD_BYTES = _MAX_UPLOAD_MEGABYTES * 1024 ** 2
# Oversized bodies up to this many bytes are still read in full before the
# 413 response is sent, so clients that mishandle an early close keep working.
_MAX_UPLOAD_NO_TRIGGER_BAD_CLIENT_BYTES = 64 * 1024 ** 2
103 _REDIRECT_HTML
= '''\
104 <HTML><HEAD><meta http-equiv="content-type" content="%(content-type)s">
105 <TITLE>%(status)d Moved</TITLE></HEAD>
106 <BODY><H1>%(status)d Moved</H1>
107 The document has moved'
108 <A HREF="%(correct-url)s">here</A>.
112 def _static_files_regex_from_handlers(handlers
):
114 for url_map
in handlers
:
115 handler_type
= url_map
.GetHandlerType()
116 if url_map
.application_readable
:
118 if handler_type
== appinfo
.STATIC_FILES
:
119 patterns
.append(r
'(%s)' % url_map
.upload
)
120 elif handler_type
== appinfo
.STATIC_DIR
:
121 patterns
.append('(%s%s%s)' % (url_map
.static_dir
.rstrip(os
.path
.sep
),
122 re
.escape(os
.path
.sep
), r
'.*'))
123 return r
'^%s$' % '|'.join(patterns
)
126 class InteractiveCommandError(errors
.Error
):
130 class _ScriptHandler(url_handler
.UserConfiguredURLHandler
):
131 """A URL handler that will cause the request to be dispatched to an instance.
133 This handler is special in that it does not have a working handle() method
134 since the Module's dispatch logic is used to select the appropriate Instance.
137 def __init__(self
, url_map
):
138 """Initializer for _ScriptHandler.
141 url_map: An appinfo.URLMap instance containing the configuration for this
145 url_pattern
= re
.compile('%s$' % url_map
.url
)
147 raise errors
.InvalidAppConfigError(
148 'invalid url %r in script handler: %s' % (url_map
.url
, e
))
150 super(_ScriptHandler
, self
).__init
__(url_map
, url_pattern
)
151 self
.url_map
= url_map
def handle(self, match, environ, start_response):
  """Placeholder handler; it must never actually be invoked.

  Args:
    match: Unused.
    environ: Unused.
    start_response: Unused.

  Raises:
    NotImplementedError: Always.
  """
  raise NotImplementedError()
158 class Module(object):
159 """The abstract base for all instance pool implementations."""
161 _RUNTIME_INSTANCE_FACTORIES
= {
162 'go': go_runtime
.GoRuntimeInstanceFactory
,
163 'php': php_runtime
.PHPRuntimeInstanceFactory
,
164 'python': python_runtime
.PythonRuntimeInstanceFactory
,
165 'python27': python_runtime
.PythonRuntimeInstanceFactory
,
166 # TODO: uncomment for GA.
167 # 'vm': vm_runtime_factory.VMRuntimeInstanceFactory,
170 _RUNTIME_INSTANCE_FACTORIES
.update({
171 'java': java_runtime
.JavaRuntimeInstanceFactory
,
172 'java7': java_runtime
.JavaRuntimeInstanceFactory
,
175 def _create_instance_factory(self
,
176 module_configuration
):
177 """Create an instance.InstanceFactory.
180 module_configuration: An application_configuration.ModuleConfiguration
181 instance storing the configuration data for a module.
184 An instance.InstanceFactory subclass that can be used to create instances
185 with the provided configuration.
188 RuntimeError: if the configuration specifies an unknown runtime.
190 # TODO: a bad runtime should be caught before we get here.
191 if module_configuration
.runtime
not in self
._RUNTIME
_INSTANCE
_FACTORIES
:
193 'Unknown runtime %r; supported runtimes are %s.' %
194 (module_configuration
.runtime
,
196 sorted(repr(k
) for k
in self
._RUNTIME
_INSTANCE
_FACTORIES
))))
197 instance_factory
= self
._RUNTIME
_INSTANCE
_FACTORIES
[
198 module_configuration
.runtime
]
199 return instance_factory(
200 request_data
=self
._request
_data
,
201 runtime_config_getter
=self
._get
_runtime
_config
,
202 module_configuration
=module_configuration
)
204 def _create_url_handlers(self
):
205 """Constructs URLHandlers based on the module configuration.
208 A list of url_handler.URLHandlers that can react as
209 described in the given configuration.
212 # Add special URL handlers (taking precedence over user-defined handlers)
213 url_pattern
= '/%s$' % login
.LOGIN_URL_RELATIVE
214 handlers
.append(wsgi_handler
.WSGIHandler(login
.application
,
216 url_pattern
= '/%s' % blob_upload
.UPLOAD_URL_PATH
217 # The blobstore upload handler forwards successful requests back to self
219 wsgi_handler
.WSGIHandler(blob_upload
.Application(self
), url_pattern
))
221 url_pattern
= '/%s' % blob_image
.BLOBIMAGE_URL_PATTERN
223 wsgi_handler
.WSGIHandler(blob_image
.Application(), url_pattern
))
225 url_pattern
= '/%s' % channel
.CHANNEL_URL_PATTERN
227 wsgi_handler
.WSGIHandler(channel
.application
, url_pattern
))
229 url_pattern
= '/%s' % gcs_server
.GCS_URL_PATTERN
231 wsgi_handler
.WSGIHandler(gcs_server
.Application(), url_pattern
))
233 url_pattern
= '/%s' % endpoints
.API_SERVING_PATTERN
235 wsgi_handler
.WSGIHandler(
236 endpoints
.EndpointsDispatcher(self
._dispatcher
), url_pattern
))
238 found_start_handler
= False
239 found_warmup_handler
= False
240 # Add user-defined URL handlers
241 for url_map
in self
._module
_configuration
.handlers
:
242 handler_type
= url_map
.GetHandlerType()
243 if handler_type
== appinfo
.HANDLER_SCRIPT
:
244 handlers
.append(_ScriptHandler(url_map
))
245 if not found_start_handler
and re
.match('%s$' % url_map
.url
,
247 found_start_handler
= True
248 if not found_warmup_handler
and re
.match('%s$' % url_map
.url
,
250 found_warmup_handler
= True
251 elif handler_type
== appinfo
.STATIC_FILES
:
253 static_files_handler
.StaticFilesHandler(
254 self
._module
_configuration
.application_root
,
256 elif handler_type
== appinfo
.STATIC_DIR
:
258 static_files_handler
.StaticDirHandler(
259 self
._module
_configuration
.application_root
,
262 assert 0, 'unexpected handler %r for %r' % (handler_type
, url_map
)
263 # Add a handler for /_ah/start if no script handler matches.
264 if not found_start_handler
:
265 handlers
.insert(0, _ScriptHandler(self
._instance
_factory
.START_URL_MAP
))
266 # Add a handler for /_ah/warmup if no script handler matches and warmup is
268 if (not found_warmup_handler
and
269 'warmup' in (self
._module
_configuration
.inbound_services
or [])):
270 handlers
.insert(0, _ScriptHandler(self
._instance
_factory
.WARMUP_URL_MAP
))
273 def _get_runtime_config(self
):
274 """Returns the configuration for the runtime.
277 A runtime_config_pb2.Config instance representing the configuration to be
278 passed to an instance. NOTE: This does *not* include the instance_id
279 field, which must be populated elsewhere.
281 runtime_config
= runtime_config_pb2
.Config()
282 runtime_config
.app_id
= self
._module
_configuration
.application
283 runtime_config
.version_id
= self
._module
_configuration
.version_id
284 if self
._threadsafe
_override
is None:
285 runtime_config
.threadsafe
= self
._module
_configuration
.threadsafe
or False
287 runtime_config
.threadsafe
= self
._threadsafe
_override
288 runtime_config
.application_root
= (
289 self
._module
_configuration
.application_root
)
290 if not self
._allow
_skipped
_files
:
291 runtime_config
.skip_files
= str(self
._module
_configuration
.skip_files
)
292 runtime_config
.static_files
= _static_files_regex_from_handlers(
293 self
._module
_configuration
.handlers
)
294 runtime_config
.api_host
= self
._api
_host
295 runtime_config
.api_port
= self
._api
_port
296 runtime_config
.server_port
= self
._balanced
_port
297 runtime_config
.stderr_log_level
= self
._runtime
_stderr
_loglevel
298 runtime_config
.datacenter
= 'us1'
299 runtime_config
.auth_domain
= self
._auth
_domain
300 if self
._max
_instances
is not None:
301 runtime_config
.max_instances
= self
._max
_instances
303 for library
in self
._module
_configuration
.normalized_libraries
:
304 runtime_config
.libraries
.add(name
=library
.name
, version
=library
.version
)
306 for key
, value
in (self
._module
_configuration
.env_variables
or {}).items():
307 runtime_config
.environ
.add(key
=str(key
), value
=str(value
))
309 if self
._cloud
_sql
_config
:
310 runtime_config
.cloud_sql_config
.CopyFrom(self
._cloud
_sql
_config
)
312 if self
._php
_config
and self
._module
_configuration
.runtime
== 'php':
313 runtime_config
.php_config
.CopyFrom(self
._php
_config
)
314 if (self
._python
_config
and
315 self
._module
_configuration
.runtime
.startswith('python')):
316 runtime_config
.python_config
.CopyFrom(self
._python
_config
)
317 if (self
._java
_config
and
318 self
._module
_configuration
.runtime
.startswith('java')):
319 runtime_config
.java_config
.CopyFrom(self
._java
_config
)
322 runtime_config
.vm_config
.CopyFrom(self
._vm
_config
)
324 return runtime_config
326 def _maybe_restart_instances(self
, config_changed
, file_changed
):
327 """Restarts instances. May avoid some restarts depending on policy.
329 One of config_changed or file_changed must be True.
332 config_changed: True if the configuration for the application has changed.
333 file_changed: True if any file relevant to the application has changed.
335 if not config_changed
and not file_changed
:
338 logging
.debug('Restarting instances.')
339 policy
= self
._instance
_factory
.FILE_CHANGE_INSTANCE_RESTART_POLICY
340 assert policy
is not None, 'FILE_CHANGE_INSTANCE_RESTART_POLICY not set'
342 with self
._condition
:
343 instances_to_quit
= set()
344 for inst
in self
._instances
:
345 if (config_changed
or
346 (policy
== instance
.ALWAYS
) or
347 (policy
== instance
.AFTER_FIRST_REQUEST
and inst
.total_requests
)):
348 instances_to_quit
.add(inst
)
349 self
._instances
-= instances_to_quit
351 for inst
in instances_to_quit
:
352 inst
.quit(allow_async
=True)
354 def _handle_changes(self
):
355 """Handle file or configuration changes."""
356 # Always check for config and file changes because checking also clears
358 config_changes
= self
._module
_configuration
.check_for_updates()
359 has_file_changes
= self
._watcher
.has_changes()
361 if application_configuration
.HANDLERS_CHANGED
in config_changes
:
362 handlers
= self
._create
_url
_handlers
()
363 with self
._handler
_lock
:
364 self
._handlers
= handlers
367 self
._instance
_factory
.files_changed()
369 if config_changes
& _RESTART_INSTANCES_CONFIG_CHANGES
:
370 self
._instance
_factory
.configuration_changed(config_changes
)
372 self
._maybe
_restart
_instances
(
373 config_changed
=bool(config_changes
& _RESTART_INSTANCES_CONFIG_CHANGES
),
374 file_changed
=has_file_changes
)
377 module_configuration
,
383 runtime_stderr_loglevel
,
389 default_version_port
,
394 use_mtime_file_watcher
,
397 threadsafe_override
):
398 """Initializer for Module.
400 module_configuration: An application_configuration.ModuleConfiguration
401 instance storing the configuration data for a module.
402 host: A string containing the host that any HTTP servers should bind to
404 balanced_port: An int specifying the port where the balanced module for
405 the pool should listen.
406 api_host: The host that APIModule listens for RPC requests on.
407 api_port: The port that APIModule listens for RPC requests on.
408 auth_domain: A string containing the auth domain to set in the environment
410 runtime_stderr_loglevel: An int representing the minimum logging level at
411 which runtime log messages should be written to stderr. See
412 devappserver2.py for possible values.
413 php_config: A runtime_config_pb2.PhpConfig instances containing PHP
414 runtime-specific configuration. If None then defaults are used.
415 python_config: A runtime_config_pb2.PythonConfig instance containing
416 Python runtime-specific configuration. If None then defaults are used.
417 java_config: A runtime_config_pb2.JavaConfig instance containing
418 Java runtime-specific configuration. If None then defaults are used.
419 cloud_sql_config: A runtime_config_pb2.CloudSQL instance containing the
420 required configuration for local Google Cloud SQL development. If None
421 then Cloud SQL will not be available.
422 vm_config: A runtime_config_pb2.VMConfig instance containing
423 VM runtime-specific configuration. If None all docker-related stuff
425 default_version_port: An int containing the port of the default version.
426 port_registry: A dispatcher.PortRegistry used to provide the Dispatcher
427 with a mapping of port to Module and Instance.
428 request_data: A wsgi_request_info.WSGIRequestInfo that will be provided
429 with request information for use by API stubs.
430 dispatcher: A Dispatcher instance that can be used to make HTTP requests.
431 max_instances: The maximum number of instances to create for this module.
432 If None then there is no limit on the number of created instances.
433 use_mtime_file_watcher: A bool containing whether to use mtime polling to
434 monitor file changes even if other options are available on the
436 automatic_restarts: If True then instances will be restarted when a
437 file or configuration change that affects them is detected.
438 allow_skipped_files: If True then all files in the application's directory
439 are readable, even if they appear in a static handler or "skip_files"
441 threadsafe_override: If not None, ignore the YAML file value of threadsafe
442 and use this value instead.
444 self
._module
_configuration
= module_configuration
445 self
._name
= module_configuration
.module_name
447 self
._api
_host
= api_host
448 self
._api
_port
= api_port
449 self
._auth
_domain
= auth_domain
450 self
._runtime
_stderr
_loglevel
= runtime_stderr_loglevel
451 self
._balanced
_port
= balanced_port
452 self
._php
_config
= php_config
453 self
._python
_config
= python_config
454 self
._java
_config
= java_config
455 self
._cloud
_sql
_config
= cloud_sql_config
456 self
._vm
_config
= vm_config
457 self
._request
_data
= request_data
458 self
._allow
_skipped
_files
= allow_skipped_files
459 self
._threadsafe
_override
= threadsafe_override
460 self
._dispatcher
= dispatcher
461 self
._max
_instances
= max_instances
462 self
._automatic
_restarts
= automatic_restarts
463 self
._use
_mtime
_file
_watcher
= use_mtime_file_watcher
464 self
._default
_version
_port
= default_version_port
465 self
._port
_registry
= port_registry
467 # TODO: remove when GA.
468 if self
._vm
_config
and self
._vm
_config
.HasField('docker_daemon_url'):
469 self
._RUNTIME
_INSTANCE
_FACTORIES
['vm'] = (
470 vm_runtime_factory
.VMRuntimeInstanceFactory
)
472 self
._instance
_factory
= self
._create
_instance
_factory
(
473 self
._module
_configuration
)
474 if self
._automatic
_restarts
:
475 self
._watcher
= file_watcher
.get_file_watcher(
476 [self
._module
_configuration
.application_root
] +
477 self
._instance
_factory
.get_restart_directories(),
478 self
._use
_mtime
_file
_watcher
)
481 self
._handler
_lock
= threading
.Lock()
482 self
._handlers
= self
._create
_url
_handlers
()
483 self
._balanced
_module
= wsgi_server
.WsgiServer(
484 (self
._host
, self
._balanced
_port
), self
)
485 self
._quit
_event
= threading
.Event() # Set when quit() has been called.
489 """The name of the module, as defined in app.yaml.
491 This value will be constant for the lifetime of the module even if the
492 module configuration changes.
498 """The module is ready to handle HTTP requests."""
499 return self
._balanced
_module
.ready
def balanced_port(self):
  """Returns the port reported by the balanced HTTP server of this Module.

  The balanced server must already be ready; an AssertionError is raised
  otherwise.
  """
  balanced_server = self._balanced_module
  assert balanced_server.ready, 'balanced module not running'
  return balanced_server.port
509 """The host that the HTTP server(s) for this Module is listening on."""
513 def balanced_address(self
):
514 """The address of the balanced HTTP server e.g. "localhost:8080"."""
515 if self
.balanced_port
!= 80:
516 return '%s:%s' % (self
.host
, self
.balanced_port
)
def max_instance_concurrent_requests(self):
  """Returns how many requests a single Instance may serve concurrently.

  The value is read from the instance factory's max_concurrent_requests.
  """
  factory = self._instance_factory
  return factory.max_concurrent_requests
def module_configuration(self):
  """Returns the application_configuration.ModuleConfiguration in use."""
  configuration = self._module_configuration
  return configuration
532 """Runtime property for this module."""
533 return self
._module
_configuration
.runtime
def effective_runtime(self):
  """Returns the effective_runtime value of the module configuration."""
  configuration = self._module_configuration
  return configuration.effective_runtime
def supports_interactive_commands(self):
  """Returns whether the module can evaluate arbitrary code on request.

  The answer is the instance factory's SUPPORTS_INTERACTIVE_REQUESTS flag.
  """
  factory = self._instance_factory
  return factory.SUPPORTS_INTERACTIVE_REQUESTS
545 def _handle_script_request(self
,
551 """Handles a HTTP request that has matched a script handler.
554 environ: An environ dict for the request as defined in PEP-333.
555 start_response: A function with semantics defined in PEP-333.
556 url_map: An appinfo.URLMap instance containing the configuration for the
557 handler that matched.
558 match: A re.MatchObject containing the result of the matched URL pattern.
559 inst: The Instance to send the request to. If None then an appropriate
560 Instance will be chosen.
563 An iterable over strings containing the body of the HTTP response.
565 raise NotImplementedError()
def _no_handler_for_request(self, environ, start_response, request_id):
  """Answers a request that matched none of the configured handlers.

  A log message is recorded for the request and a plain-text 404 response
  naming the requested path is produced.

  Args:
    environ: An environ dict for the request as defined in PEP-333.
    start_response: A function with semantics defined in PEP-333.
    request_id: The id of the request that the log message is attached to.

  Returns:
    A single-element list holding the body of the 404 response.
  """
  # NOTE(review): 2 is the log level handed to the logservice; presumably
  # WARNING - confirm against the logservice level constants.
  self._insert_log_message('No handlers matched this URL.', 2, request_id)
  response_headers = [('Content-Type', 'text/plain')]
  start_response('404 Not Found', response_headers)
  body = 'The url "%s" does not match any handlers.' % environ['PATH_INFO']
  return [body]
573 def _error_response(self
, environ
, start_response
, status
):
574 start_response('%d %s' % (status
, httplib
.responses
[status
]), [])
577 def _handle_request(self
, environ
, start_response
, inst
=None,
578 request_type
=instance
.NORMAL_REQUEST
):
579 """Handles a HTTP request.
582 environ: An environ dict for the request as defined in PEP-333.
583 start_response: A function with semantics defined in PEP-333.
584 inst: The Instance to send the request to. If None then an appropriate
585 Instance will be chosen. Setting inst is not meaningful if the
586 request does not match a "script" handler.
587 request_type: The type of the request. See instance.*_REQUEST module
591 An iterable over strings containing the body of the HTTP response.
595 environ
['SERVER_PORT'] = str(self
.get_instance_port(inst
.instance_id
))
596 except request_info
.NotSupportedWithAutoScalingError
:
597 environ
['SERVER_PORT'] = str(self
.balanced_port
)
599 environ
['SERVER_PORT'] = str(self
.balanced_port
)
600 if 'HTTP_HOST' in environ
:
601 environ
['SERVER_NAME'] = environ
['HTTP_HOST'].split(':', 1)[0]
602 environ
['DEFAULT_VERSION_HOSTNAME'] = '%s:%s' % (
603 environ
['SERVER_NAME'], self
._default
_version
_port
)
604 with self
._request
_data
.request(
606 self
._module
_configuration
) as request_id
:
607 should_log_request
= not _REQUEST_LOGGING_BLACKLIST_RE
.match(
608 environ
['PATH_INFO'])
609 environ
['REQUEST_ID_HASH'] = self
.generate_request_id_hash()
610 if should_log_request
:
611 environ
['REQUEST_LOG_ID'] = self
.generate_request_log_id()
612 if 'HTTP_HOST' in environ
:
613 hostname
= environ
['HTTP_HOST']
614 elif environ
['SERVER_PORT'] == '80':
615 hostname
= environ
['SERVER_NAME']
617 hostname
= '%s:%s' % (environ
['SERVER_NAME'], environ
['SERVER_PORT'])
619 if environ
.get('QUERY_STRING'):
620 resource
= '%s?%s' % (urllib
.quote(environ
['PATH_INFO']),
621 environ
['QUERY_STRING'])
623 resource
= urllib
.quote(environ
['PATH_INFO'])
624 email
, _
, _
= login
.get_user_info(environ
.get('HTTP_COOKIE', ''))
625 method
= environ
.get('REQUEST_METHOD', 'GET')
626 http_version
= environ
.get('SERVER_PROTOCOL', 'HTTP/1.0')
628 logservice
= apiproxy_stub_map
.apiproxy
.GetStub('logservice')
629 logservice
.start_request(
630 request_id
=request_id
,
631 user_request_id
=environ
['REQUEST_LOG_ID'],
632 ip
=environ
.get('REMOTE_ADDR', ''),
633 app_id
=self
._module
_configuration
.application
,
634 version_id
=self
._module
_configuration
.major_version
,
635 nickname
=email
.split('@', 1)[0],
636 user_agent
=environ
.get('HTTP_USER_AGENT', ''),
640 http_version
=http_version
,
641 module
=self
._module
_configuration
.module_name
)
643 def wrapped_start_response(status
, response_headers
, exc_info
=None):
644 response_headers
.append(('Server',
645 http_runtime_constants
.SERVER_SOFTWARE
))
646 if should_log_request
:
647 headers
= wsgiref
.headers
.Headers(response_headers
)
648 status_code
= int(status
.split(' ', 1)[0])
649 content_length
= int(headers
.get('Content-Length', 0))
650 logservice
.end_request(request_id
, status_code
, content_length
)
651 logging
.info('%(module_name)s: '
652 '"%(method)s %(resource)s %(http_version)s" '
653 '%(status)d %(content_length)s',
654 {'module_name': self
.name
,
656 'resource': resource
,
657 'http_version': http_version
,
658 'status': status_code
,
659 'content_length': content_length
or '-'})
660 return start_response(status
, response_headers
, exc_info
)
662 content_length
= int(environ
.get('CONTENT_LENGTH', '0'))
664 if (environ
['REQUEST_METHOD'] in ('GET', 'HEAD', 'DELETE', 'TRACE') and
665 content_length
!= 0):
666 # CONTENT_LENGTH may be empty or absent.
667 wrapped_start_response('400 Bad Request', [])
668 return ['"%s" requests may not contain bodies.' %
669 environ
['REQUEST_METHOD']]
671 # Do not apply request limits to internal _ah handlers (known to break
673 # TODO: research if _ah handlers need limits.
674 if (not environ
.get('REQUEST_URI', '/').startswith('/_ah/') and
675 content_length
> _MAX_UPLOAD_BYTES
):
676 # As allowed by the RFC, cherrypy closes the connection for 413 errors.
677 # Most clients do not handle this correctly and treat the page as
678 # unavailable if the connection is closed before the client can send
679 # all the data. To match the behavior of production, for large files
680 # < 64M read the data to prevent the client bug from being triggered.
682 if content_length
<= _MAX_UPLOAD_NO_TRIGGER_BAD_CLIENT_BYTES
:
683 environ
['wsgi.input'].read(content_length
)
684 status
= '%d %s' % (httplib
.REQUEST_ENTITY_TOO_LARGE
,
685 httplib
.responses
[httplib
.REQUEST_ENTITY_TOO_LARGE
])
686 wrapped_start_response(status
, [])
687 return ['Upload limited to %d megabytes.' % _MAX_UPLOAD_MEGABYTES
]
689 with self
._handler
_lock
:
690 handlers
= self
._handlers
693 path_info
= environ
['PATH_INFO']
694 path_info_normal
= self
._normpath
(path_info
)
695 if path_info_normal
!= path_info
:
696 # While a 301 Moved Permanently makes more sense for non-normal
697 # paths, prod issues a 302 so we do the same.
698 return self
._redirect
_302_path
_info
(path_info_normal
,
700 wrapped_start_response
)
701 if request_type
in (instance
.BACKGROUND_REQUEST
,
702 instance
.INTERACTIVE_REQUEST
,
703 instance
.SHUTDOWN_REQUEST
):
704 app
= functools
.partial(self
._handle
_script
_request
,
705 url_map
=_DUMMY_URLMAP
,
707 request_id
=request_id
,
709 request_type
=request_type
)
710 return request_rewriter
.frontend_rewriter_middleware(app
)(
711 environ
, wrapped_start_response
)
712 for handler
in handlers
:
713 match
= handler
.match(path_info
)
715 auth_failure
= handler
.handle_authorization(environ
,
716 wrapped_start_response
)
717 if auth_failure
is not None:
720 if isinstance(handler
, _ScriptHandler
):
721 app
= functools
.partial(self
._handle
_script
_request
,
722 url_map
=handler
.url_map
,
724 request_id
=request_id
,
726 request_type
=request_type
)
727 return request_rewriter
.frontend_rewriter_middleware(app
)(
728 environ
, wrapped_start_response
)
730 return handler
.handle(match
, environ
, wrapped_start_response
)
731 return self
._no
_handler
_for
_request
(environ
, wrapped_start_response
,
733 except StandardError, e
:
734 logging
.exception('Request to %r failed', path_info
)
735 wrapped_start_response('500 Internal Server Error', [], e
)
def _async_shutdown_instance(self, inst, port):
  """Queues _shutdown_instance(inst, port) on the shared thread pool.

  Args:
    inst: The instance to shut down.
    port: The port forwarded to _shutdown_instance.

  Returns:
    Whatever _THREAD_POOL.submit returns for the queued call.
  """
  pool = _THREAD_POOL
  return pool.submit(self._shutdown_instance, inst, port)
741 def _shutdown_instance(self
, inst
, port
):
742 force_shutdown_time
= time
.time() + _SHUTDOWN_TIMEOUT
744 environ
= self
.build_request_environ(
745 'GET', '/_ah/stop', [], '', '0.1.0.3', port
, fake_login
=True)
746 self
._handle
_request
(environ
,
747 start_response_utils
.null_start_response
,
749 request_type
=instance
.SHUTDOWN_REQUEST
)
750 logging
.debug('Sent shutdown request: %s', inst
)
752 logging
.exception('Internal error while handling shutdown request.')
754 time_to_wait
= force_shutdown_time
- time
.time()
755 self
._quit
_event
.wait(time_to_wait
)
756 inst
.quit(force
=True)
759 def _quote_querystring(qs
):
760 """Quote a query string to protect against XSS."""
762 parsed_qs
= urlparse
.parse_qs(qs
, keep_blank_values
=True)
763 # urlparse.parse returns a dictionary with values as lists while
764 # urllib.urlencode does not handle those. Expand to a list of
767 for key
, multivalue
in parsed_qs
.items():
768 for value
in multivalue
:
769 expanded_qs
.append((key
, value
))
770 return urllib
.urlencode(expanded_qs
)
772 def _redirect_302_path_info(self
, updated_path_info
, environ
, start_response
):
773 """Redirect to an updated path.
775 Respond to the current request with a 302 Found status with an updated path
776 but preserving the rest of the request.
779 - WSGI does not make the fragment available so we are not able to preserve
780 it. Luckily prod does not preserve the fragment so it works out.
783 updated_path_info: the new HTTP path to redirect to.
784 environ: WSGI environ object.
785 start_response: WSGI start response callable.
788 WSGI-compatible iterable object representing the body of the response.
790 correct_url
= urlparse
.urlunsplit(
791 (environ
['wsgi.url_scheme'],
792 environ
['HTTP_HOST'],
793 urllib
.quote(updated_path_info
),
794 self
._quote
_querystring
(environ
['QUERY_STRING']),
797 content_type
= 'text/html; charset=utf-8'
798 output
= _REDIRECT_HTML
% {
799 'content-type': content_type
,
800 'status': httplib
.FOUND
,
801 'correct-url': correct_url
804 start_response('%d %s' % (httplib
.FOUND
, httplib
.responses
[httplib
.FOUND
]),
805 [('Content-Type', content_type
),
806 ('Location', correct_url
),
807 ('Content-Length', str(len(output
)))])
812 """Normalize the path by handling . and .. directory entries.
814 Normalizes the path. A directory entry of . is just dropped while a
815 directory entry of .. removes the previous entry. Note that unlike
816 os.path.normpath, redundant separators remain in place to match prod.
822 A normalized HTTP path.
824 normalized_path_entries
= []
825 for entry
in path
.split('/'):
827 if normalized_path_entries
:
828 normalized_path_entries
.pop()
830 normalized_path_entries
.append(entry
)
831 return '/'.join(normalized_path_entries
)
def _insert_log_message(self, message, level, request_id):
  """Flushes a single application log line to the 'logservice' API stub.

  Args:
    message: The text of the log line.
    level: The level stored on the log line.
    request_id: The id of the request the log line is flushed for.
  """
  # Build a log group holding exactly one line stamped with the current time
  # in microseconds.
  group = log_service_pb.UserAppLogGroup()
  line = group.add_log_line()
  now_usec = int(time.time() * 1e6)
  line.set_timestamp_usec(now_usec)
  line.set_level(level)
  line.set_message(message)

  # Wrap the encoded group in a flush request and hand it to the stub.
  flush_request = log_service_pb.FlushRequest()
  flush_request.set_logs(group.Encode())
  void_response = api_base_pb.VoidProto()
  stub = apiproxy_stub_map.apiproxy.GetStub('logservice')
  stub._Dynamic_Flush(flush_request, void_response, request_id)
def generate_request_log_id():
  """Builds a random value to use as a REQUEST_LOG_ID.

  Returns:
    A string of between 30 and 99 characters drawn from _LOWER_HEX_DIGITS.
    The length varies to emulate the production values, which encapsulate
    the application id, version and some log state.
  """
  length = random.randrange(30, 100)
  digits = [random.choice(_LOWER_HEX_DIGITS) for _ in range(length)]
  return ''.join(digits)
def generate_request_id_hash():
  """Builds a random value to use as a REQUEST_ID_HASH.

  Returns:
    A string of _REQUEST_ID_HASH_LENGTH characters drawn from
    _UPPER_HEX_DIGITS.
  """
  digits = [random.choice(_UPPER_HEX_DIGITS)
            for _ in range(_REQUEST_ID_HASH_LENGTH)]
  return ''.join(digits)
def set_num_instances(self, instances):
  """Sets how many instances this module should run.

  This implementation does not support the operation.

  Args:
    instances: An int containing the number of instances to run.

  Raises:
    request_info.NotSupportedWithAutoScalingError: Always.
  """
  error = request_info.NotSupportedWithAutoScalingError()
  raise error
def get_num_instances(self):
  """Reports how many instances this module should run.

  This implementation does not support the operation.

  Raises:
    request_info.NotSupportedWithAutoScalingError: Always.
  """
  error = request_info.NotSupportedWithAutoScalingError()
  raise error
878 """Stops the module from serving requests."""
879 raise request_info
.NotSupportedWithAutoScalingError()
882 """Restarts the module."""
883 raise request_info
.NotSupportedWithAutoScalingError()
def get_instance_address(self, instance_id):
  """Returns the "host:port" address of the HTTP server for an instance.

  Args:
    instance_id: The id of the instance, passed on to get_instance_port.

  Returns:
    A string joining self.host and the instance's port with a colon.
  """
  host = self.host
  port = self.get_instance_port(instance_id)
  return '%s:%s' % (host, port)
def get_instance_port(self, instance_id):
  """Reports the port of the HTTP server for an instance.

  This implementation unconditionally raises.

  Args:
    instance_id: Not used by this implementation.

  Raises:
    request_info.NotSupportedWithAutoScalingError: Always.
  """
  raise request_info.NotSupportedWithAutoScalingError()
def get_instance(self, instance_id):
  """Looks up the instance with the provided instance ID.

  This implementation unconditionally raises.

  Args:
    instance_id: Not used by this implementation.

  Raises:
    request_info.NotSupportedWithAutoScalingError: Always.
  """
  raise request_info.NotSupportedWithAutoScalingError()
898 def supports_individually_addressable_instances(self
):
901 def create_interactive_command_module(self
):
902 """Returns a InteractiveCommandModule that can be sent user commands."""
903 if self
._instance
_factory
.SUPPORTS_INTERACTIVE_REQUESTS
:
904 return InteractiveCommandModule(self
._module
_configuration
,
910 self
._runtime
_stderr
_loglevel
,
914 self
._cloud
_sql
_config
,
916 self
._default
_version
_port
,
920 self
._use
_mtime
_file
_watcher
,
921 self
._allow
_skipped
_files
,
922 self
._threadsafe
_override
)
924 raise NotImplementedError('runtime does not support interactive commands')
926 def build_request_environ(self
, method
, relative_url
, headers
, body
,
927 source_ip
, port
, fake_login
=False):
928 if isinstance(body
, unicode):
929 body
= body
.encode('ascii')
931 url
= urlparse
.urlsplit(relative_url
)
933 host
= '%s:%s' % (self
.host
, port
)
936 environ
= {constants
.FAKE_IS_ADMIN_HEADER
: '1',
937 'CONTENT_LENGTH': str(len(body
)),
938 'PATH_INFO': url
.path
,
939 'QUERY_STRING': url
.query
,
940 'REQUEST_METHOD': method
,
941 'REMOTE_ADDR': source_ip
,
942 'SERVER_NAME': self
.host
,
943 'SERVER_PORT': str(port
),
944 'SERVER_PROTOCOL': 'HTTP/1.1',
945 'wsgi.version': (1, 0),
946 'wsgi.url_scheme': 'http',
947 'wsgi.errors': cStringIO
.StringIO(),
948 'wsgi.multithread': True,
949 'wsgi.multiprocess': True,
950 'wsgi.input': cStringIO
.StringIO(body
)}
952 environ
[constants
.FAKE_LOGGED_IN_HEADER
] = '1'
953 util
.put_headers_in_environ(headers
, environ
)
954 environ
['HTTP_HOST'] = host
958 class AutoScalingModule(Module
):
959 """A pool of instances that is autoscaled based on traffic."""
961 # The minimum number of seconds to wait, after quitting an idle instance,
962 # before quitting another idle instance.
963 _MIN_SECONDS_BETWEEN_QUITS
= 60
964 # The time horizon to use when calculating the number of instances required
965 # to serve the current level of traffic.
966 _REQUIRED_INSTANCE_WINDOW_SECONDS
= 60
968 _DEFAULT_AUTOMATIC_SCALING
= appinfo
.AutomaticScaling(
969 min_pending_latency
='0.1s',
970 max_pending_latency
='0.5s',
971 min_idle_instances
=1,
972 max_idle_instances
=1000)
975 def _parse_pending_latency(timing
):
976 """Parse a pending latency string into a float of the value in seconds.
979 timing: A str of the form 1.0s or 1000ms.
982 A float representation of the value in seconds.
984 if timing
.endswith('ms'):
985 return float(timing
[:-2]) / 1000
987 return float(timing
[:-1])
990 def _populate_default_automatic_scaling(cls
, automatic_scaling
):
991 for attribute
in automatic_scaling
.ATTRIBUTES
:
992 if getattr(automatic_scaling
, attribute
) in ('automatic', None):
993 setattr(automatic_scaling
, attribute
,
994 getattr(cls
._DEFAULT
_AUTOMATIC
_SCALING
, attribute
))
996 def _process_automatic_scaling(self
, automatic_scaling
):
997 if automatic_scaling
:
998 self
._populate
_default
_automatic
_scaling
(automatic_scaling
)
1000 automatic_scaling
= self
._DEFAULT
_AUTOMATIC
_SCALING
1001 self
._min
_pending
_latency
= self
._parse
_pending
_latency
(
1002 automatic_scaling
.min_pending_latency
)
1003 self
._max
_pending
_latency
= self
._parse
_pending
_latency
(
1004 automatic_scaling
.max_pending_latency
)
1005 self
._min
_idle
_instances
= int(automatic_scaling
.min_idle_instances
)
1006 self
._max
_idle
_instances
= int(automatic_scaling
.max_idle_instances
)
1009 module_configuration
,
1015 runtime_stderr_loglevel
,
1021 default_version_port
,
1026 use_mtime_file_watcher
,
1028 allow_skipped_files
,
1029 threadsafe_override
):
1030 """Initializer for AutoScalingModule.
1033 module_configuration: An application_configuration.ModuleConfiguration
1034 instance storing the configuration data for a module.
1035 host: A string containing the host that any HTTP servers should bind to
1037 balanced_port: An int specifying the port where the balanced module for
1038 the pool should listen.
1039 api_host: The host that APIServer listens for RPC requests on.
1040 api_port: The port that APIServer listens for RPC requests on.
1041 auth_domain: A string containing the auth domain to set in the environment
1043 runtime_stderr_loglevel: An int representing the minimum logging level at
1044 which runtime log messages should be written to stderr. See
1045 devappserver2.py for possible values.
1046 php_config: A runtime_config_pb2.PhpConfig instances containing PHP
1047 runtime-specific configuration. If None then defaults are used.
1048 python_config: A runtime_config_pb2.PythonConfig instance containing
1049 Python runtime-specific configuration. If None then defaults are used.
1050 java_config: A runtime_config_pb2.JavaConfig instance containing
1051 Java runtime-specific configuration. If None then defaults are used.
1052 cloud_sql_config: A runtime_config_pb2.CloudSQL instance containing the
1053 required configuration for local Google Cloud SQL development. If None
1054 then Cloud SQL will not be available.
1055 unused_vm_config: A runtime_config_pb2.VMConfig instance containing
1056 VM runtime-specific configuration. Ignored by AutoScalingModule as
1057 autoscaling is not yet supported by VM runtimes.
1058 default_version_port: An int containing the port of the default version.
1059 port_registry: A dispatcher.PortRegistry used to provide the Dispatcher
1060 with a mapping of port to Module and Instance.
1061 request_data: A wsgi_request_info.WSGIRequestInfo that will be provided
1062 with request information for use by API stubs.
1063 dispatcher: A Dispatcher instance that can be used to make HTTP requests.
1064 max_instances: The maximum number of instances to create for this module.
1065 If None then there is no limit on the number of created instances.
1066 use_mtime_file_watcher: A bool containing whether to use mtime polling to
1067 monitor file changes even if other options are available on the
1069 automatic_restarts: If True then instances will be restarted when a
1070 file or configuration change that affects them is detected.
1071 allow_skipped_files: If True then all files in the application's directory
1072 are readable, even if they appear in a static handler or "skip_files"
1074 threadsafe_override: If not None, ignore the YAML file value of threadsafe
1075 and use this value instead.
1077 super(AutoScalingModule
, self
).__init
__(module_configuration
,
1083 runtime_stderr_loglevel
,
1088 # VM runtimes does not support
1091 default_version_port
,
1096 use_mtime_file_watcher
,
1098 allow_skipped_files
,
1099 threadsafe_override
)
1102 self
._process
_automatic
_scaling
(
1103 self
._module
_configuration
.automatic_scaling
)
1105 self
._instances
= set() # Protected by self._condition.
1106 # A deque containing (time, num_outstanding_instance_requests) 2-tuples.
1107 # This is used to track the maximum number of outstanding requests in a time
1108 # period. Protected by self._condition.
1109 self
._outstanding
_request
_history
= collections
.deque()
1110 self
._num
_outstanding
_instance
_requests
= 0 # Protected by self._condition.
1111 # The time when the last instance was quit in seconds since the epoch.
1112 self
._last
_instance
_quit
_time
= 0 # Protected by self._condition.
1114 self
._condition
= threading
.Condition() # Protects instance state.
1115 self
._instance
_adjustment
_thread
= threading
.Thread(
1116 target
=self
._loop
_adjusting
_instances
)
1119 """Start background management of the Module."""
1120 self
._balanced
_module
.start()
1121 self
._port
_registry
.add(self
.balanced_port
, self
, None)
1123 self
._watcher
.start()
1124 self
._instance
_adjustment
_thread
.start()
1127 """Stops the Module."""
1128 self
._quit
_event
.set()
1129 self
._instance
_adjustment
_thread
.join()
1130 # The instance adjustment thread depends on the balanced module and the
1131 # watcher so wait for it to exit before quitting them.
1133 self
._watcher
.quit()
1134 self
._balanced
_module
.quit()
1135 with self
._condition
:
1136 instances
= self
._instances
1137 self
._instances
= set()
1138 self
._condition
.notify_all()
1139 for inst
in instances
:
1140 inst
.quit(force
=True)
def instances(self):
  """A set of all the instances currently in the Module.

  Returns:
    A new set copied from self._instances while holding self._condition, so
    callers can use it without affecting the Module's own collection.
  """
  with self._condition:
    snapshot = set(self._instances)
  return snapshot
@property
def num_outstanding_instance_requests(self):
  """The number of requests that instances are currently handling.

  Exposed as a property because this class reads it as an attribute
  (self.num_outstanding_instance_requests) when recording request history.
  The value is read while holding self._condition.
  """
  with self._condition:
    return self._num_outstanding_instance_requests
1154 def _handle_instance_request(self
,
1162 """Handles a request routed a particular Instance.
1165 environ: An environ dict for the request as defined in PEP-333.
1166 start_response: A function with semantics defined in PEP-333.
1167 url_map: An appinfo.URLMap instance containing the configuration for the
1168 handler that matched.
1169 match: A re.MatchObject containing the result of the matched URL pattern.
1170 request_id: A unique string id associated with the request.
1171 inst: The instance.Instance to send the request to.
1172 request_type: The type of the request. See instance.*_REQUEST module
1176 An iterable over strings containing the body of the HTTP response.
1178 if request_type
!= instance
.READY_REQUEST
:
1179 with self
._condition
:
1180 self
._num
_outstanding
_instance
_requests
+= 1
1181 self
._outstanding
_request
_history
.append(
1182 (time
.time(), self
.num_outstanding_instance_requests
))
1184 logging
.debug('Dispatching request to %s', inst
)
1185 return inst
.handle(environ
, start_response
, url_map
, match
, request_id
,
1188 with self
._condition
:
1189 if request_type
!= instance
.READY_REQUEST
:
1190 self
._num
_outstanding
_instance
_requests
-= 1
1191 self
._condition
.notify()
1193 def _handle_script_request(self
,
1200 request_type
=instance
.NORMAL_REQUEST
):
1201 """Handles a HTTP request that has matched a script handler.
1204 environ: An environ dict for the request as defined in PEP-333.
1205 start_response: A function with semantics defined in PEP-333.
1206 url_map: An appinfo.URLMap instance containing the configuration for the
1207 handler that matched.
1208 match: A re.MatchObject containing the result of the matched URL pattern.
1209 request_id: A unique string id associated with the request.
1210 inst: The instance.Instance to send the request to. If None then an
1211 appropriate instance.Instance will be chosen.
1212 request_type: The type of the request. See instance.*_REQUEST module
1216 An iterable over strings containing the body of the HTTP response.
1218 if inst
is not None:
1219 return self
._handle
_instance
_request
(
1220 environ
, start_response
, url_map
, match
, request_id
, inst
,
1223 with self
._condition
:
1224 self
._num
_outstanding
_instance
_requests
+= 1
1225 self
._outstanding
_request
_history
.append(
1226 (time
.time(), self
.num_outstanding_instance_requests
))
1229 start_time
= time
.time()
1230 timeout_time
= start_time
+ self
._min
_pending
_latency
1231 # Loop until an instance is available to handle the request.
1233 if self
._quit
_event
.is_set():
1234 return self
._error
_response
(environ
, start_response
, 404)
1235 inst
= self
._choose
_instance
(timeout_time
)
1237 inst
= self
._add
_instance
(permit_warmup
=False)
1239 # No instance is available nor can a new one be created, so loop
1240 # waiting for one to be free.
1241 timeout_time
= time
.time() + 0.2
1245 logging
.debug('Dispatching request to %s after %0.4fs pending',
1246 inst
, time
.time() - start_time
)
1247 return inst
.handle(environ
,
1253 except instance
.CannotAcceptRequests
:
1256 with self
._condition
:
1257 self
._num
_outstanding
_instance
_requests
-= 1
1258 self
._condition
.notify()
1260 def _add_instance(self
, permit_warmup
):
1261 """Creates and adds a new instance.Instance to the Module.
1264 permit_warmup: If True then the new instance.Instance will be sent a new
1265 warmup request if it is configured to receive them.
1268 The newly created instance.Instance. Returns None if no new instance
1269 could be created because the maximum number of instances have already
1272 if self
._max
_instances
is not None:
1273 with self
._condition
:
1274 if len(self
._instances
) >= self
._max
_instances
:
1277 perform_warmup
= permit_warmup
and (
1278 'warmup' in (self
._module
_configuration
.inbound_services
or []))
1280 inst
= self
._instance
_factory
.new_instance(
1281 self
.generate_instance_id(),
1282 expect_ready_request
=perform_warmup
)
1284 with self
._condition
:
1285 if self
._quit
_event
.is_set():
1287 self
._instances
.add(inst
)
1289 if not inst
.start():
1293 self
._async
_warmup
(inst
)
1295 with self
._condition
:
1296 self
._condition
.notify(self
.max_instance_concurrent_requests
)
1297 logging
.debug('Created instance: %s', inst
)
@staticmethod
def generate_instance_id():
  """Generates a random id for a new instance.

  Returns:
    A string of 36 characters, each chosen at random from _LOWER_HEX_DIGITS.
  """
  return ''.join(random.choice(_LOWER_HEX_DIGITS) for _ in range(36))
1304 def _warmup(self
, inst
):
1305 """Send a warmup request to the given instance."""
1308 environ
= self
.build_request_environ(
1309 'GET', '/_ah/warmup', [], '', '0.1.0.3', self
.balanced_port
,
1311 self
._handle
_request
(environ
,
1312 start_response_utils
.null_start_response
,
1314 request_type
=instance
.READY_REQUEST
)
1315 with self
._condition
:
1316 self
._condition
.notify(self
.max_instance_concurrent_requests
)
1318 logging
.exception('Internal error while handling warmup request.')
def _async_warmup(self, inst):
  """Asynchronously send a warmup request to the given Instance.

  Args:
    inst: The instance to warm up; handed to self._warmup.

  Returns:
    The value returned by _THREAD_POOL.submit for the scheduled _warmup call.
  """
  pending_warmup = _THREAD_POOL.submit(self._warmup, inst)
  return pending_warmup
1324 def _trim_outstanding_request_history(self
):
1325 """Removes obsolete entries from _outstanding_request_history."""
1326 window_start
= time
.time() - self
._REQUIRED
_INSTANCE
_WINDOW
_SECONDS
1327 with self
._condition
:
1328 while self
._outstanding
_request
_history
:
1329 t
, _
= self
._outstanding
_request
_history
[0]
1330 if t
< window_start
:
1331 self
._outstanding
_request
_history
.popleft()
1335 def _get_num_required_instances(self
):
1336 """Returns the number of Instances required to handle the request load."""
1337 with self
._condition
:
1338 self
._trim
_outstanding
_request
_history
()
1339 if not self
._outstanding
_request
_history
:
1342 peak_concurrent_requests
= max(
1344 for (t
, current_requests
)
1345 in self
._outstanding
_request
_history
)
1346 return int(math
.ceil(peak_concurrent_requests
/
1347 self
.max_instance_concurrent_requests
))
1349 def _split_instances(self
):
1350 """Returns a 2-tuple representing the required and extra Instances.
1353 A 2-tuple of (required_instances, not_required_instances):
1354 required_instances: The set of the instance.Instances, in a state that
1355 can handle requests, required to handle the current
1357 not_required_instances: The set of the Instances contained in this
1358 Module that not are not required.
1360 with self
._condition
:
1361 num_required_instances
= self
._get
_num
_required
_instances
()
1363 available
= [inst
for inst
in self
._instances
1364 if inst
.can_accept_requests
]
1365 available
.sort(key
=lambda inst
: -inst
.num_outstanding_requests
)
1367 required
= set(available
[:num_required_instances
])
1368 return required
, self
._instances
- required
1370 def _choose_instance(self
, timeout_time
):
1371 """Returns the best Instance to handle a request or None if all are busy."""
1372 with self
._condition
:
1373 while time
.time() < timeout_time
:
1374 required_instances
, not_required_instances
= self
._split
_instances
()
1375 if required_instances
:
1376 # Pick the instance with the most remaining capacity to handle
1378 required_instances
= sorted(
1380 key
=lambda inst
: inst
.remaining_request_capacity
)
1381 if required_instances
[-1].remaining_request_capacity
:
1382 return required_instances
[-1]
1384 available_instances
= [inst
for inst
in not_required_instances
1385 if inst
.remaining_request_capacity
> 0 and
1386 inst
.can_accept_requests
]
1387 if available_instances
:
1388 # Pick the instance with the *least* capacity to handle requests
1389 # to avoid using unnecessary idle instances.
1390 available_instances
.sort(
1391 key
=lambda instance
: instance
.num_outstanding_requests
)
1392 return available_instances
[-1]
1394 self
._condition
.wait(timeout_time
- time
.time())
1397 def _adjust_instances(self
):
1398 """Creates new Instances or deletes idle Instances based on current load."""
1400 with self
._condition
:
1401 _
, not_required_instances
= self
._split
_instances
()
1403 if len(not_required_instances
) < self
._min
_idle
_instances
:
1404 self
._add
_instance
(permit_warmup
=True)
1405 elif (len(not_required_instances
) > self
._max
_idle
_instances
and
1407 (self
._last
_instance
_quit
_time
+ self
._MIN
_SECONDS
_BETWEEN
_QUITS
)):
1408 for inst
in not_required_instances
:
1409 if not inst
.num_outstanding_requests
:
1412 except instance
.CannotQuitServingInstance
:
1415 self
._last
_instance
_quit
_time
= now
1416 logging
.debug('Quit instance: %s', inst
)
1417 with self
._condition
:
1418 self
._instances
.discard(inst
)
1421 def _loop_adjusting_instances(self
):
1422 """Loops until the Module exits, reloading, adding or removing Instances."""
1423 while not self
._quit
_event
.is_set():
1425 if self
._automatic
_restarts
:
1426 self
._handle
_changes
()
1427 self
._adjust
_instances
()
1428 self
._quit
_event
.wait(timeout
=1)
1430 def __call__(self
, environ
, start_response
):
1431 return self
._handle
_request
(environ
, start_response
)
1434 class ManualScalingModule(Module
):
1435 """A pool of instances that is manually-scaled."""
1437 _DEFAULT_MANUAL_SCALING
= appinfo
.ManualScaling(instances
='1')
1438 _MAX_REQUEST_WAIT_TIME
= 10
1441 def _populate_default_manual_scaling(cls
, manual_scaling
):
1442 for attribute
in manual_scaling
.ATTRIBUTES
:
1443 if getattr(manual_scaling
, attribute
) in ('manual', None):
1444 setattr(manual_scaling
, attribute
,
1445 getattr(cls
._DEFAULT
_MANUAL
_SCALING
, attribute
))
1447 def _process_manual_scaling(self
, manual_scaling
):
1449 self
._populate
_default
_manual
_scaling
(manual_scaling
)
1451 manual_scaling
= self
._DEFAULT
_MANUAL
_SCALING
1452 self
._initial
_num
_instances
= int(manual_scaling
.instances
)
1455 module_configuration
,
1461 runtime_stderr_loglevel
,
1467 default_version_port
,
1472 use_mtime_file_watcher
,
1474 allow_skipped_files
,
1475 threadsafe_override
):
1477 """Initializer for ManualScalingModule.
1480 module_configuration: An application_configuration.ModuleConfiguration
1481 instance storing the configuration data for a module.
1482 host: A string containing the host that any HTTP servers should bind to
1484 balanced_port: An int specifying the port where the balanced module for
1485 the pool should listen.
1486 api_host: The host that APIServer listens for RPC requests on.
1487 api_port: The port that APIServer listens for RPC requests on.
1488 auth_domain: A string containing the auth domain to set in the environment
1490 runtime_stderr_loglevel: An int representing the minimum logging level at
1491 which runtime log messages should be written to stderr. See
1492 devappserver2.py for possible values.
1493 php_config: A runtime_config_pb2.PhpConfig instances containing PHP
1494 runtime-specific configuration. If None then defaults are used.
1495 python_config: A runtime_config_pb2.PythonConfig instance containing
1496 Python runtime-specific configuration. If None then defaults are used.
1497 java_config: A runtime_config_pb2.JavaConfig instance containing
1498 Java runtime-specific configuration. If None then defaults are used.
1499 cloud_sql_config: A runtime_config_pb2.CloudSQL instance containing the
1500 required configuration for local Google Cloud SQL development. If None
1501 then Cloud SQL will not be available.
1502 vm_config: A runtime_config_pb2.VMConfig instance containing
1503 VM runtime-specific configuration. If None all docker-related stuff
1505 default_version_port: An int containing the port of the default version.
1506 port_registry: A dispatcher.PortRegistry used to provide the Dispatcher
1507 with a mapping of port to Module and Instance.
1508 request_data: A wsgi_request_info.WSGIRequestInfo that will be provided
1509 with request information for use by API stubs.
1510 dispatcher: A Dispatcher instance that can be used to make HTTP requests.
1511 max_instances: The maximum number of instances to create for this module.
1512 If None then there is no limit on the number of created instances.
1513 use_mtime_file_watcher: A bool containing whether to use mtime polling to
1514 monitor file changes even if other options are available on the
1516 automatic_restarts: If True then instances will be restarted when a
1517 file or configuration change that affects them is detected.
1518 allow_skipped_files: If True then all files in the application's directory
1519 are readable, even if they appear in a static handler or "skip_files"
1521 threadsafe_override: If not None, ignore the YAML file value of threadsafe
1522 and use this value instead.
1524 super(ManualScalingModule
, self
).__init
__(module_configuration
,
1530 runtime_stderr_loglevel
,
1536 default_version_port
,
1541 use_mtime_file_watcher
,
1543 allow_skipped_files
,
1544 threadsafe_override
)
1547 self
._process
_manual
_scaling
(module_configuration
.manual_scaling
)
1549 self
._instances
= [] # Protected by self._condition.
1550 self
._wsgi
_servers
= [] # Protected by self._condition.
1551 # Whether the module has been stopped. Protected by self._condition.
1552 self
._suspended
= False
1554 self
._condition
= threading
.Condition() # Protects instance state.
1556 # Serializes operations that modify the serving state of or number of
1558 self
._instances
_change
_lock
= threading
.RLock()
1560 self
._change
_watcher
_thread
= threading
.Thread(
1561 target
=self
._loop
_watching
_for
_changes
)
1564 """Start background management of the Module."""
1565 self
._balanced
_module
.start()
1566 self
._port
_registry
.add(self
.balanced_port
, self
, None)
1568 self
._watcher
.start()
1569 self
._change
_watcher
_thread
.start()
1570 with self
._instances
_change
_lock
:
1571 if self
._max
_instances
is not None:
1572 initial_num_instances
= min(self
._max
_instances
,
1573 self
._initial
_num
_instances
)
1575 initial_num_instances
= self
._initial
_num
_instances
1576 for _
in xrange(initial_num_instances
):
1577 self
._add
_instance
()
1580 """Stops the Module."""
1581 self
._quit
_event
.set()
1582 self
._change
_watcher
_thread
.join()
1583 # The change watcher thread depends on the balanced module and the
1584 # watcher so wait for it to exit before quitting them.
1586 self
._watcher
.quit()
1587 self
._balanced
_module
.quit()
1588 for wsgi_servr
in self
._wsgi
_servers
:
1590 with self
._condition
:
1591 instances
= self
._instances
1592 self
._instances
= []
1593 self
._condition
.notify_all()
1594 for inst
in instances
:
1595 inst
.quit(force
=True)
def get_instance_port(self, instance_id):
  """Returns the port of the HTTP server for an instance.

  Args:
    instance_id: The index of the instance, as an int or anything int()
      can convert.

  Returns:
    The port attribute of the entry of self._wsgi_servers at that index.

  Raises:
    request_info.InvalidInstanceIdError: If instance_id cannot be converted
      to an int or is not a valid index into the current instances.
  """
  try:
    instance_id = int(instance_id)
  except (TypeError, ValueError):
    raise request_info.InvalidInstanceIdError()
  with self._condition:
    # Only non-negative, in-range indexes are valid ids.
    if not 0 <= instance_id < len(self._instances):
      raise request_info.InvalidInstanceIdError()
    wsgi_servr = self._wsgi_servers[instance_id]
  return wsgi_servr.port
def instances(self):
  """A set of all the instances currently in the Module.

  Returns:
    A new set built from self._instances while holding self._condition.
  """
  with self._condition:
    current = set(self._instances)
  return current
1616 def _handle_instance_request(self
,
1624 """Handles a request routed a particular Instance.
1627 environ: An environ dict for the request as defined in PEP-333.
1628 start_response: A function with semantics defined in PEP-333.
1629 url_map: An appinfo.URLMap instance containing the configuration for the
1630 handler that matched.
1631 match: A re.MatchObject containing the result of the matched URL pattern.
1632 request_id: A unique string id associated with the request.
1633 inst: The instance.Instance to send the request to.
1634 request_type: The type of the request. See instance.*_REQUEST module
1638 An iterable over strings containing the body of the HTTP response.
1640 start_time
= time
.time()
1641 timeout_time
= start_time
+ self
._MAX
_REQUEST
_WAIT
_TIME
1643 while time
.time() < timeout_time
:
1644 logging
.debug('Dispatching request to %s after %0.4fs pending',
1645 inst
, time
.time() - start_time
)
1647 return inst
.handle(environ
, start_response
, url_map
, match
,
1648 request_id
, request_type
)
1649 except instance
.CannotAcceptRequests
:
1651 inst
.wait(timeout_time
)
1653 return self
._error
_response
(environ
, start_response
, 503)
1655 return self
._error
_response
(environ
, start_response
, 503)
1657 with self
._condition
:
1658 self
._condition
.notify()
1660 def _handle_script_request(self
,
1667 request_type
=instance
.NORMAL_REQUEST
):
1668 """Handles a HTTP request that has matched a script handler.
1671 environ: An environ dict for the request as defined in PEP-333.
1672 start_response: A function with semantics defined in PEP-333.
1673 url_map: An appinfo.URLMap instance containing the configuration for the
1674 handler that matched.
1675 match: A re.MatchObject containing the result of the matched URL pattern.
1676 request_id: A unique string id associated with the request.
1677 inst: The instance.Instance to send the request to. If None then an
1678 appropriate instance.Instance will be chosen.
1679 request_type: The type of the request. See instance.*_REQUEST module
1683 An iterable over strings containing the body of the HTTP response.
1685 if ((request_type
in (instance
.NORMAL_REQUEST
, instance
.READY_REQUEST
) and
1686 self
._suspended
) or self
._quit
_event
.is_set()):
1687 return self
._error
_response
(environ
, start_response
, 404)
1688 if self
._module
_configuration
.is_backend
:
1689 environ
['BACKEND_ID'] = self
._module
_configuration
.module_name
1691 environ
['BACKEND_ID'] = (
1692 self
._module
_configuration
.version_id
.split('.', 1)[0])
1693 if inst
is not None:
1694 return self
._handle
_instance
_request
(
1695 environ
, start_response
, url_map
, match
, request_id
, inst
,
1698 start_time
= time
.time()
1699 timeout_time
= start_time
+ self
._MAX
_REQUEST
_WAIT
_TIME
1700 while time
.time() < timeout_time
:
1701 if ((request_type
in (instance
.NORMAL_REQUEST
, instance
.READY_REQUEST
) and
1702 self
._suspended
) or self
._quit
_event
.is_set()):
1703 return self
._error
_response
(environ
, start_response
, 404)
1704 inst
= self
._choose
_instance
(timeout_time
)
1707 logging
.debug('Dispatching request to %s after %0.4fs pending',
1708 inst
, time
.time() - start_time
)
1709 return inst
.handle(environ
, start_response
, url_map
, match
,
1710 request_id
, request_type
)
1711 except instance
.CannotAcceptRequests
:
1714 with self
._condition
:
1715 self
._condition
.notify()
1717 return self
._error
_response
(environ
, start_response
, 503)
1719 def _add_instance(self
):
1720 """Creates and adds a new instance.Instance to the Module.
1722 This must be called with _instances_change_lock held.
1724 instance_id
= self
.get_num_instances()
1725 assert self
._max
_instances
is None or instance_id
< self
._max
_instances
1726 inst
= self
._instance
_factory
.new_instance(instance_id
,
1727 expect_ready_request
=True)
1728 wsgi_servr
= wsgi_server
.WsgiServer(
1729 (self
._host
, 0), functools
.partial(self
._handle
_request
, inst
=inst
))
1731 self
._port
_registry
.add(wsgi_servr
.port
, self
, inst
)
1733 health_check_config
= self
.module_configuration
.vm_health_check
1734 if (self
.module_configuration
.runtime
== 'vm' and
1735 health_check_config
.enable_health_check
):
1736 self
._add
_health
_checks
(inst
, wsgi_servr
, health_check_config
)
1738 with self
._condition
:
1739 if self
._quit
_event
.is_set():
1741 self
._wsgi
_servers
.append(wsgi_servr
)
1742 self
._instances
.append(inst
)
1743 suspended
= self
._suspended
1745 self
._async
_start
_instance
(wsgi_servr
, inst
)
def _add_health_checks(self, inst, wsgi_servr, config):
  """Creates and starts a health checker for an instance.

  Args:
    inst: The instance to check. It is bound into both callbacks.
    wsgi_servr: The WSGI server paired with the instance. It is bound into
      the health check callback.
    config: The health check configuration passed to the HealthChecker.
  """
  # The callbacks are _do_health_check and _restart_instance with their
  # leading arguments pre-bound.
  checker = health_check_service.HealthChecker(
      inst,
      config,
      functools.partial(self._do_health_check, wsgi_servr, inst),
      functools.partial(self._restart_instance, inst))
  checker.start()
def _async_start_instance(self, wsgi_servr, inst):
  """Schedules _start_instance for an instance on the shared thread pool.

  Args:
    wsgi_servr: The WSGI server paired with the instance.
    inst: The instance to start.

  Returns:
    The value returned by _THREAD_POOL.submit for the scheduled call.
  """
  pending_start = _THREAD_POOL.submit(self._start_instance, wsgi_servr, inst)
  return pending_start
1759 def _start_instance(self
, wsgi_servr
, inst
):
1761 if not inst
.start():
1764 logging
.exception('Internal error while starting instance.')
1767 logging
.debug('Started instance: %s at http://%s:%s', inst
, self
.host
,
1770 environ
= self
.build_request_environ(
1771 'GET', '/_ah/start', [], '', '0.1.0.3', wsgi_servr
.port
,
1773 self
._handle
_request
(environ
,
1774 start_response_utils
.null_start_response
,
1776 request_type
=instance
.READY_REQUEST
)
1777 logging
.debug('Sent start request: %s', inst
)
1778 with self
._condition
:
1779 self
._condition
.notify(self
.max_instance_concurrent_requests
)
1780 except Exception, e
: # pylint: disable=broad-except
1781 logging
.exception('Internal error while handling start request: %s', e
)
1783 def _do_health_check(self
, wsgi_servr
, inst
, start_response
,
1784 is_last_successful
):
1785 is_last_successful
= 'yes' if is_last_successful
else 'no'
1786 url
= '/_ah/health?%s' % urllib
.urlencode(
1787 [('IsLastSuccessful', is_last_successful
)])
1788 environ
= self
.build_request_environ(
1789 'GET', url
, [], '', '', wsgi_servr
.port
,
1791 return self
._handle
_request
(
1795 request_type
=instance
.NORMAL_REQUEST
)
1797 def _choose_instance(self
, timeout_time
):
1798 """Returns an Instance to handle a request or None if all are busy."""
1799 with self
._condition
:
1800 while time
.time() < timeout_time
:
1801 for inst
in self
._instances
:
1802 if inst
.can_accept_requests
:
1804 self
._condition
.wait(timeout_time
- time
.time())
1807 def _handle_changes(self
):
1808 """Handle file or configuration changes."""
1809 # Always check for config and file changes because checking also clears
1811 config_changes
= self
._module
_configuration
.check_for_updates()
1812 has_file_changes
= self
._watcher
.has_changes()
1814 if application_configuration
.HANDLERS_CHANGED
in config_changes
:
1815 handlers
= self
._create
_url
_handlers
()
1816 with self
._handler
_lock
:
1817 self
._handlers
= handlers
1819 if has_file_changes
:
1820 self
._instance
_factory
.files_changed()
1822 if config_changes
& _RESTART_INSTANCES_CONFIG_CHANGES
:
1823 self
._instance
_factory
.configuration_changed(config_changes
)
1825 if config_changes
& _RESTART_INSTANCES_CONFIG_CHANGES
or has_file_changes
:
1826 with self
._instances
_change
_lock
:
1827 if not self
._suspended
:
1830 def _loop_watching_for_changes(self
):
1831 """Loops until the InstancePool is done watching for file changes."""
1832 while not self
._quit
_event
.is_set():
1834 if self
._automatic
_restarts
:
1835 self
._handle
_changes
()
1836 self
._quit
_event
.wait(timeout
=1)
def get_num_instances(self):
  """Returns the current length of self._instances.

  The length is read while holding both self._instances_change_lock and
  self._condition, acquired in that order.
  """
  with self._instances_change_lock, self._condition:
    return len(self._instances)
def set_num_instances(self, instances):
  """Grows or shrinks the module to the requested number of instances.

  Args:
    instances: The number of instances the module should have. It is capped
      at self._max_instances when that is not None.
  """
  target = instances
  if self._max_instances is not None:
    target = min(target, self._max_instances)

  with self._instances_change_lock:
    with self._condition:
      running = self.get_num_instances()
      shrinking = running > target
      if shrinking:
        # Detach the surplus instances and their servers from the tail of
        # both lists.
        surplus_servers = self._wsgi_servers[target:]
        del self._wsgi_servers[target:]
        surplus_instances = self._instances[target:]
        del self._instances[target:]
    # _add_instance is called with the change lock still held.
    if running < target:
      for _ in xrange(target - running):
        self._add_instance()
  if shrinking:
    # The detached instances are quit asynchronously.
    for inst, wsgi_servr in zip(surplus_instances, surplus_servers):
      self._async_quit_instance(inst, wsgi_servr)
def _async_quit_instance(self, inst, wsgi_servr):
  """Schedules self._quit_instance(inst, wsgi_servr) on _THREAD_POOL.

  Args:
    inst: The instance to quit.
    wsgi_servr: The WSGI server paired with inst.

  Returns:
    Whatever _THREAD_POOL.submit returns for the scheduled call (presumably
    a future -- verify against the _THREAD_POOL definition).
  """
  pending_quit = _THREAD_POOL.submit(self._quit_instance, inst, wsgi_servr)
  return pending_quit
1865 def _quit_instance(self
, inst
, wsgi_servr
):
1866 port
= wsgi_servr
.port
1868 inst
.quit(expect_shutdown
=True)
1869 self
._shutdown
_instance
(inst
, port
)
1872 """Suspends serving for this module, quitting all running instances."""
1873 with self
._instances
_change
_lock
:
1875 raise request_info
.VersionAlreadyStoppedError()
1876 self
._suspended
= True
1877 with self
._condition
:
1878 instances_to_stop
= zip(self
._instances
, self
._wsgi
_servers
)
1879 for wsgi_servr
in self
._wsgi
_servers
:
1880 wsgi_servr
.set_error(404)
1881 for inst
, wsgi_servr
in instances_to_stop
:
1882 self
._async
_suspend
_instance
(inst
, wsgi_servr
.port
)
def _async_suspend_instance(self, inst, port):
  """Schedules self._suspend_instance(inst, port) on _THREAD_POOL.

  Args:
    inst: The instance to suspend.
    port: The port associated with inst.

  Returns:
    Whatever _THREAD_POOL.submit returns for the scheduled call (presumably
    a future -- verify against the _THREAD_POOL definition).
  """
  pending_suspend = _THREAD_POOL.submit(self._suspend_instance, inst, port)
  return pending_suspend
def _suspend_instance(self, inst, port):
  """Quits an instance and then runs the shutdown step for it.

  Args:
    inst: The instance to stop; its quit() is called with
        expect_shutdown=True.
    port: The port passed, together with inst, to self._shutdown_instance.
  """
  # Quit first so the shutdown step runs against an instance that has
  # already been told to exit.
  inst.quit(expect_shutdown=True)
  self._shutdown_instance(inst, port)
1892 """Resumes serving for this module."""
1893 with self
._instances
_change
_lock
:
1894 if not self
._suspended
:
1895 raise request_info
.VersionAlreadyStartedError()
1896 self
._suspended
= False
1897 with self
._condition
:
1898 if self
._quit
_event
.is_set():
1900 wsgi_servers
= self
._wsgi
_servers
1901 instances_to_start
= []
1902 for instance_id
, wsgi_servr
in enumerate(wsgi_servers
):
1903 inst
= self
._instance
_factory
.new_instance(instance_id
,
1904 expect_ready_request
=True)
1905 wsgi_servr
.set_app(functools
.partial(self
._handle
_request
, inst
=inst
))
1906 self
._port
_registry
.add(wsgi_servr
.port
, self
, inst
)
1907 with self
._condition
:
1908 if self
._quit
_event
.is_set():
1910 self
._instances
[instance_id
] = inst
1912 instances_to_start
.append((wsgi_servr
, inst
))
1913 for wsgi_servr
, inst
in instances_to_start
:
1914 self
._async
_start
_instance
(wsgi_servr
, inst
)
1917 """Restarts the module, replacing all running instances."""
1918 with self
._instances
_change
_lock
:
1919 with self
._condition
:
1920 if self
._quit
_event
.is_set():
1922 instances_to_stop
= self
._instances
[:]
1923 wsgi_servers
= self
._wsgi
_servers
[:]
1924 instances_to_start
= []
1925 for instance_id
, wsgi_servr
in enumerate(wsgi_servers
):
1926 inst
= self
._instance
_factory
.new_instance(instance_id
,
1927 expect_ready_request
=True)
1928 wsgi_servr
.set_app(functools
.partial(self
._handle
_request
, inst
=inst
))
1929 self
._port
_registry
.add(wsgi_servr
.port
, self
, inst
)
1930 instances_to_start
.append(inst
)
1931 with self
._condition
:
1932 if self
._quit
_event
.is_set():
1934 self
._instances
[:] = instances_to_start
1936 # Just force instances to stop for a faster restart.
1937 for inst
in instances_to_stop
:
1938 inst
.quit(force
=True)
1941 self
._async
_start
_instance
(wsgi_servr
, inst
)
1942 for wsgi_servr
, inst
in zip(wsgi_servers
, instances_to_start
)]
1943 logging
.info('Waiting for instances to restart')
1945 health_check_config
= self
.module_configuration
.vm_health_check
1946 for (inst
, wsgi_servr
) in zip(instances_to_start
, wsgi_servers
):
1947 if (self
.module_configuration
.runtime
== 'vm'
1948 and health_check_config
.enable_health_check
):
1949 self
._add
_health
_checks
(inst
, wsgi_servr
, health_check_config
)
1951 _
, not_done
= futures
.wait(start_futures
, timeout
=_SHUTDOWN_TIMEOUT
)
1953 logging
.warning('All instances may not have restarted')
1955 logging
.info('Instances restarted')
1957 def _restart_instance(self
, inst
):
1958 """Restarts the specified instance."""
1959 with self
._instances
_change
_lock
:
1960 # Quit the old instance.
1961 inst
.quit(force
=True)
1962 # Create the new instance.
1963 new_instance
= self
._instance
_factory
.new_instance(inst
.instance_id
)
1964 wsgi_servr
= self
._wsgi
_servers
[inst
.instance_id
]
1966 functools
.partial(self
._handle
_request
, inst
=new_instance
))
1967 self
._port
_registry
.add(wsgi_servr
.port
, self
, new_instance
)
1968 # Start the new instance.
1969 self
._start
_instance
(wsgi_servr
, new_instance
)
1970 health_check_config
= self
.module_configuration
.vm_health_check
1971 if (self
.module_configuration
.runtime
== 'vm'
1972 and health_check_config
.enable_health_check
):
1973 self
._add
_health
_checks
(new_instance
, wsgi_servr
, health_check_config
)
1974 # Replace it in the module registry.
1975 with self
._instances
_change
_lock
:
1976 with self
._condition
:
1977 self
._instances
[new_instance
.instance_id
] = new_instance
1979 def get_instance(self
, instance_id
):
1980 """Returns the instance with the provided instance ID."""
1982 with self
._condition
:
1983 return self
._instances
[int(instance_id
)]
1984 except (ValueError, IndexError):
1985 raise request_info
.InvalidInstanceIdError()
def __call__(self, environ, start_response, inst=None):
  """Makes the module callable by delegating to self._handle_request.

  Args:
    environ: The request environ, forwarded unchanged.
    start_response: The start_response callable, forwarded unchanged.
    inst: Optional instance to route the request to; None by default.

  Returns:
    The value returned by self._handle_request.
  """
  response = self._handle_request(environ, start_response, inst)
  return response
1991 def supports_individually_addressable_instances(self
):
1995 class BasicScalingModule(Module
):
1996 """A pool of instances that is basic-scaled."""
1998 _DEFAULT_BASIC_SCALING
= appinfo
.BasicScaling(max_instances
='1',
2000 _MAX_REQUEST_WAIT_TIME
= 10
2003 def _parse_idle_timeout(timing
):
2004 """Parse an idle timeout string into an int of the value in seconds.
2007 timing: A str of the form 1m or 10s.
2010 An int representation of the value in seconds.
2012 if timing
.endswith('m'):
2013 return int(timing
[:-1]) * 60
2015 return int(timing
[:-1])
def _populate_default_basic_scaling(cls, basic_scaling):
  """Fills unset attributes of basic_scaling from the class defaults.

  An attribute counts as unset when its current value is 'basic' or None.
  basic_scaling is modified in place; nothing is returned.

  Args:
    cls: The class whose _DEFAULT_BASIC_SCALING supplies fallback values.
    basic_scaling: The object to update. It must expose ATTRIBUTES, an
        iterable of attribute names.
  """
  unset_markers = ('basic', None)
  for name in basic_scaling.ATTRIBUTES:
    if getattr(basic_scaling, name) not in unset_markers:
      # Explicitly configured; leave it alone.
      continue
    fallback = getattr(cls._DEFAULT_BASIC_SCALING, name)
    setattr(basic_scaling, name, fallback)
2024 def _process_basic_scaling(self
, basic_scaling
):
2026 self
._populate
_default
_basic
_scaling
(basic_scaling
)
2028 basic_scaling
= self
._DEFAULT
_BASIC
_SCALING
2029 if self
._max
_instances
is not None:
2030 self
._max
_instances
= min(self
._max
_instances
,
2031 int(basic_scaling
.max_instances
))
2033 self
._max
_instances
= int(basic_scaling
.max_instances
)
2034 self
._instance
_idle
_timeout
= self
._parse
_idle
_timeout
(
2035 basic_scaling
.idle_timeout
)
2038 module_configuration
,
2044 runtime_stderr_loglevel
,
2050 default_version_port
,
2055 use_mtime_file_watcher
,
2057 allow_skipped_files
,
2058 threadsafe_override
):
2060 """Initializer for BasicScalingModule.
2063 module_configuration: An application_configuration.ModuleConfiguration
2064 instance storing the configuration data for a module.
2065 host: A string containing the host that any HTTP servers should bind to
2067 balanced_port: An int specifying the port where the balanced module for
2068 the pool should listen.
2069 api_host: The host that APIServer listens for RPC requests on.
2070 api_port: The port that APIServer listens for RPC requests on.
2071 auth_domain: A string containing the auth domain to set in the environment
2073 runtime_stderr_loglevel: An int representing the minimum logging level at
2074 which runtime log messages should be written to stderr. See
2075 devappserver2.py for possible values.
2076 php_config: A runtime_config_pb2.PhpConfig instances containing PHP
2077 runtime-specific configuration. If None then defaults are used.
2078 python_config: A runtime_config_pb2.PythonConfig instance containing
2079 Python runtime-specific configuration. If None then defaults are used.
2080 java_config: A runtime_config_pb2.JavaConfig instance containing
2081 Java runtime-specific configuration. If None then defaults are used.
2082 cloud_sql_config: A runtime_config_pb2.CloudSQL instance containing the
2083 required configuration for local Google Cloud SQL development. If None
2084 then Cloud SQL will not be available.
2085 vm_config: A runtime_config_pb2.VMConfig instance containing
2086 VM runtime-specific configuration. If None all docker-related stuff
2088 default_version_port: An int containing the port of the default version.
2089 port_registry: A dispatcher.PortRegistry used to provide the Dispatcher
2090 with a mapping of port to Module and Instance.
2091 request_data: A wsgi_request_info.WSGIRequestInfo that will be provided
2092 with request information for use by API stubs.
2093 dispatcher: A Dispatcher instance that can be used to make HTTP requests.
2094 max_instances: The maximum number of instances to create for this module.
2095 If None then there is no limit on the number of created instances.
2096 use_mtime_file_watcher: A bool containing whether to use mtime polling to
2097 monitor file changes even if other options are available on the
2099 automatic_restarts: If True then instances will be restarted when a
2100 file or configuration change that affects them is detected.
2101 allow_skipped_files: If True then all files in the application's directory
2102 are readable, even if they appear in a static handler or "skip_files"
2104 threadsafe_override: If not None, ignore the YAML file value of threadsafe
2105 and use this value instead.
2107 super(BasicScalingModule
, self
).__init
__(module_configuration
,
2113 runtime_stderr_loglevel
,
2119 default_version_port
,
2124 use_mtime_file_watcher
,
2126 allow_skipped_files
,
2127 threadsafe_override
)
2129 self
._process
_basic
_scaling
(module_configuration
.basic_scaling
)
2131 self
._instances
= [] # Protected by self._condition.
2132 self
._wsgi
_servers
= [] # Protected by self._condition.
2133 # A list of booleans signifying whether the corresponding instance in
2134 # self._instances has been or is being started.
2135 self
._instance
_running
= [] # Protected by self._condition.
2137 for instance_id
in xrange(self
._max
_instances
):
2138 inst
= self
._instance
_factory
.new_instance(instance_id
,
2139 expect_ready_request
=True)
2140 self
._instances
.append(inst
)
2141 self
._wsgi
_servers
.append(wsgi_server
.WsgiServer(
2142 (self
._host
, 0), functools
.partial(self
._handle
_request
, inst
=inst
)))
2143 self
._instance
_running
.append(False)
2145 self
._condition
= threading
.Condition() # Protects instance state.
2147 self
._change
_watcher
_thread
= threading
.Thread(
2148 target
=self
._loop
_watching
_for
_changes
_and
_idle
_instances
)
2151 """Start background management of the Module."""
2152 self
._balanced
_module
.start()
2153 self
._port
_registry
.add(self
.balanced_port
, self
, None)
2155 self
._watcher
.start()
2156 self
._change
_watcher
_thread
.start()
2157 for wsgi_servr
, inst
in zip(self
._wsgi
_servers
, self
._instances
):
2159 self
._port
_registry
.add(wsgi_servr
.port
, self
, inst
)
2162 """Stops the Module."""
2163 self
._quit
_event
.set()
2164 self
._change
_watcher
_thread
.join()
2165 # The instance adjustment thread depends on the balanced module and the
2166 # watcher so wait for it to exit before quitting them.
2168 self
._watcher
.quit()
2169 self
._balanced
_module
.quit()
2170 for wsgi_servr
in self
._wsgi
_servers
:
2172 with self
._condition
:
2173 instances
= self
._instances
2174 self
._instances
= []
2175 self
._condition
.notify_all()
2176 for inst
in instances
:
2177 inst
.quit(force
=True)
2179 def get_instance_port(self
, instance_id
):
2180 """Returns the port of the HTTP server for an instance."""
2182 instance_id
= int(instance_id
)
2184 raise request_info
.InvalidInstanceIdError()
2185 with self
._condition
:
2186 if 0 <= instance_id
< len(self
._instances
):
2187 wsgi_servr
= self
._wsgi
_servers
[instance_id
]
2189 raise request_info
.InvalidInstanceIdError()
2190 return wsgi_servr
.port
def instances(self):
  """A set of all the instances currently in the Module.

  The set is a fresh copy of self._instances built while holding
  self._condition.
  """
  with self._condition:
    snapshot = set(self._instances)
  return snapshot
2198 def _handle_instance_request(self
,
2206 """Handles a request routed to a particular Instance.
2209 environ: An environ dict for the request as defined in PEP-333.
2210 start_response: A function with semantics defined in PEP-333.
2211 url_map: An appinfo.URLMap instance containing the configuration for the
2212 handler that matched.
2213 match: A re.MatchObject containing the result of the matched URL pattern.
2214 request_id: A unique string id associated with the request.
2215 inst: The instance.Instance to send the request to.
2216 request_type: The type of the request. See instance.*_REQUEST module
2220 An iterable over strings containing the body of the HTTP response.
2222 instance_id
= inst
.instance_id
2223 start_time
= time
.time()
2224 timeout_time
= start_time
+ self
._MAX
_REQUEST
_WAIT
_TIME
2226 while time
.time() < timeout_time
:
2227 logging
.debug('Dispatching request to %s after %0.4fs pending',
2228 inst
, time
.time() - start_time
)
2230 return inst
.handle(environ
, start_response
, url_map
, match
,
2231 request_id
, request_type
)
2232 except instance
.CannotAcceptRequests
:
2235 return self
._error
_response
(environ
, start_response
, 503)
2236 with self
._condition
:
2237 if self
._instance
_running
[instance_id
]:
2238 should_start
= False
2240 self
._instance
_running
[instance_id
] = True
2243 self
._start
_instance
(instance_id
)
2245 inst
.wait(timeout_time
)
2247 return self
._error
_response
(environ
, start_response
, 503)
2249 with self
._condition
:
2250 self
._condition
.notify()
2252 def _handle_script_request(self
,
2259 request_type
=instance
.NORMAL_REQUEST
):
2260 """Handles an HTTP request that has matched a script handler.
2263 environ: An environ dict for the request as defined in PEP-333.
2264 start_response: A function with semantics defined in PEP-333.
2265 url_map: An appinfo.URLMap instance containing the configuration for the
2266 handler that matched.
2267 match: A re.MatchObject containing the result of the matched URL pattern.
2268 request_id: A unique string id associated with the request.
2269 inst: The instance.Instance to send the request to. If None then an
2270 appropriate instance.Instance will be chosen.
2271 request_type: The type of the request. See instance.*_REQUEST module
2275 An iterable over strings containing the body of the HTTP response.
2277 if self
._quit
_event
.is_set():
2278 return self
._error
_response
(environ
, start_response
, 404)
2279 if self
._module
_configuration
.is_backend
:
2280 environ
['BACKEND_ID'] = self
._module
_configuration
.module_name
2282 environ
['BACKEND_ID'] = (
2283 self
._module
_configuration
.version_id
.split('.', 1)[0])
2284 if inst
is not None:
2285 return self
._handle
_instance
_request
(
2286 environ
, start_response
, url_map
, match
, request_id
, inst
,
2289 start_time
= time
.time()
2290 timeout_time
= start_time
+ self
._MAX
_REQUEST
_WAIT
_TIME
2291 while time
.time() < timeout_time
:
2292 if self
._quit
_event
.is_set():
2293 return self
._error
_response
(environ
, start_response
, 404)
2294 inst
= self
._choose
_instance
(timeout_time
)
2297 logging
.debug('Dispatching request to %s after %0.4fs pending',
2298 inst
, time
.time() - start_time
)
2299 return inst
.handle(environ
, start_response
, url_map
, match
,
2300 request_id
, request_type
)
2301 except instance
.CannotAcceptRequests
:
2304 with self
._condition
:
2305 self
._condition
.notify()
2307 return self
._error
_response
(environ
, start_response
, 503)
2309 def _start_any_instance(self
):
2310 """Choose an inactive instance and start it asynchronously.
2313 An instance.Instance that will be started asynchronously or None if all
2314 instances are already running.
2316 with self
._condition
:
2317 for instance_id
, running
in enumerate(self
._instance
_running
):
2319 self
._instance
_running
[instance_id
] = True
2320 inst
= self
._instances
[instance_id
]
2324 self
._async
_start
_instance
(instance_id
)
def _async_start_instance(self, instance_id):
  """Schedules self._start_instance(instance_id) on _THREAD_POOL.

  Args:
    instance_id: The id of the instance to start.

  Returns:
    Whatever _THREAD_POOL.submit returns for the scheduled call (presumably
    a future -- verify against the _THREAD_POOL definition).
  """
  pending_start = _THREAD_POOL.submit(self._start_instance, instance_id)
  return pending_start
2330 def _start_instance(self
, instance_id
):
2331 with self
._condition
:
2332 if self
._quit
_event
.is_set():
2334 wsgi_servr
= self
._wsgi
_servers
[instance_id
]
2335 inst
= self
._instances
[instance_id
]
2337 logging
.debug('Started instance: %s at http://%s:%s', inst
, self
.host
,
2340 environ
= self
.build_request_environ(
2341 'GET', '/_ah/start', [], '', '0.1.0.3', wsgi_servr
.port
,
2343 self
._handle
_request
(environ
,
2344 start_response_utils
.null_start_response
,
2346 request_type
=instance
.READY_REQUEST
)
2347 logging
.debug('Sent start request: %s', inst
)
2348 with self
._condition
:
2349 self
._condition
.notify(self
.max_instance_concurrent_requests
)
2351 logging
.exception('Internal error while handling start request.')
2353 def _choose_instance(self
, timeout_time
):
2354 """Returns an Instance to handle a request or None if all are busy."""
2355 with self
._condition
:
2356 while time
.time() < timeout_time
and not self
._quit
_event
.is_set():
2357 for inst
in self
._instances
:
2358 if inst
.can_accept_requests
:
2361 inst
= self
._start
_any
_instance
()
2364 self
._condition
.wait(timeout_time
- time
.time())
2368 inst
.wait(timeout_time
)
2371 def _handle_changes(self
):
2372 """Handle file or configuration changes."""
2373 # Always check for config and file changes because checking also clears
2375 config_changes
= self
._module
_configuration
.check_for_updates()
2376 has_file_changes
= self
._watcher
.has_changes()
2378 if application_configuration
.HANDLERS_CHANGED
in config_changes
:
2379 handlers
= self
._create
_url
_handlers
()
2380 with self
._handler
_lock
:
2381 self
._handlers
= handlers
2383 if has_file_changes
:
2384 self
._instance
_factory
.files_changed()
2386 if config_changes
& _RESTART_INSTANCES_CONFIG_CHANGES
:
2387 self
._instance
_factory
.configuration_changed(config_changes
)
2389 if config_changes
& _RESTART_INSTANCES_CONFIG_CHANGES
or has_file_changes
:
2392 def _loop_watching_for_changes_and_idle_instances(self
):
2393 """Loops until the InstancePool is done watching for file changes."""
2394 while not self
._quit
_event
.is_set():
2396 self
._shutdown
_idle
_instances
()
2397 if self
._automatic
_restarts
:
2398 self
._handle
_changes
()
2399 self
._quit
_event
.wait(timeout
=1)
2401 def _shutdown_idle_instances(self
):
2402 instances_to_stop
= []
2403 with self
._condition
:
2404 for instance_id
, inst
in enumerate(self
._instances
):
2405 if (self
._instance
_running
[instance_id
] and
2406 inst
.idle_seconds
> self
._instance
_idle
_timeout
):
2407 instances_to_stop
.append((self
._instances
[instance_id
],
2408 self
._wsgi
_servers
[instance_id
]))
2409 self
._instance
_running
[instance_id
] = False
2410 new_instance
= self
._instance
_factory
.new_instance(
2411 instance_id
, expect_ready_request
=True)
2412 self
._instances
[instance_id
] = new_instance
2413 wsgi_servr
= self
._wsgi
_servers
[instance_id
]
2415 functools
.partial(self
._handle
_request
, inst
=new_instance
))
2416 self
._port
_registry
.add(wsgi_servr
.port
, self
, new_instance
)
2417 for inst
, wsgi_servr
in instances_to_stop
:
2418 logging
.debug('Shutting down %r', inst
)
2419 self
._stop
_instance
(inst
, wsgi_servr
)
def _stop_instance(self, inst, wsgi_servr):
  """Quits an instance and hands it off for asynchronous shutdown.

  Args:
    inst: The instance to stop; its quit() is called with
        expect_shutdown=True.
    wsgi_servr: The WSGI server paired with inst; only its port is read.
  """
  inst.quit(expect_shutdown=True)
  listening_port = wsgi_servr.port
  self._async_shutdown_instance(inst, listening_port)
2426 """Restarts the module, replacing all running instances."""
2427 instances_to_stop
= []
2428 instances_to_start
= []
2429 with self
._condition
:
2430 if self
._quit
_event
.is_set():
2432 for instance_id
, inst
in enumerate(self
._instances
):
2433 if self
._instance
_running
[instance_id
]:
2434 instances_to_stop
.append((inst
, self
._wsgi
_servers
[instance_id
]))
2435 new_instance
= self
._instance
_factory
.new_instance(
2436 instance_id
, expect_ready_request
=True)
2437 self
._instances
[instance_id
] = new_instance
2438 instances_to_start
.append(instance_id
)
2439 wsgi_servr
= self
._wsgi
_servers
[instance_id
]
2441 functools
.partial(self
._handle
_request
, inst
=new_instance
))
2442 self
._port
_registry
.add(wsgi_servr
.port
, self
, new_instance
)
2443 for instance_id
in instances_to_start
:
2444 self
._async
_start
_instance
(instance_id
)
2445 for inst
, wsgi_servr
in instances_to_stop
:
2446 self
._stop
_instance
(inst
, wsgi_servr
)
2448 def get_instance(self
, instance_id
):
2449 """Returns the instance with the provided instance ID."""
2451 with self
._condition
:
2452 return self
._instances
[int(instance_id
)]
2453 except (ValueError, IndexError):
2454 raise request_info
.InvalidInstanceIdError()
def __call__(self, environ, start_response, inst=None):
  """Makes the module callable by delegating to self._handle_request.

  Args:
    environ: The request environ, forwarded unchanged.
    start_response: The start_response callable, forwarded unchanged.
    inst: Optional instance to route the request to; None by default.

  Returns:
    The value returned by self._handle_request.
  """
  response = self._handle_request(environ, start_response, inst)
  return response
2460 def supports_individually_addressable_instances(self
):
2464 class InteractiveCommandModule(Module
):
2465 """A Module that can evaluate user commands.
2467 This module manages a single Instance which is started lazily.
2470 _MAX_REQUEST_WAIT_TIME
= 15
2473 module_configuration
,
2479 runtime_stderr_loglevel
,
2485 default_version_port
,
2489 use_mtime_file_watcher
,
2490 allow_skipped_files
,
2491 threadsafe_override
):
2492 """Initializer for InteractiveCommandModule.
2495 module_configuration: An application_configuration.ModuleConfiguration
2496 instance storing the configuration data for this module.
2497 host: A string containing the host that will be used when constructing
2498 HTTP headers sent to the Instance executing the interactive command
2500 balanced_port: An int specifying the port that will be used when
2501 constructing HTTP headers sent to the Instance executing the
2502 interactive command e.g. "localhost".
2503 api_host: The host that APIServer listens for RPC requests on.
2504 api_port: The port that APIServer listens for RPC requests on.
2505 auth_domain: A string containing the auth domain to set in the environment
2507 runtime_stderr_loglevel: An int representing the minimum logging level at
2508 which runtime log messages should be written to stderr. See
2509 devappserver2.py for possible values.
2510 php_config: A runtime_config_pb2.PhpConfig instances containing PHP
2511 runtime-specific configuration. If None then defaults are used.
2512 python_config: A runtime_config_pb2.PythonConfig instance containing
2513 Python runtime-specific configuration. If None then defaults are used.
2514 java_config: A runtime_config_pb2.JavaConfig instance containing
2515 Java runtime-specific configuration. If None then defaults are used.
2516 cloud_sql_config: A runtime_config_pb2.CloudSQL instance containing the
2517 required configuration for local Google Cloud SQL development. If None
2518 then Cloud SQL will not be available.
2519 vm_config: A runtime_config_pb2.VMConfig instance containing
2520 VM runtime-specific configuration. If None all docker-related stuff
2522 default_version_port: An int containing the port of the default version.
2523 port_registry: A dispatcher.PortRegistry used to provide the Dispatcher
2524 with a mapping of port to Module and Instance.
2525 request_data: A wsgi_request_info.WSGIRequestInfo that will be provided
2526 with request information for use by API stubs.
2527 dispatcher: A Dispatcher instance that can be used to make HTTP requests.
2528 use_mtime_file_watcher: A bool containing whether to use mtime polling to
2529 monitor file changes even if other options are available on the
2531 allow_skipped_files: If True then all files in the application's directory
2532 are readable, even if they appear in a static handler or "skip_files"
2534 threadsafe_override: If not None, ignore the YAML file value of threadsafe
2535 and use this value instead.
2537 super(InteractiveCommandModule
, self
).__init
__(
2538 module_configuration
,
2544 runtime_stderr_loglevel
,
2550 default_version_port
,
2555 use_mtime_file_watcher
=use_mtime_file_watcher
,
2556 automatic_restarts
=True,
2557 allow_skipped_files
=allow_skipped_files
,
2558 threadsafe_override
=threadsafe_override
)
2559 # Use a single instance so that state is consistent across requests.
2560 self
._inst
_lock
= threading
.Lock()
2564 def balanced_port(self
):
2565 """The port that the balanced HTTP server for the Module is listening on.
2567 The InteractiveCommandModule does not actually listen on this port but it is
2568 used when constructing the "SERVER_PORT" in the WSGI-environment.
2570 return self
._balanced
_port
2573 """Stops the InteractiveCommandModule."""
2575 self
._inst
.quit(force
=True)
2578 def _handle_script_request(self
,
2585 request_type
=instance
.INTERACTIVE_REQUEST
):
2586 """Handles an interactive request by forwarding it to the managed Instance.
2589 environ: An environ dict for the request as defined in PEP-333.
2590 start_response: A function with semantics defined in PEP-333.
2591 url_map: An appinfo.URLMap instance containing the configuration for the
2592 handler that matched.
2593 match: A re.MatchObject containing the result of the matched URL pattern.
2594 request_id: A unique string id associated with the request.
2595 inst: The instance.Instance to send the request to.
2596 request_type: The type of the request. See instance.*_REQUEST module
2597 constants. This must be instance.INTERACTIVE_REQUEST.
2600 An iterable over strings containing the body of the HTTP response.
2603 assert request_type
== instance
.INTERACTIVE_REQUEST
2605 start_time
= time
.time()
2606 timeout_time
= start_time
+ self
._MAX
_REQUEST
_WAIT
_TIME
2608 while time
.time() < timeout_time
:
2609 new_instance
= False
2610 with self
._inst
_lock
:
2612 self
._inst
= self
._instance
_factory
.new_instance(
2613 AutoScalingModule
.generate_instance_id(),
2614 expect_ready_request
=False)
2622 return inst
.handle(environ
, start_response
, url_map
, match
,
2623 request_id
, request_type
)
2624 except instance
.CannotAcceptRequests
:
2625 inst
.wait(timeout_time
)
2627 # If the instance is restarted while handling a request then the
2628 # exception raised is unpredictable.
2629 if inst
!= self
._inst
:
2630 start_response('503 Service Unavailable', [])
2631 return ['Instance was restarted while executing command']
2632 logging
.exception('Unexpected exception handling command: %r', environ
)
2635 start_response('503 Service Unavailable', [])
2636 return ['The command timed-out while waiting for another one to complete']
2639 """Restarts the module."""
2640 with self
._inst
_lock
:
2642 self
._inst
.quit(force
=True)
2645 def send_interactive_command(self
, command
):
2646 """Sends an interactive command to the module.
2649 command: The command to send e.g. "print 5+5".
2652 A string representing the result of the command e.g. "10\n".
2655 InteractiveCommandError: if the command failed for any reason.
2657 start_response
= start_response_utils
.CapturingStartResponse()
2659 # 192.0.2.0 is an example address defined in RFC 5737.
2660 environ
= self
.build_request_environ(
2661 'POST', '/', [], command
, '192.0.2.0', self
.balanced_port
)
2664 response
= self
._handle
_request
(
2667 request_type
=instance
.INTERACTIVE_REQUEST
)
2668 except Exception as e
:
2669 raise InteractiveCommandError('Unexpected command failure: ', str(e
))
2671 if start_response
.status
!= '200 OK':
2672 raise InteractiveCommandError(start_response
.merged_response(response
))
2674 return start_response
.merged_response(response
)