3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
21 This module allows apps to flush logs, provide status messages, and
22 programmatically access their request and application logs.
30 from __future__
import with_statement
42 from google
.net
.proto
import ProtocolBuffer
43 from google
.appengine
.api
import api_base_pb
44 from google
.appengine
.api
import apiproxy_stub_map
45 from google
.appengine
.api
.logservice
import log_service_pb
46 from google
.appengine
.api
.logservice
import logsutil
47 from google
.appengine
.datastore
import datastore_rpc
48 from google
.appengine
.runtime
import apiproxy_errors
# Autoflush tuning knobs: when enabled, the log buffer is flushed to the
# logservice API once any one of the age/bytes/lines thresholds is crossed.
AUTOFLUSH_ENABLED = True

# Flush buffered logs at least this often, in seconds.
AUTOFLUSH_EVERY_SECONDS = 60

# Flush once this many bytes have been buffered.
AUTOFLUSH_EVERY_BYTES = 4096

# Flush once this many log lines have been buffered.
AUTOFLUSH_EVERY_LINES = 50

# Upper bound on the batch size a single fetch() call may request.
MAX_ITEMS_PER_FETCH = 1000
# Application log severity levels, re-exported from logsutil so callers can
# use logservice.LOG_LEVEL_* without importing the internal helper module.
LOG_LEVEL_DEBUG = logsutil.LOG_LEVEL_DEBUG
LOG_LEVEL_INFO = logsutil.LOG_LEVEL_INFO
LOG_LEVEL_WARNING = logsutil.LOG_LEVEL_WARNING
LOG_LEVEL_ERROR = logsutil.LOG_LEVEL_ERROR
LOG_LEVEL_CRITICAL = logsutil.LOG_LEVEL_CRITICAL
73 MODULE_ID_RE_STRING
= r
'(?!-)[a-z\d\-]{1,63}'
76 MODULE_VERSION_RE_STRING
= r
'(?!-)[a-z\d\-]{1,100}'
77 _MAJOR_VERSION_ID_PATTERN
= r
'^(?:(?:(%s):)?)(%s)$' % (MODULE_ID_RE_STRING
,
78 MODULE_VERSION_RE_STRING
)
80 _MAJOR_VERSION_ID_RE
= re
.compile(_MAJOR_VERSION_ID_PATTERN
)
82 _REQUEST_ID_PATTERN
= r
'^[\da-fA-F]+$'
83 _REQUEST_ID_RE
= re
.compile(_REQUEST_ID_PATTERN
)
class Error(Exception):
  """Base error class for this module."""
class InvalidArgumentError(Error):
  """Function argument has invalid value."""
class TimeoutError(Error):
  """Requested timeout for fetch() call has expired while iterating results."""

  # NOTE(review): reconstructed from a corrupted source dump; the @property
  # decorators and return statements were restored from the surviving
  # docstrings and the attributes set in __init__ -- confirm against the
  # original SDK file.

  def __init__(self, msg, offset, last_end_time):
    """Initializes the error with the iteration state at the timeout.

    Args:
      msg: Human-readable error message.
      offset: Byte-string offset into the log stream, or None.
      last_end_time: Completion time (seconds since the Unix epoch) of the
        last request examined before the timeout, or None.
    """
    Error.__init__(self, msg)
    self.__offset = offset
    self.__last_end_time = last_end_time

  @property
  def offset(self):
    """Binary offset indicating the current position in the result stream.

    May be submitted to future Log read requests to continue iterating logs
    starting exactly where this iterator left off.

    Returns:
      A byte string representing an offset into the log stream, or None.
    """
    return self.__offset

  @property
  def last_end_time(self):
    """End time of the last request examined prior to the timeout, or None.

    Returns:
      A float representing the completion time in seconds since the Unix
      epoch of the last request examined.
    """
    return self.__last_end_time
class LogsBufferNew(object):
  """Threadsafe buffer for storing and periodically flushing app logs."""

  # NOTE(review): reconstructed from a corrupted source dump; missing lines
  # (lock acquisitions, loop headers, small method bodies) were restored from
  # the surviving tokens and docstrings -- confirm against the original SDK.

  # Maximum number of bytes flushed in a single Flush API call, and the
  # maximum size a single log line may reach before being truncated.
  _MAX_FLUSH_SIZE = 1000 * 1000
  _MAX_LINE_SIZE = _MAX_FLUSH_SIZE

  def __init__(self, stream=None, stderr=False):
    """Initializes the buffer, which wraps an internal buffer or sys.stderr.

    The state of the LogsBuffer is protected by a separate lock. The lock is
    acquired before any variables are mutated or accessed, and released
    afterward. A recursive lock is used so that a single thread can acquire the
    lock multiple times, and release it only when an identical number of
    'unlock()' calls have been performed.

    Args:
      stream: Unused. Left there for backward compatibility.
      stderr: If specified, use sys.stderr as the underlying stream.

    Raises:
      ValueError: if stream is provided.
    """
    if stream is not None:
      raise ValueError('underlying streams are no longer supported')
    self._buffer = collections.deque()
    self._stderr = stderr
    self._lock = threading.RLock()
    self._reset()

  @staticmethod
  def _truncate(line, max_length=_MAX_LINE_SIZE):
    """Truncates a potentially long log down to a specified maximum length."""
    if len(line) > max_length:
      original_length = len(line)
      suffix = '...(length %d)' % original_length
      line = line[:max_length - len(suffix)] + suffix
    return line

  def stream(self):
    """Returns the underlying file-like object used to buffer logs."""
    if self._stderr:
      return sys.stderr
    else:
      # Return a snapshot view; the deque itself is not file-like.
      return cStringIO.StringIO(self.contents())

  def lines(self):
    """Returns the number of log lines currently buffered."""
    with self._lock:
      return len(self._buffer)

  def bytes(self):
    """Returns the size of the log buffer, in bytes."""
    with self._lock:
      return self._bytes

  def age(self):
    """Returns the number of seconds since the log buffer was flushed."""
    with self._lock:
      return time.time() - self._flush_time

  def flush_time(self):
    """Returns last time that the log buffer was flushed."""
    with self._lock:
      return self._flush_time

  def contents(self):
    """Returns the contents of the logs buffer."""
    with self._lock:
      return self._contents()

  def _contents(self):
    """Internal version of contents() with no locking."""
    return ''.join(self._buffer)

  def reset(self):
    """Resets the buffer state, without clearing the underlying stream."""
    with self._lock:
      self._reset()

  def _reset(self):
    """Internal version of reset() with no locking."""
    self._bytes = sum(len(line) for line in self._buffer)
    self._flush_time = time.time()
    self._request = logsutil.RequestID()

  def clear(self):
    """Clears the contents of the logs buffer, and resets autoflush state."""
    with self._lock:
      self._clear()

  def _clear(self):
    """Internal version of clear() with no locking."""
    self._buffer.clear()
    self._reset()

  def close(self):
    """Closes the underlying stream, flushing the current contents."""
    with self._lock:
      self._close()

  def _close(self):
    """Internal version of close() with no locking."""
    self._flush()

  def parse_logs(self):
    """Parse the contents of the buffer and return an array of log lines."""
    without_newlines = (line[:-1] if line[-1] == '\n' else line
                        for line in self._buffer)
    return [logsutil.ParseLogEntry(line) for line in without_newlines if line]

  def write(self, line):
    """Writes a line to the logs buffer."""
    with self._lock:
      return self._write(line)

  def writelines(self, seq):
    """Writes each line in the given sequence to the logs buffer."""
    for line in seq:
      self.write(line)

  def _put_line(self, line):
    """Write the line in the internal buffer for the next flush."""
    self._buffer.append(line)
    self._bytes += len(line)

  def _get_line(self):
    """Get and deque the oldest log line from the internal buffer."""
    line = self._buffer.popleft()
    self._bytes -= len(line)
    return line

  def _rollback_line(self, line):
    """Write back the line as the oldest in the internal buffer."""
    self._buffer.appendleft(line)
    self._bytes += len(line)

  def _write(self, line):
    """Writes a line to the logs buffer."""
    # A new request id means a new request started on this instance; drop the
    # stale per-request state before buffering.
    if self._request != logsutil.RequestID():
      self._reset()
    if self._stderr:
      sys.stderr.write(line)
      sys.stderr.flush()
    else:
      self._put_line(line)
      self._autoflush()

  def flush(self):
    """Flushes the contents of the logs buffer.

    This method holds the buffer lock until the API call has finished to ensure
    that flush calls are performed in the correct order, so that log messages
    written during the flush call aren't dropped or accidentally wiped, and so
    that the other buffer state variables (flush time, lines, bytes) are updated
    synchronously with the flush.
    """
    with self._lock:
      self._flush()

  def _flush(self):
    """Internal version of flush() with no locking."""
    lines_to_be_flushed = []
    try:
      while True:
        group = log_service_pb.UserAppLogGroup()
        bytes_left = LogsBufferNew._MAX_FLUSH_SIZE
        while self._buffer:
          bare_line = self._get_line()

          timestamp_usec, level, message = logsutil.ParseLogEntry(bare_line)

          if message[-1] == '\n':
            message = message[:-1]

          if not message:
            continue

          message = LogsBufferNew._truncate(
              message, LogsBufferNew._MAX_LINE_SIZE)

          # Stop the current batch when the next line would overflow it; the
          # line is rolled back and picked up by the next Flush call.
          if len(message) > bytes_left:
            self._rollback_line(bare_line)
            break

          lines_to_be_flushed.append(bare_line)

          line = group.add_log_line()
          line.set_timestamp_usec(timestamp_usec)
          line.set_level(level)
          line.set_message(message)

          bytes_left -= 1 + group.lengthString(line.ByteSize())

        request = log_service_pb.FlushRequest()
        request.set_logs(group.Encode())
        response = api_base_pb.VoidProto()
        apiproxy_stub_map.MakeSyncCall('logservice', 'Flush', request, response)
        if not self._buffer:
          break
    except apiproxy_errors.CancelledError:
      # Best-effort: restore the unflushed lines so a later flush can retry.
      lines_to_be_flushed.reverse()
      self._buffer.extendleft(lines_to_be_flushed)
    except Exception as e:
      lines_to_be_flushed.reverse()
      self._buffer.extendleft(lines_to_be_flushed)
      line = '-' * 80
      msg = 'ERROR: Could not flush to log_service (%s)\n%s\n%s\n%s\n'
      sys.stderr.write(msg % (str(e), line, '\n'.join(self._buffer), line))
      self._clear()
      raise
    else:
      self._clear()

  def autoflush(self):
    """Flushes the buffer if certain conditions have been met."""
    with self._lock:
      return self._autoflush()

  def _autoflush(self):
    """Internal version of autoflush() with no locking."""
    if not self.autoflush_enabled():
      return
    if ((AUTOFLUSH_EVERY_SECONDS and self.age() >= AUTOFLUSH_EVERY_SECONDS) or
        (AUTOFLUSH_EVERY_LINES and self.lines() >= AUTOFLUSH_EVERY_LINES) or
        (AUTOFLUSH_EVERY_BYTES and self.bytes() >= AUTOFLUSH_EVERY_BYTES)):
      self._flush()

  def autoflush_enabled(self):
    """Indicates if the buffer will periodically flush logs during a request."""
    return AUTOFLUSH_ENABLED
# NOTE(review): function headers reconstructed from a corrupted source dump;
# the surviving docstrings and bodies determine each signature -- confirm
# against the original SDK file.


def logs_buffer():
  """Returns the LogsBuffer used by the current request."""
  return _global_buffer


def write(message):
  """Adds 'message' to the logs buffer, and checks for autoflush.

  Args:
    message: A message (string) to be written to application logs.
  """
  logs_buffer().write(message)


def clear():
  """Clear the logs buffer and reset the autoflush state."""
  logs_buffer().clear()


def autoflush():
  """If AUTOFLUSH conditions have been met, performs a Flush API call."""
  logs_buffer().autoflush()


def flush():
  """Flushes log lines that are currently buffered."""
  logs_buffer().flush()


def flush_time():
  """Returns last time that the logs buffer was flushed."""
  return logs_buffer().flush_time()


def log_buffer_age():
  """Returns the number of seconds since the logs buffer was flushed."""
  return logs_buffer().age()


def log_buffer_contents():
  """Returns the contents of the logs buffer."""
  return logs_buffer().contents()


def log_buffer_bytes():
  """Returns the size of the logs buffer, in bytes."""
  return logs_buffer().bytes()


def log_buffer_lines():
  """Returns the number of log lines currently buffered."""
  return logs_buffer().lines()
class _LogQueryResult(object):
  """A container that holds a log request and provides an iterator to read logs.

  A _LogQueryResult object is the standard returned item for a call to fetch().
  It is iterable - each value returned is a log that the user has queried for,
  and internally, it holds a cursor that it uses to fetch more results once the
  current, locally held set, are exhausted.

  Properties:
    _request: A LogReadRequest that contains the parameters the user has set for
      the initial fetch call, which will be updated with a more current cursor
      if more logs are requested.
    _logs: A list of RequestLogs corresponding to logs the user has asked for.
    _read_called: A boolean that indicates if a Read call has even been made
      with the request stored in this object.
  """

  # NOTE(review): reconstructed from a corrupted source dump; the __iter__
  # loop structure and _advance() call were restored from the surviving
  # tokens -- confirm against the original SDK file.

  def __init__(self, request, timeout=None):
    """Constructor.

    Args:
      request: A LogReadRequest object that will be used for Read calls.
      timeout: Optional number of seconds after which iteration raises
        TimeoutError instead of issuing further Read calls.
    """
    self._request = request
    self._logs = []
    self._read_called = False
    self._last_end_time = None
    self._end_time = None
    if timeout is not None:
      self._end_time = time.time() + timeout

  def __iter__(self):
    """Provides an iterator that yields log records one at a time."""
    while True:
      for log_item in self._logs:
        yield RequestLog(log_item)
      if not self._read_called or self._request.has_offset():
        if self._end_time and time.time() >= self._end_time:
          offset = None
          if self._request.has_offset():
            offset = self._request.offset().Encode()
          raise TimeoutError('A timeout occurred while iterating results',
                             offset=offset, last_end_time=self._last_end_time)
        self._read_called = True
        self._advance()
      else:
        break

  def _advance(self):
    """Acquires additional logs via cursor.

    This method is used by the iterator when it has exhausted its current set of
    logs to acquire more logs and update its internal structures accordingly.

    Raises:
      InvalidArgumentError: If the underlying service rejects the request.
      Error: For any other application error from the Read call.
    """
    response = log_service_pb.LogReadResponse()

    try:
      apiproxy_stub_map.MakeSyncCall('logservice', 'Read', self._request,
                                     response)
    except apiproxy_errors.ApplicationError as e:
      if e.application_error == log_service_pb.LogServiceError.INVALID_REQUEST:
        raise InvalidArgumentError(e.error_detail)
      raise Error(e.error_detail)

    self._logs = response.log_list()
    self._request.clear_offset()
    if response.has_offset():
      self._request.mutable_offset().CopyFrom(response.offset())
    self._last_end_time = None
    if response.has_last_end_time():
      self._last_end_time = response.last_end_time() / 1e6
class RequestLog(object):
  """Complete log information about a single request to an application."""

  # NOTE(review): reconstructed from a corrupted source dump; the @property
  # decorators and the implicit 'return None' fallbacks on optional fields
  # were restored from the surviving docstrings -- confirm against the
  # original SDK file.

  def __init__(self, request_log=None):
    if type(request_log) is str:
      # A base64-encoded, serialized RequestLog (the __repr__ form).
      self.__pb = log_service_pb.RequestLog(base64.b64decode(request_log))
    elif request_log.__class__ == log_service_pb.RequestLog:
      self.__pb = request_log
    else:
      self.__pb = log_service_pb.RequestLog()
    # Lazily-populated cache of AppLog objects for this request.
    self.__lines = []

  def __repr__(self):
    return 'RequestLog(\'%s\')' % base64.b64encode(self.__pb.Encode())

  def __str__(self):
    if self.module_id == 'default':
      return ('<RequestLog(app_id=%s, version_id=%s, request_id=%s)>' %
              (self.app_id, self.version_id, base64.b64encode(self.request_id)))
    else:
      return ('<RequestLog(app_id=%s, module_id=%s, version_id=%s, '
              'request_id=%s)>' %
              (self.app_id, self.module_id, self.version_id,
               base64.b64encode(self.request_id)))

  @property
  def app_id(self):
    """Application id that handled this request, as a string."""
    return self.__pb.app_id()

  @property
  def module_id(self):
    """Module id that handled this request, as a string."""
    return self.__pb.module_id()

  @property
  def version_id(self):
    """Version of the application that handled this request, as a string."""
    return self.__pb.version_id()

  @property
  def request_id(self):
    """Globally unique identifier for a request, based on request start time.

    Request ids for requests which started later will compare greater as
    binary strings than those for requests which started earlier.

    Returns:
      A byte string containing a unique identifier for this request.
    """
    return self.__pb.request_id()

  @property
  def offset(self):
    """Binary offset indicating current position in the result stream.

    May be submitted to future Log read requests to continue immediately after
    this request.

    Returns:
      A byte string representing an offset into the active result stream,
      or None if not available.
    """
    if self.__pb.has_offset():
      return self.__pb.offset().Encode()
    return None

  @property
  def ip(self):
    """The origin IP address of the request, as a string."""
    return self.__pb.ip()

  @property
  def nickname(self):
    """Nickname of the user that made the request if known and logged in.

    Returns:
      A string representation of the logged in user's nickname, or None.
    """
    if self.__pb.has_nickname():
      return self.__pb.nickname()
    return None

  @property
  def start_time(self):
    """Time at which request was known to have begun processing.

    Returns:
      A float representing the time this request began processing in seconds
      since the Unix epoch.
    """
    return self.__pb.start_time() / 1e6

  @property
  def end_time(self):
    """Time at which request was known to have completed.

    Returns:
      A float representing the request completion time in seconds since the
      Unix epoch.
    """
    return self.__pb.end_time() / 1e6

  @property
  def latency(self):
    """Time required to process request in seconds, as a float."""
    return self.__pb.latency() / 1e6

  @property
  def mcycles(self):
    """Number of machine cycles used to process request, as an integer."""
    return self.__pb.mcycles()

  @property
  def method(self):
    """Request method (GET, PUT, POST, etc), as a string."""
    return self.__pb.method()

  @property
  def resource(self):
    """Resource path on server requested by client.

    For example, http://nowhere.com/app would have a resource string of '/app'.

    Returns:
      A string containing the path component of the request URL.
    """
    return self.__pb.resource()

  @property
  def http_version(self):
    """HTTP version of request, as a string."""
    return self.__pb.http_version()

  @property
  def status(self):
    """Response status of request, as an int."""
    return self.__pb.status()

  @property
  def response_size(self):
    """Size in bytes sent back to client by request, as a long."""
    return self.__pb.response_size()

  @property
  def referrer(self):
    """Referrer URL of request as a string, or None."""
    if self.__pb.has_referrer():
      return self.__pb.referrer()
    return None

  @property
  def user_agent(self):
    """User agent used to make the request as a string, or None."""
    if self.__pb.has_user_agent():
      return self.__pb.user_agent()
    return None

  @property
  def url_map_entry(self):
    """File or class within URL mapping used for request.

    Useful for tracking down the source code which was responsible for managing
    request, especially for multiply mapped handlers.

    Returns:
      A string containing a file or class name.
    """
    return self.__pb.url_map_entry()

  @property
  def combined(self):
    """Apache combined log entry for request.

    The information in this field can be constructed from the rest of
    this message, however, this field is included for convenience.

    Returns:
      A string containing an Apache-style log line in the form documented at
      http://httpd.apache.org/docs/1.3/logs.html.
    """
    return self.__pb.combined()

  @property
  def api_mcycles(self):
    """Number of machine cycles spent in API calls while processing request.

    Deprecated. This value is no longer meaningful.

    Returns:
      Number of API machine cycles used as a long, or None if not available.
    """
    warnings.warn('api_mcycles does not return a meaningful value.',
                  DeprecationWarning, stacklevel=2)
    if self.__pb.has_api_mcycles():
      return self.__pb.api_mcycles()
    return None

  @property
  def host(self):
    """The Internet host and port number of the resource being requested.

    Returns:
      A string representing the host and port receiving the request, or None
      if not available.
    """
    if self.__pb.has_host():
      return self.__pb.host()
    return None

  @property
  def cost(self):
    """The estimated cost of this request, in fractional dollars.

    Returns:
      A float representing an estimated fractional dollar cost of this
      request, or None if not available.
    """
    if self.__pb.has_cost():
      return self.__pb.cost()
    return None

  @property
  def task_queue_name(self):
    """The request's queue name, if generated via the Task Queue API.

    Returns:
      A string containing the request's queue name if relevant, or None.
    """
    if self.__pb.has_task_queue_name():
      return self.__pb.task_queue_name()
    return None

  @property
  def task_name(self):
    """The request's task name, if generated via the Task Queue API.

    Returns:
      A string containing the request's task name if relevant, or None.
    """
    if self.__pb.has_task_name():
      return self.__pb.task_name()
    return None

  @property
  def was_loading_request(self):
    """Returns whether this request was a loading request for an instance.

    Returns:
      A bool indicating whether this request was a loading request.
    """
    return bool(self.__pb.was_loading_request())

  @property
  def pending_time(self):
    """Time this request spent in the pending request queue.

    Returns:
      A float representing the time in seconds that this request was pending.
    """
    return self.__pb.pending_time() / 1e6

  @property
  def replica_index(self):
    """The module replica that handled the request as an integer, or None."""
    if self.__pb.has_replica_index():
      return self.__pb.replica_index()
    return None

  @property
  def finished(self):
    """Whether or not this log represents a finished request, as a bool."""
    return bool(self.__pb.finished())

  @property
  def instance_key(self):
    """Mostly-unique identifier for the instance that handled the request.

    Returns:
      A string encoding of an instance key if available, or None.
    """
    if self.__pb.has_clone_key():
      return self.__pb.clone_key()
    return None

  @property
  def app_logs(self):
    """Logs emitted by the application while serving this request.

    Returns:
      A list of AppLog objects representing the log lines for this request, or
      an empty list if none were emitted or the query did not request them.
    """
    if not self.__lines and self.__pb.line_size():
      self.__lines = [AppLog(time=line.time() / 1e6, level=line.level(),
                             message=line.log_message())
                      for line in self.__pb.line_list()]
    return self.__lines

  @property
  def app_engine_release(self):
    """App Engine Infrastructure release that served this request.

    Returns:
      A string containing App Engine version that served this request, or None
      if not available.
    """
    if self.__pb.has_app_engine_release():
      return self.__pb.app_engine_release()
    return None
class AppLog(object):
  """Application log line emitted while processing a request."""

  def __init__(self, time=None, level=None, message=None):
    self._time = time
    self._level = level
    self._message = message

  def __eq__(self, other):
    # BUGFIX: the original compared 'self.level and other.level' (truthiness)
    # instead of equality, so two logs with different non-zero levels -- or
    # any two DEBUG (level 0) logs with equal time/message -- compared wrong.
    return (self.time == other.time and self.level == other.level and
            self.message == other.message)

  def __repr__(self):
    return ('AppLog(time=%f, level=%d, message=\'%s\')' %
            (self.time, self.level, self.message))

  @property
  def time(self):
    """Time log entry was made, in seconds since the Unix epoch, as a float."""
    return self._time

  @property
  def level(self):
    """Level or severity of log, as an int."""
    return self._level

  @property
  def message(self):
    """Application-provided log message, as a string."""
    return self._message
# NOTE(review): reconstructed from a corrupted source dump; 'server_versions'
# is restored from its handling in fetch() below -- confirm the full kwarg
# set against the original SDK file.
_FETCH_KWARGS = frozenset(['prototype_request', 'timeout', 'batch_size',
                           'server_versions'])


@datastore_rpc._positional(0)
def fetch(start_time=None,
          end_time=None,
          offset=None,
          minimum_log_level=None,
          include_incomplete=False,
          include_app_logs=False,
          module_versions=None,
          version_ids=None,
          request_ids=None,
          **kwargs):
  """Returns an iterator yielding an application's request and application logs.

  Logs will be returned by the iterator in reverse chronological order by
  request end time, or by last flush time for requests still in progress (if
  requested). The items yielded are RequestLog objects, the contents of which
  are accessible via method calls.

  All parameters are optional.

  Args:
    start_time: The earliest request completion or last-update time that
      results should be fetched for, in seconds since the Unix epoch.
    end_time: The latest request completion or last-update time that
      results should be fetched for, in seconds since the Unix epoch.
    offset: A byte string representing an offset into the log stream, extracted
      from a previously emitted RequestLog. This iterator will begin
      immediately after the record from which the offset came.
    minimum_log_level: An application log level which serves as a filter on the
      requests returned--requests with no application log at or above the
      specified level will be omitted. Works even if include_app_logs is not
      True. In ascending order, the available log levels are:
      logservice.LOG_LEVEL_DEBUG, logservice.LOG_LEVEL_INFO,
      logservice.LOG_LEVEL_WARNING, logservice.LOG_LEVEL_ERROR,
      and logservice.LOG_LEVEL_CRITICAL.
    include_incomplete: Whether or not to include requests that have started but
      not yet finished, as a boolean. Defaults to False.
    include_app_logs: Whether or not to include application level logs in the
      results, as a boolean. Defaults to False.
    module_versions: A list of tuples of the form (module, version), that
      indicate that the logs for the given module/version combination should be
      fetched. Duplicate tuples will be ignored. This kwarg may not be used
      in conjunction with the 'version_ids' kwarg.
    version_ids: A list of version ids whose logs should be queried against.
      Defaults to the application's current version id only. This kwarg may not
      be used in conjunction with the 'module_versions' kwarg.
    request_ids: If not None, indicates that instead of a time-based scan, logs
      for the specified requests should be returned. Malformed request IDs will
      cause the entire request to be rejected, while any requests that are
      unknown will be ignored. This option may not be combined with any
      filtering options such as start_time, end_time, offset, or
      minimum_log_level. version_ids is ignored. IDs that do not correspond to
      a request log will be ignored. Logs will be returned in the order
      requested.
    **kwargs: See _FETCH_KWARGS for the additional supported keyword arguments
      (prototype_request, timeout, batch_size, server_versions).

  Returns:
    An iterable object containing the logs that the user has queried for.

  Raises:
    InvalidArgumentError: Raised if any of the input parameters are not of the
      expected type, or mutually exclusive options are combined.
  """
  args_diff = set(kwargs) - _FETCH_KWARGS
  if args_diff:
    raise InvalidArgumentError('Invalid arguments: %s' % ', '.join(args_diff))

  request = log_service_pb.LogReadRequest()

  request.set_app_id(os.environ['APPLICATION_ID'])

  if start_time is not None:
    if not isinstance(start_time, (float, int, long)):
      raise InvalidArgumentError('start_time must be a float or integer')
    request.set_start_time(long(start_time * 1000000))

  if end_time is not None:
    if not isinstance(end_time, (float, int, long)):
      raise InvalidArgumentError('end_time must be a float or integer')
    request.set_end_time(long(end_time * 1000000))

  if offset is not None:
    try:
      request.mutable_offset().ParseFromString(offset)
    except (TypeError, ProtocolBuffer.ProtocolBufferDecodeError):
      raise InvalidArgumentError('offset must be a string or read-only buffer')

  if minimum_log_level is not None:
    if not isinstance(minimum_log_level, int):
      raise InvalidArgumentError('minimum_log_level must be an int')
    if minimum_log_level not in logsutil.LOG_LEVELS:
      raise InvalidArgumentError(
          'minimum_log_level must be one of %s' % repr(logsutil.LOG_LEVELS))
    request.set_minimum_log_level(minimum_log_level)

  if not isinstance(include_incomplete, bool):
    raise InvalidArgumentError('include_incomplete must be a boolean')
  request.set_include_incomplete(include_incomplete)

  if not isinstance(include_app_logs, bool):
    raise InvalidArgumentError('include_app_logs must be a boolean')
  request.set_include_app_logs(include_app_logs)

  if 'server_versions' in kwargs:
    logging.warning('The server_versions kwarg to the fetch() method is '
                    'deprecated. Please use the module_versions kwarg '
                    'instead.')
    module_versions = kwargs.pop('server_versions')

  if version_ids and module_versions:
    raise InvalidArgumentError('version_ids and module_versions may not be '
                               'used at the same time.')

  if version_ids is None and module_versions is None:
    # Default to the module/version serving the current request.
    module_version = request.add_module_version()
    if os.environ['CURRENT_MODULE_ID'] != 'default':
      module_version.set_module_id(os.environ['CURRENT_MODULE_ID'])
    module_version.set_version_id(
        os.environ['CURRENT_VERSION_ID'].split('.')[0])

  if module_versions:
    if not isinstance(module_versions, list):
      raise InvalidArgumentError('module_versions must be a list')

    req_module_versions = set()
    for entry in module_versions:
      if not isinstance(entry, (list, tuple)):
        raise InvalidArgumentError('module_versions list entries must all be '
                                   'tuples or lists.')
      if len(entry) != 2:
        raise InvalidArgumentError('module_versions list entries must all be '
                                   'of length 2.')
      req_module_versions.add((entry[0], entry[1]))

    for module, version in sorted(req_module_versions):
      req_module_version = request.add_module_version()
      # The 'default' module is represented by the absence of a module id.
      if module != 'default':
        req_module_version.set_module_id(module)
      req_module_version.set_version_id(version)

  if version_ids:
    if not isinstance(version_ids, list):
      raise InvalidArgumentError('version_ids must be a list')
    for version_id in version_ids:
      if not _MAJOR_VERSION_ID_RE.match(version_id):
        raise InvalidArgumentError(
            'version_ids must only contain valid major version identifiers')
      request.add_module_version().set_version_id(version_id)

  if request_ids is not None:
    if not isinstance(request_ids, list):
      raise InvalidArgumentError('request_ids must be a list')
    if not request_ids:
      raise InvalidArgumentError('request_ids must not be empty')
    if len(request_ids) != len(set(request_ids)):
      raise InvalidArgumentError('request_ids must not contain duplicates')
    for request_id in request_ids:
      if not _REQUEST_ID_RE.match(request_id):
        raise InvalidArgumentError(
            '%s is not a valid request log id' % request_id)
    request.request_id_list()[:] = request_ids

  prototype_request = kwargs.get('prototype_request')
  if prototype_request:
    if not isinstance(prototype_request, log_service_pb.LogReadRequest):
      raise InvalidArgumentError('prototype_request must be a LogReadRequest')
    request.MergeFrom(prototype_request)

  timeout = kwargs.get('timeout')
  if timeout is not None:
    if not isinstance(timeout, (float, int, long)):
      raise InvalidArgumentError('timeout must be a float or integer')

  batch_size = kwargs.get('batch_size')
  if batch_size is not None:
    if not isinstance(batch_size, (int, long)):
      raise InvalidArgumentError('batch_size must be an integer')
    if batch_size < 1:
      raise InvalidArgumentError('batch_size must be greater than zero')
    if batch_size > MAX_ITEMS_PER_FETCH:
      raise InvalidArgumentError('batch_size specified is too large')
    request.set_count(batch_size)

  return _LogQueryResult(request, timeout=timeout)
class LogsBufferOld(object):
  """Threadsafe buffer for storing and periodically flushing app logs."""

  # NOTE(review): reconstructed from a corrupted source dump; missing lines
  # (lock acquisitions, branch headers, small method bodies) were restored
  # from the surviving tokens and docstrings -- confirm against the original
  # SDK file.

  _MAX_FLUSH_SIZE = 1000 * 1000
  _MAX_LINE_SIZE = _MAX_FLUSH_SIZE
  assert _MAX_LINE_SIZE <= _MAX_FLUSH_SIZE

  def __init__(self, stream=None, stderr=False):
    """Initializes the buffer, which wraps the given stream or sys.stderr.

    The state of the LogsBuffer is protected by a separate lock. The lock is
    acquired before any variables are mutated or accessed, and released
    afterward. A recursive lock is used so that a single thread can acquire the
    lock multiple times, and release it only when an identical number of
    'unlock()' calls have been performed.

    Args:
      stream: A file-like object to store logs. Defaults to a cStringIO object.
      stderr: If specified, use sys.stderr as the underlying stream.
    """
    self._stderr = stderr
    if self._stderr:
      assert stream is None
    else:
      self._stream = stream or cStringIO.StringIO()
    self._lock = threading.RLock()
    self._reset()

  def stream(self):
    """Returns the underlying file-like object used to buffer logs."""
    if self._stderr:
      return sys.stderr
    else:
      return self._stream

  def lines(self):
    """Returns the number of log lines currently buffered."""
    with self._lock:
      return self._lines

  def bytes(self):
    """Returns the size of the log buffer, in bytes."""
    with self._lock:
      return self._bytes

  def age(self):
    """Returns the number of seconds since the log buffer was flushed."""
    with self._lock:
      return time.time() - self._flush_time

  def flush_time(self):
    """Returns last time that the log buffer was flushed."""
    with self._lock:
      return self._flush_time

  def contents(self):
    """Returns the contents of the logs buffer."""
    with self._lock:
      return self._contents()

  def _contents(self):
    """Internal version of contents() with no locking."""
    try:
      return self.stream().getvalue()
    except AttributeError:
      # The underlying stream has been closed.
      return ''

  def reset(self):
    """Resets the buffer state, without clearing the underlying stream."""
    with self._lock:
      return self._reset()

  def _reset(self):
    """Internal version of reset() with no locking."""
    contents = self._contents()
    self._bytes = len(contents)
    self._lines = len(contents.split('\n')) - 1
    self._flush_time = time.time()
    self._request = logsutil.RequestID()

  def clear(self):
    """Clears the contents of the logs buffer, and resets autoflush state."""
    with self._lock:
      return self._clear()

  def _clear(self):
    """Internal version of clear() with no locking."""
    if self._bytes > 0:
      self.stream().truncate(0)
    self._reset()

  def close(self):
    """Closes the underlying stream, flushing the current contents."""
    with self._lock:
      return self._close()

  def _close(self):
    """Internal version of close() with no locking."""
    self._flush()
    self.stream().close()

  def parse_logs(self):
    """Parse the contents of the buffer and return an array of log lines."""
    return logsutil.ParseLogs(self.contents())

  def write(self, line):
    """Writes a line to the logs buffer."""
    with self._lock:
      return self._write(line)

  def writelines(self, seq):
    """Writes each line in the given sequence to the logs buffer."""
    for line in seq:
      self.write(line)

  def _write(self, line):
    """Writes a line to the logs buffer."""
    # A new request id means a new request started on this instance; drop the
    # stale per-request state before buffering.
    if self._request != logsutil.RequestID():
      self._reset()
    self.stream().write(line)
    self._lines += 1
    self._bytes += len(line)
    self._autoflush()

  @staticmethod
  def _truncate(line, max_length=_MAX_LINE_SIZE):
    """Truncates a potentially long log down to a specified maximum length."""
    if len(line) > max_length:
      original_length = len(line)
      suffix = '...(length %d)' % original_length
      line = line[:max_length - len(suffix)] + suffix
    return line

  def flush(self):
    """Flushes the contents of the logs buffer.

    This method holds the buffer lock until the API call has finished to ensure
    that flush calls are performed in the correct order, so that log messages
    written during the flush call aren't dropped or accidentally wiped, and so
    that the other buffer state variables (flush time, lines, bytes) are updated
    synchronously with the flush.
    """
    with self._lock:
      self._flush()

  def _flush(self):
    """Internal version of flush() with no locking."""
    logs = self.parse_logs()
    self._clear()

    while True:
      group = log_service_pb.UserAppLogGroup()
      byte_size = 0
      n = 0
      for timestamp_usec, level, message in logs:
        message = self._truncate(message, LogsBufferOld._MAX_LINE_SIZE)

        # Cut the batch when adding this line would exceed the flush limit.
        if byte_size + len(message) > LogsBufferOld._MAX_FLUSH_SIZE:
          break
        line = group.add_log_line()
        line.set_timestamp_usec(timestamp_usec)
        line.set_level(level)
        line.set_message(message)
        byte_size += 1 + group.lengthString(line.ByteSize())
        n += 1
      # Each pass must consume at least one line unless the input is empty.
      assert n > 0 or not logs
      logs = logs[n:]

      request = log_service_pb.FlushRequest()
      request.set_logs(group.Encode())
      response = api_base_pb.VoidProto()
      apiproxy_stub_map.MakeSyncCall('logservice', 'Flush', request, response)
      if not logs:
        break

  def autoflush(self):
    """Flushes the buffer if certain conditions have been met."""
    with self._lock:
      return self._autoflush()

  def _autoflush(self):
    """Internal version of autoflush() with no locking."""
    if not self.autoflush_enabled():
      return
    if ((AUTOFLUSH_EVERY_SECONDS and self.age() >= AUTOFLUSH_EVERY_SECONDS) or
        (AUTOFLUSH_EVERY_LINES and self.lines() >= AUTOFLUSH_EVERY_LINES) or
        (AUTOFLUSH_EVERY_BYTES and self.bytes() >= AUTOFLUSH_EVERY_BYTES)):
      self._flush()

  def autoflush_enabled(self):
    """Indicates if the buffer will periodically flush logs during a request."""
    return AUTOFLUSH_ENABLED
# Select the active buffer implementation; the stream-backed implementation is
# still the default here.
LogsBuffer = LogsBufferOld

# The single LogsBuffer instance shared by all module-level helpers; writes
# go to sys.stderr.
_global_buffer = LogsBuffer(stderr=True)